PyTorch Tutorial DQN
The Theory
The machine (agent) performs an action \(a_t\) at time \(t\) on the environment (game), and the environment returns two consequences: 1. the new state of the game, represented by the resulting game screen \(x_t\); 2. the reward \(r_t\) the machine receives
It is impossible to fully understand the current situation from only the current screen \(x_t\). We therefore consider sequences of actions and observations: \(s_t = x_1, a_1, x_2, ..., a_{t−1}, x_t\), and use this instead of \(x_t\) as the state of the game at \(t\)
The machine adjusts itself in order to maximize future rewards; we define the future discounted return as
\(R_{t}=\sum\limits_{x=t}^T \gamma^{x-t} r_x= r_t+ \gamma R_{t'} \;\;\;\; (1)\)
where \(\gamma \in (0,1)\) is the discount factor: the larger \(\gamma\), the more the future matters. We assume time is discrete, so \(t'=t+1\), and \(T\) is the time when the game terminates
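As a quick sanity check of equation \((1)\), here is a minimal sketch (plain Python, with an arbitrary reward list and an arbitrary \(\gamma\)) that computes \(R_t\) both as the explicit sum and via the recursion \(R_t = r_t + \gamma R_{t+1}\):

gamma = 0.99                            # arbitrary discount factor, just for illustration
rewards = [0.0, 1.0, 0.0, -1.0, 2.0]    # arbitrary rewards r_0 ... r_T (0-based indices)

def return_sum(rewards, t, gamma):
    # explicit sum: R_t = sum_{x=t}^{T} gamma^(x-t) * r_x
    return sum(gamma ** (x - t) * rewards[x] for x in range(t, len(rewards)))

def return_rec(rewards, t, gamma):
    # recursion: R_t = r_t + gamma * R_{t+1}, with R_{T+1} = 0
    if t == len(rewards):
        return 0.0
    return rewards[t] + gamma * return_rec(rewards, t + 1, gamma)

print(return_sum(rewards, 0, gamma), return_rec(rewards, 0, gamma))   # both give the same value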
A policy \(\pi\) is a set of rules to decide which action to take when a sequence is observed; mathematically, it is a map \(a=\pi(s)\) or a probability distribution \(a\sim\pi(s)\)
We define the optimal action-value function \(Q^*(s,a)\) as the maximum expected return achievable by following any strategy, given that some sequence \(s\) is seen and some action \(a\) is taken
\(Q^*(s, a) = \max\limits_{\pi} \Bbb{E} [R_t|s_t = s, a_t = a, \pi]\)
\(Q^*\) obeys the Bellman equation:
\(Q^*(s, a) = \Bbb{E}_{s'}[r + \gamma \max\limits_{a'} Q^*(s' , a')| s, a]\)
The basic idea behind many reinforcement learning algorithms is to estimate \(Q^*\) by using the Bellman equation as an iterative update:
\(Q_i(s, a) =\Bbb{E}_{s'}[r +\gamma\max\limits_{a'} Q_{i-1}(s' , a')| s, a] \;\;\;\; (2)\)
It is proved in Reinforcement Learning: An Introduction that: \(Q_i \to Q^*\) as \(i \to \infty\)
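To see the iteration \((2)\) in action, here is a minimal sketch of tabular Q-iteration on a tiny made-up deterministic MDP (two states, two actions; the transitions and rewards are arbitrary choices for illustration):

# tabular Q-iteration: Q_i(s,a) = r(s,a) + gamma * max_a' Q_{i-1}(s',a')
gamma = 0.9
next_state = [[0, 1], [0, 1]]          # next_state[s][a]
reward     = [[0.0, 1.0], [2.0, 0.0]]  # reward[s][a]

Q = [[0.0, 0.0], [0.0, 0.0]]           # Q_0 = 0
for i in range(100):
    Q = [[reward[s][a] + gamma * max(Q[next_state[s][a]])
          for a in range(2)] for s in range(2)]
print(Q)                               # after many iterations Q_i has essentially reached the fixed point Q*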
When state and action space are large, it’s better to use a function with parameters \(\theta\) to approximate \(Q^*\), \(Q(s,a;\theta)\approx Q^*(s,a)\). Since neural networks are essentially function approximators, we use them!
In this case, the target function we want to approximate with our neural network is actually based on another network, as indicated by equation \((2)\): our network represents \(Q_i(s, a)\), while our target function is \(\Bbb{E}_{s'}[r + \gamma \max\limits_{a'}\hat{Q}_{i-1}(s' , a')| s, a]\), represented by the network \(\hat{Q}_{i-1}(s' , a')\). We call \(Q_i(s, a)\) our actual network (officially the \(Q\) network), since this is the one we actually perform gradient descent on, whilst \(\hat{Q}_{i-1}(s' , a')\) is called the target network; it is fixed except when we update it using the actual network. Read the algorithm below for clarification. We further distinguish the actual network with its parameters (weights and biases) \(\theta_i\) from the target network with its parameters \(\theta_i^-\)
In this case the loss function at each iteration \(i\) is defined as
\(L_i(\theta_i)=\Bbb{E}_{(s,a,r)}[(y_i-Q(s,a;\theta_i))^2]\)
where \(y_i=\Bbb{E}_{s'}[r+\gamma \max\limits_{a'} \hat{Q} (s',a';\theta_i^-)|s,a]\) is the target function we want to approximate for iteration \(i\). Again, when adjusting the parameters \(\theta_i\) using gradient descent, \(\theta_i^-\) are kept fixed. The target network parameters \(\theta_i^-\) are only updated using the actual network parameters \(\theta_i\) every \(C\) iterations and are held fixed between individual updates. (Yes, we are not strictly following the iteration formula \((2)\), which updates the target network every iteration, instead we only update every \(C\) iterations, for stability!)
The gradient is simply:
\(\nabla_{\theta_i}L_i(\theta_i)=-\Bbb{E}_{(s,a,r,s')}\{2[ r+\gamma \max\limits_{a'} \hat{Q} (s',a';\theta_i^-)- Q(s,a;\theta_i) ] \nabla_{\theta_i} Q(s,a;\theta_i) \}\)
We define an experience (transition) as a tuple \(e_t=(s_t,a_t,r_t,s_{t+1})\) and store all experiences in a replay memory \(D_t=\{e_1, ..., e_t\}\). In the expectation above, \((s,a,r,s')\sim U(D)\), i.e., \((s,a,r,s')\) is sampled from a uniform distribution over \(D\)
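To make the loss concrete before the full implementation below, here is a minimal runnable sketch of how \(y_i\) and \(L_i\) are formed from a sampled batch; the two linear layers and the random batch are just stand-ins for \(Q\), \(\hat{Q}\) and a real sample from \(D\):

import torch
import torch.nn as nn
import torch.nn.functional as F

# toy stand-ins for Q and Q_hat (assumed sizes: 4 state features, 2 actions)
q_net = nn.Linear(4, 2)
target_net = nn.Linear(4, 2)
target_net.load_state_dict(q_net.state_dict())

# a made-up batch (s, a, r, s') ~ U(D) of size B = 8
B, gamma = 8, 0.999
states = torch.randn(B, 4)
actions = torch.randint(0, 2, (B, 1))
rewards = torch.randn(B)
next_states = torch.randn(B, 4)

q_values = q_net(states).gather(1, actions)            # Q(s, a; theta), shape (B, 1)
with torch.no_grad():                                  # theta^- is held fixed
    max_next = target_net(next_states).max(dim=1)[0]   # max_a' Q_hat(s', a'; theta^-)
y = (rewards + gamma * max_next).unsqueeze(1)          # target y_i, shape (B, 1)
loss = F.mse_loss(q_values, y)                         # L_i(theta_i)
loss.backward()                                        # gradient flows only through Q(s, a; theta)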
The Algorithm
In the following, the image preprocessing function \(\phi\) is omitted from the original algorithm.
Initialize replay memory \(D\) to capacity \(N\)
Initialize actual network \(Q\) with random weights \(\theta\)
Initialize target network \(\hat{Q}\) with weights \(\theta^-=\theta\)
For episodes \(1\) to \(M\):
Initialize sequence \(s_1=(x_1)\)
For \(t=1\) to \(T\):
With probability \(\epsilon\) select a random action \(a_t\); otherwise (with probability \(1-\epsilon\)) select \(a_t=\arg\max\limits_a Q(s_t,a;\theta)\)
Get reward \(r_t\) and game image \(x_{t+1}\)
Set \(s_{t+1}=(s_t,a_t,x_{t+1})\)
Store transition \((s_t,a_t,r_t,s_{t+1})\) in \(D\)
Sample random minibatch of transitions \((s_j,a_j,r_j,s_{j+1})\) from \(D\)
If episode terminates at step \(j+1\):
Set \(y_j=r_j\)
else:
Set \(y_j=r_j+\gamma \max\limits_{a'}\hat{Q}(s_{j+1},a';\theta^-)\)
Perform a gradient descent step on \([y_j-Q(s_j,a_j;\theta)]^2\) w.r.t. \(\theta\)
Every \(C\) steps update \(\hat{Q}=Q\)
The PyTorch code
1. imports
import math
import random
import numpy as np
from collections import namedtuple
from PIL import ImageGrab
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import torchvision.transforms.functional as TTF
import pyautogui as pag
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
2. using namedtuple
Transition = namedtuple('Transition',
('state', 'action', 'next_state', 'reward'))
a = Transition(1,2,3,4)
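A quick illustration of why namedtuple is convenient here (the numbers are just the placeholders pushed above): fields are accessed by name, and later on optimize_model will transpose a list of Transitions into one Transition of batches with the same zip trick:

print(a.state, a.action, a.next_state, a.reward)   # 1 2 3 4, accessed by field name
b = Transition(10, 20, 30, 40)
batch = Transition(*zip(a, b))                      # the transpose trick used in optimize_model
print(batch.state)                                  # (1, 10): the two states grouped together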
3. the ReplayMemory class
class ReplayMemory:
# implicitly inherits from the 'object' class in Python3
def __init__(self, mem_size):
self.mem_size = mem_size
self.memory = [None for i in range(mem_size)]
self.position = 0
def push(self, *args):
self.memory[self.position] = Transition(*args)
self.position = (self.position+1)%self.mem_size
def sample(self, batch_size):
return random.sample(self.memory, batch_size)
mem = ReplayMemory(5)
print(len(mem.memory))
mem.push(1,2,3,4)
mem.push(5,6,7,8)
print(mem.memory)
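Note that the memory is pre-filled with None, so sample would return None entries until enough transitions have been pushed; this is why the main loop below first fills the replay memory before training starts. A quick check:

mem2 = ReplayMemory(3)
mem2.push(1, 2, 3, 4)
print(mem2.sample(2))        # may contain None: only one real Transition has been pushed
for k in range(3):
    mem2.push(k, k, k, k)    # overwrite every slot
print(mem2.sample(2))        # now every sampled entry is a real Transition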
4.1 the neural network class (finding the input size of the fully connected layer)
class DQN(nn.Module):
# inherits from the 'nn.Module' class
def __init__(self, chw, num_actions):
assert chw == (3,50,100),"Resize your image! channel:3, height:50, width: 100"
super(DQN, self).__init__()
self.conv1 = nn.Conv2d(3, 16, kernel_size = 5, stride = 2)
self.bn1 = nn.BatchNorm2d(16)
self.conv2 = nn.Conv2d(16,32, kernel_size = 5, stride = 2)
self.bn2 = nn.BatchNorm2d(32)
self.conv3 = nn.Conv2d(32,32, kernel_size = 5, stride = 2)
self.bn3 = nn.BatchNorm2d(32)
self.fc1 = nn.Linear(864,num_actions)
def forward(self,x):
x = F.relu(self.bn1(self.conv1(x)))
x = F.relu(self.bn2(self.conv2(x)))
x = F.relu(self.bn3(self.conv3(x)))
return self.fc1(x.view(x.size(0),-1))
print('dqn:')
dqn = DQN((3,50,100),6)
x = torch.randn(3, 3, 50, 100)
x = dqn.forward(x)
print(x)
print('dqn2:')
# chw=(2,50,100) does not match the required (3,50,100), so construction raises AssertionError:
dqn2 = DQN((2,50,100),6)
x = torch.randn(3, 3, 50, 100)
x = dqn2.forward(x)
print(x)
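Where does the 864 in nn.Linear(864, num_actions) come from? For nn.Conv2d with kernel size \(k\), stride \(s\) and no padding, the output length is \(\lfloor (L-k)/s \rfloor + 1\). A small sketch that reproduces the number for the assumed (3,50,100) input:

def conv_out(length, kernel=5, stride=2):
    # output length of nn.Conv2d with no padding: floor((L - k) / s) + 1
    return (length - kernel) // stride + 1

h, w = 50, 100
for _ in range(3):           # three conv layers, each with kernel_size=5, stride=2
    h, w = conv_out(h), conv_out(w)
print(32 * h * w)            # 32 channels * 3 * 9 = 864, the input size of fc1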
4.3 the neural network class: Image processor added
class DQN(nn.Module):
# inherits from the 'nn.Module' class
def __init__(self, chw, num_actions):
super(DQN, self).__init__()
self.chw = (3,50,100)
assert chw == self.chw,"Resize your image! channel:3, height:50, width:100"
self.conv1 = nn.Conv2d(3, 16, kernel_size = 5, stride = 2)
self.bn1 = nn.BatchNorm2d(16)
self.conv2 = nn.Conv2d(16,32, kernel_size = 5, stride = 2)
self.bn2 = nn.BatchNorm2d(32)
self.conv3 = nn.Conv2d(32,32, kernel_size = 5, stride = 2)
self.bn3 = nn.BatchNorm2d(32)
self.fc1 = nn.Linear(864,num_actions)
self.ToTensor = T.Compose([T.ToTensor()])
def forward(self,x):
x = F.relu(self.bn1(self.conv1(x)))
x = F.relu(self.bn2(self.conv2(x)))
x = F.relu(self.bn3(self.conv3(x)))
return self.fc1(x.view(x.size(0),-1))
def get_img(self,scr_region,display=True):
# input: screen region, output: PyTorch tensor
img = ImageGrab.grab(bbox=scr_region)
img = TTF.resize(img,self.chw[1:])
if display:
img2 = np.array(img)
cv2.imshow('window',img2)
cv2.waitKey(5)
img = self.ToTensor(img).unsqueeze(0).to(device)
return img
dqn = DQN((3,50,100), 4)
game_scr = (0,300,900,1000)
while True:
img = dqn.get_img(game_scr)
print(img.shape)
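One cosmetic caveat: ImageGrab.grab returns an RGB image while cv2.imshow expects BGR, so the preview colours look swapped (the tensor fed to the network is not affected). If that bothers you, a possible tweak to the two display lines inside get_img is:

img2 = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)   # RGB -> BGR, only for the preview window
cv2.imshow('window', img2)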
5. the Agent class (the main algorithm as explained earlier)
A note on two important constructs used below, .detach() and with torch.no_grad(): both keep the target network out of the gradient computation. torch.no_grad() disables gradient tracking for everything evaluated inside the block, while .detach() returns a tensor that is cut off from the computation graph, so no gradient flows back through it.
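A tiny self-contained check of the difference (the numbers are arbitrary):

import torch

w = torch.tensor([2.0], requires_grad=True)

# with torch.no_grad(): nothing inside the block is tracked by autograd
with torch.no_grad():
    y = w * 3
print(y.requires_grad)        # False

# .detach(): the detached factor is treated as a constant, so no gradient flows through it
z = (w * 3).detach() * w
z.backward()
print(w.grad)                 # tensor([6.]): only the second, attached factor contributes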
class Agent:
def __init__(self,num_actions,mem_size ):
self.BATCH_SIZE = 10
self.GAMMA = 0.999
self.EPS_START = 0.9
self.EPS_END = 0.05
self.EPS_DECAY = 200
self.TARGET_UPDATE = 1
self.num_actions = num_actions
self.actual_net = DQN((3,50,100),num_actions).to(device)
self.target_net = DQN((3,50,100),num_actions).to(device)
self.target_net.load_state_dict(self.actual_net.state_dict())
self.target_net.eval()
self.optimizer = optim.RMSprop(self.actual_net.parameters())
self.memory = ReplayMemory(mem_size+self.BATCH_SIZE)
self.steps_done = 0
def select_action(self,state):
eps_threshold = self.EPS_END + (self.EPS_START - self.EPS_END\
) * math.exp(-1.*self.steps_done/self.EPS_DECAY)
self.steps_done += 1
if random.random() > eps_threshold:
with torch.no_grad():
return self.actual_net(state).max(1)[1].view(1,1)
else:
return torch.tensor([[random.randrange(self.num_actions)]],
device = device, dtype=torch.long)
def optimize_model(self):
transitions = self.memory.sample(self.BATCH_SIZE)
# converts batch-array of Transitions to Transition of batch-arrays:
batch = Transition(*zip(*transitions))
print(batch)
state_batch = torch.cat(batch.state)
action_batch = torch.cat(batch.action)
reward_batch = torch.cat(batch.reward)
next_state_batch = torch.cat(batch.next_state)
Q = self.actual_net(state_batch).gather(1,action_batch)
max_a_Q_hat = self.target_net(next_state_batch).max(1)[0].detach()
y = (reward_batch + self.GAMMA * max_a_Q_hat).unsqueeze(1)
loss = F.smooth_l1_loss(Q, y)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
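For a feel of the epsilon-greedy schedule in select_action: the exploration probability starts at EPS_START and decays exponentially toward EPS_END as steps_done grows. A quick check of the formula with the constants above:

import math
EPS_START, EPS_END, EPS_DECAY = 0.9, 0.05, 200
for steps_done in (0, 200, 1000):
    eps = EPS_END + (EPS_START - EPS_END) * math.exp(-1. * steps_done / EPS_DECAY)
    print(steps_done, round(eps, 3))   # 0.9 -> 0.363 -> 0.056

Also note that optimize_model uses the Huber loss (F.smooth_l1_loss) rather than the plain squared error from the theory section; it is quadratic for small errors and linear for large ones, which makes training less sensitive to occasional large targets.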
6. mygame class (this is totally up to you)
class mygame:
def __init__(self):
self.score_scr = (70,900,130,960)
self.win_range = [10,12]
self.lose_range = [20,22]
self.action_list = ['up','down','left','right']
self.num_actions = len(self.action_list)
def step(self, action):
pag.typewrite([self.action_list[action]])
result_screen = np.array(ImageGrab.grab(bbox=self.score_scr))
result = int(np.mean(result_screen))
if result > self.win_range[0] and result < self.win_range[1]:
reward = 1
elif result > self.lose_range[0] and result < self.lose_range[1]:
reward = -1
else:
reward = 0.1
return reward*1.
7. the main loop
game_scr = (0,300,900,1000)
mygame = mygame()
num_actions = mygame.num_actions
mem_size = 10
agent = Agent(num_actions,mem_size)
last_scr = agent.actual_net.get_img(game_scr)
current_scr = agent.actual_net.get_img(game_scr)
state = current_scr - last_scr
# make sure the replay memory is initialized !
for t in range(mem_size + agent.BATCH_SIZE):
action = agent.select_action(state)
reward = mygame.step(action)
reward = torch.tensor([reward],device = device)
last_scr = current_scr
current_scr = agent.actual_net.get_img(game_scr)
next_state = current_scr-last_scr
agent.memory.push(state, action, next_state, reward)
state = next_state
for i in range(10):
last_scr = agent.actual_net.get_img(game_scr)
current_scr = agent.actual_net.get_img(game_scr)
state = current_scr - last_scr
for t in range(10):
action = agent.select_action(state)
reward = mygame.step(action)
reward = torch.tensor([reward],device = device)
last_scr = current_scr
current_scr = agent.actual_net.get_img(game_scr)
next_state = current_scr-last_scr
agent.memory.push(state, action, next_state, reward)
state = next_state
agent.optimize_model()
if i % agent.TARGET_UPDATE == 0:
agent.target_net.load_state_dict(agent.actual_net.state_dict())
print("finished")
Code Rewrite
import torch; import torch.nn as nn; import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple
import math; import random;
import numpy as np; import matplotlib.pyplot as plt
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
Transition=namedtuple('Transition',
('state','action','nextState','reward'))
class ReplayMemory:
def __init__(self,memSize):
self.memSize = memSize
self.memory = []; self.position = 0
def push(self,*args):
if len(self.memory) < self.memSize: self.memory.append(None)
self.memory[self.position] = Transition(*args)
self.position = (self.position+1)%self.memSize
def sample(self,batchSize):
        return random.sample(self.memory,batchSize)
def __len__(self):
return len(self.memory)
class Net(nn.Module):
def __init__(self,inSize,outSize):
super(Net,self).__init__()
self.linear1 = nn.Linear(inSize,inSize*2)
self.linear2 = nn.Linear(inSize*2,outSize)
def forward(self,x):
x = x.view(x.size(0),-1)
x = F.relu(self.linear1(x))
return self.linear2(x)
class Trainer:
def __init__(self,args):
self.batchSize = args['batchSize']
self.env = Env()
inSize,outSize = self.env.stateSize, self.env.nActions
self.Qnet = Net(inSize,outSize); self.QhatNet = Net(inSize,outSize)
self.QhatNet.load_state_dict(self.Qnet.state_dict())
self.QhatNet.eval()
        self.optimizer = optim.RMSprop(self.Qnet.parameters())
self.memory = ReplayMemory(10000)
self.stepsDone = 0
def selectAction(self,state):
self.stepsDone+=1
epsA,epsB,epsDecay = 0.9,0.05,200
if random.random() > epsB+(epsA-epsB)*\
math.exp(-1.*self.stepsDone/epsDecay):
with torch.no_grad():
# max returns (values, indices), so max()[1] are indices
return self.Qnet(state).max(dim=1)[1].view([1,1]).item()
        else: return random.randrange(self.env.nActions)   # a random action index
def optimize(self):
gamma = 0.999
if len(self.memory) < self.batchSize: return
transitions = self.memory.sample(self.batchSize)
batch = Transition(*zip(*transitions))
states = torch.cat(batch.state)
actions = torch.cat(batch.action)
nextStates = torch.cat(batch.nextState)
rewards = torch.cat(batch.reward)
Q = self.Qnet(states).gather(dim=1,index=actions)
# max returns (values, indices), so max()[0] are values
y = rewards+gamma*self.QhatNet(nextStates).max(dim=1)[0].detach()
loss = F.smooth_l1_loss(Q,y.unsqueeze(1))
self.optimizer.zero_grad()
loss.backward()
        for param in self.Qnet.parameters(): param.grad.data.clamp_(-1,1)
self.optimizer.step()
def train(self,nGames,nStepsPerGame,nUpdate):
for i in range(nGames):
self.env.reset()
state = self.env.getState()
for k in range(nStepsPerGame):
state,action,nextState,reward = self.env.step(self.selectAction(state))
                self.memory.push(state,action,nextState,reward)
state = nextState
self.optimize()
if i % nUpdate==0:
self.QhatNet.load_state_dict(self.Qnet.state_dict())
class Env:
    # game-specific environment skeleton: the placeholder state and action list below are
    # assumptions for illustration; replace them and implement the three methods for your game
    def __init__(self):
        self.state = torch.zeros(4)                      # placeholder 1-D state with stateSize features
        self.actions = ['up','down','left','right']      # placeholder action list
        self.stateSize = self.state.shape[0]
        self.nActions = len(self.actions)
    def reset(self):
        # restart the game and reset self.state
        raise NotImplementedError
    def getState(self):
        # return the current state with a batch dimension, shape (1, stateSize)
        return self.state.unsqueeze(0)
    def step(self, action):
        # perform `action` (an index), observe the reward and the next screen, and return
        # (state, action, nextState, reward) with action as a (1,1) long tensor and reward as a (1,) tensor
        raise NotImplementedError
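Once Env is filled in for your game, the rewrite can be driven roughly like this (the argument values here are arbitrary examples):

args = {'batchSize': 32}                                  # arbitrary example batch size
trainer = Trainer(args)
trainer.train(nGames=50, nStepsPerGame=200, nUpdate=10)   # update the target network every 10 games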