PyTorch Tutorial DQN

The Theory

The machine (agent) takes an action \(a_t\) at time \(t\) in the environment (game), and the environment returns two consequences: 1. The new state of the game, represented by the resulting game screen \(x_t\) 2. The reward \(r_t\) the machine receives

It is impossible to fully understand the current situation from only the current screen \(x_t\). We therefore consider sequences of actions and observations: \(s_t = x_1, a_1, x_2, ..., a_{t−1}, x_t\), and use this instead of \(x_t\) as the state of the game at \(t\)

The machine adjusts itself in order to maximize future rewards. We define the future discounted return as

\(R_{t}=\sum\limits_{x=t}^T \gamma^{x-t} r_x= r_t+ \gamma R_{t'} \;\;\;\; (1)\)

where \(\gamma \in (0,1)\) is the discount factor: the larger \(\gamma\) is, the more the future matters. We assume time is discrete, so \(t'=t+1\), and \(T\) is the time when the game terminates
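A quick numerical check of equation \((1)\), with made-up rewards, showing that the summation and the recursion give the same value:

rewards = [1.0, 0.0, 2.0, -1.0]   # hypothetical r_t, r_{t+1}, r_{t+2}, r_T
gamma = 0.9

R_sum = sum(gamma**k * r for k, r in enumerate(rewards))   # summation form of (1)

R_rec = 0.0                        # recursive form: R_t = r_t + gamma * R_{t+1}
for r in reversed(rewards):
    R_rec = r + gamma * R_rec

print(R_sum, R_rec)                # both approximately 1.891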

A policy \(\pi\) is a set of rules for deciding which action to take when a sequence is observed; mathematically, it is a map \(a=\pi(s)\) or a probability distribution \(a\sim\pi(s)\)

We define the optimal action-value function \(Q^*(s,a)\) as the maximum expected return achievable by following any strategy, given that some sequence \(s\) is seen and some action \(a\) is taken

\(Q^*(s, a) = \max\limits_{\pi} \Bbb{E} [R_t|s_t = s, a_t = a, \pi]\)

\(Q^*\) obeys the Bellman equation:

\(Q^*(s, a) = \Bbb{E}_{s'}[r + \gamma \max\limits_{a'} Q^*(s' , a')| s, a]\)

The basic idea behind many reinforcement learning algorithms is to estimate \(Q^*\), by using the Bellman equation as an iterative update

\(Q_i(s, a) =\Bbb{E}_{s'}[r +\gamma\max\limits_{a'} Q_{i-1}(s' , a')| s, a] \;\;\;\; (2)\)

It is proved in the book Reinforcement Learning: An Introduction that \(Q_i \to Q^*\) as \(i \to \infty\)
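To see this convergence concretely, here is a tiny hand-made deterministic MDP (two states, two actions, made-up rewards) where repeatedly applying update \((2)\) settles on \(Q^*\):

import numpy as np

next_state = [[0, 1], [0, 1]]           # toy MDP: next_state[s][a]
reward     = [[0.0, 1.0], [0.0, 2.0]]   # reward[s][a]
gamma = 0.9

Q = np.zeros((2, 2))               # Q_0: arbitrary initial values
for i in range(200):               # apply update (2) repeatedly
    Q_new = np.zeros_like(Q)
    for s in range(2):
        for a in range(2):
            s2 = next_state[s][a]  # deterministic transition, so the expectation is trivial
            Q_new[s, a] = reward[s][a] + gamma * Q[s2].max()
    Q = Q_new

print(Q)   # converges to approximately [[17.1, 19.0], [17.1, 20.0]]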

When the state and action spaces are large, it's better to use a function with parameters \(\theta\) to approximate \(Q^*\): \(Q(s,a;\theta)\approx Q^*(s,a)\). Since neural networks are essentially function approximators, we use them!

In this case, the target function we want to approximate with our neural network is itself based on another network, as indicated by equation \((2)\): our network represents \(Q_i(s, a)\), while our target function is \(\Bbb{E}_{s'}[r + \gamma \max\limits_{a'}\hat{Q}_{i-1}(s' , a')| s, a]\), represented by the network \(\hat{Q}_{i-1}(s' , a')\). We call \(Q_i(s, a)\) our actual network (officially the \(Q\) network), since this is the one we actually perform gradient descent on, whilst \(\hat{Q}_{i-1}(s' , a')\) is called the target network; it is kept fixed except when we update it from the actual network. Read the algorithm below for clarification. We further distinguish the actual network, with its parameters (weights and biases) \(\theta_i\), from the target network, with its parameters \(\theta_i^-\)

In this case the loss function at each iteration \(i\) is defined as

\(L_i(\theta_i)=\Bbb{E}_{(s,a,r)}[(y_i-Q(s,a;\theta_i))^2]\)

where \(y_i=\Bbb{E}_{s'}[r+\gamma \max\limits_{a'} \hat{Q} (s',a';\theta_i^-)|s,a]\) is the target function we want to approximate for iteration \(i\). Again, when adjusting the parameters \(\theta_i\) using gradient descent, \(\theta_i^-\) are kept fixed. The target network parameters \(\theta_i^-\) are only updated using the actual network parameters \(\theta_i\) every \(C\) iterations and are held fixed between individual updates. (Yes, we are not strictly following the iteration formula \((2)\), which updates the target network every iteration, instead we only update every \(C\) iterations, for stability!)

The gradient is simply:

\(\nabla_{\theta_i}L(\theta_i)=-\Bbb{E}_{(s,a,r,s')}\{2[ r+\gamma \max\limits_{a'} \hat{Q} (s',a';\theta_i^-)- Q(s,a;\theta_i) ] \nabla_{\theta_i} Q(s,a;\theta_i) \}\)

We define an experience (transition) as the tuple \(e_t=(s_t,a_t,r_t,s_{t+1})\) and store all experiences in a replay memory \(D_t=\{e_1, ..., e_t\}\). In the above expectation, \((s,a,r,s')\sim U(D)\), i.e., \((s,a,r,s')\) is sampled from a uniform distribution over \(D\)
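In practice we never implement this gradient by hand: we build the loss from a detached target and let autograd differentiate it. A minimal sketch, assuming a throwaway linear "Q network" and a random batch in place of the real network and replay memory:

import torch
import torch.nn.functional as F

q_net = torch.nn.Linear(4, 2)       # placeholder actual network  Q(s, a; theta)
target_net = torch.nn.Linear(4, 2)  # placeholder target network  Q_hat(s', a'; theta^-)

s  = torch.randn(8, 4)              # batch of states sampled from D
a  = torch.randint(0, 2, (8, 1))    # batch of actions
r  = torch.randn(8)                 # batch of rewards
s2 = torch.randn(8, 4)              # batch of next states

q = q_net(s).gather(1, a).squeeze(1)               # Q(s, a; theta)
y = r + 0.99*target_net(s2).max(1)[0].detach()     # target y; detach() keeps theta^- fixed
loss = F.mse_loss(q, y)                            # mean of (y - Q)^2 over the batch
loss.backward()                                    # autograd computes the gradient above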

The Algorithm

In the following, the image preprocessing function \(\phi\) that appears in the original algorithm is omitted.

  • Initialize replay memory \(D\) to capacity \(N\)

  • Initialize actual network \(Q\) with random weights \(\theta\)

  • Initialize target network \(\hat{Q}\) with weights \(\theta^-=\theta\)

  • For episode \(1\) to \(M\):

    • Initialize sequence \(s_1=(x_1)\)

    • For \(t=1\) to \(T\):

      • With probability \(\epsilon\) select a random action \(a_t\); with probability \(1-\epsilon\) select \(a_t=\arg\max\limits_a Q(s_t,a;\theta)\)

      • Execute \(a_t\) in the game, then get reward \(r_t\) and game image \(x_{t+1}\)

      • Set \(s_{t+1}=(s_t,a_t,x_{t+1})\)

      • Store transition \((s_t,a_t,r_t,s_{t+1})\) in \(D\)

      • Sample a random minibatch of transitions \((s_j,a_j,r_j,s_{j+1})\) from \(D\)

      • If the episode terminates at step \(j+1\):

        • Set \(y_j=r_j\)

      • else:

        • Set \(y_j=r_j+\gamma \max\limits_{a'}\hat{Q}(s_{j+1},a';\theta^-)\)

      • Perform a gradient descent step on \([y_j-Q(s_j,a_j;\theta)]^2\) w.r.t. \(\theta\)

      • Every \(C\) steps update \(\hat{Q}=Q\)

The PyTorch code

1. imports

import math
import random
import numpy as np
from collections import namedtuple
from PIL import ImageGrab
import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T
import torchvision.transforms.functional as TTF
import pyautogui as pag

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

2. using namedtuple

Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))

a = Transition(1,2,3,4)
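Continuing with the instance above: the fields of a namedtuple can be read by name as well as by position, which is what lets the batching code later write batch.state, batch.action, and so on.

print(a)                   # Transition(state=1, action=2, next_state=3, reward=4)
print(a.state, a.reward)   # access by name: 1 4
print(a[0], a[3])          # access by position, since it is still a tuple: 1 4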

3. the ReplayMemory class

class ReplayMemory:
# implicitly inherits from the 'object' class in Python 3
    def __init__(self, mem_size):
        self.mem_size = mem_size
        self.memory = [None for i in range(mem_size)]   # fixed-size circular buffer
        self.position = 0

    def push(self, *args):
        # store a Transition and advance the write position, wrapping around when full
        self.memory[self.position] = Transition(*args)
        self.position = (self.position+1)%self.mem_size

    def sample(self, batch_size):
        # uniform random sample; only call this once every slot has been filled
        return random.sample(self.memory, batch_size)

mem = ReplayMemory(5)
print(len(mem.memory))

mem.push(1,2,3,4)
mem.push(5,6,7,8)
print(mem.memory)
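One caveat: unfilled slots still hold None, so sample is only safe once every slot has been pushed at least once (which is exactly why the main loop below pre-fills the memory before training). A quick check with a small, fully filled buffer:

mem2 = ReplayMemory(3)
for k in range(3):
    mem2.push(k, k, k, k)       # fill every slot first
print(mem2.sample(2))           # a list of 2 randomly chosen Transitions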

4.1 the neural network class (finding the input size of the fully connected layer)

class DQN(nn.Module):
# inherits from the 'nn.Module' class
    def __init__(self, chw, num_actions):
        assert chw == (3,50,100),"Resize your image! channel:3, height:50, width: 100"

        super(DQN, self).__init__()
        self.conv1 = nn.Conv2d(3, 16, kernel_size = 5, stride = 2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16,32, kernel_size = 5, stride = 2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32,32, kernel_size = 5, stride = 2)
        self.bn3 = nn.BatchNorm2d(32)

        self.fc1 = nn.Linear(864,num_actions)

    def forward(self,x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.fc1(x.view(x.size(0),-1))

print('dqn:')
dqn = DQN((3,50,100),6)
x = torch.randn(3, 3, 50, 100)   # a batch of 3 random 3x50x100 "screens"
x = dqn(x)                       # calling the module invokes forward()
print(x)                         # a (3, 6) tensor: one row of Q-values per input image

print('dqn2:')
dqn2 = DQN((2,50,100),6)         # raises AssertionError: the assert only accepts (3,50,100) inputs
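Where does the 864 in nn.Linear(864, num_actions) come from? It is simply the flattened size of the last convolution's output for a 3x50x100 input. A small helper (not part of the original code) reproduces the arithmetic:

def conv2d_size_out(size, kernel_size=5, stride=2):
    # output size of nn.Conv2d with no padding: floor((size - kernel_size)/stride) + 1
    return (size - kernel_size)//stride + 1

h = conv2d_size_out(conv2d_size_out(conv2d_size_out(50)))    # 50 -> 23 -> 10 -> 3
w = conv2d_size_out(conv2d_size_out(conv2d_size_out(100)))   # 100 -> 48 -> 22 -> 9
print(32*h*w)   # 32 output channels * 3 * 9 = 864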

4.3 the neural network class: Image processor added

class DQN(nn.Module):
# inherits from the 'nn.Module' class
    def __init__(self, chw, num_actions):
        super(DQN, self).__init__()
        self.chw = (3,50,100)
        assert chw == self.chw,"Resize your image! channel:3, height:50, width:100"
        self.conv1 = nn.Conv2d(3, 16, kernel_size = 5, stride = 2)
        self.bn1 = nn.BatchNorm2d(16)
        self.conv2 = nn.Conv2d(16,32, kernel_size = 5, stride = 2)
        self.bn2 = nn.BatchNorm2d(32)
        self.conv3 = nn.Conv2d(32,32, kernel_size = 5, stride = 2)
        self.bn3 = nn.BatchNorm2d(32)

        self.fc1 = nn.Linear(864,num_actions)

        self.ToTensor = T.Compose([T.ToTensor()])

    def forward(self,x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = F.relu(self.bn2(self.conv2(x)))
        x = F.relu(self.bn3(self.conv3(x)))
        return self.fc1(x.view(x.size(0),-1))

    def get_img(self,scr_region,display=True):
    # input: screen region, output: PyTorch tensor
        img = ImageGrab.grab(bbox=scr_region)
        img = TTF.resize(img,self.chw[1:])
        if display:
            # PIL gives RGB but OpenCV expects BGR, so convert before showing the preview
            img2 = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2BGR)
            cv2.imshow('window',img2)
            cv2.waitKey(5)
        img = self.ToTensor(img).unsqueeze(0).to(device)
        return img

dqn = DQN((3,50,100), 4)
game_scr = (0,300,900,1000)
while True:   # preview loop; stop it manually (e.g. Ctrl-C) once the capture looks right
    img = dqn.get_img(game_scr)
    print(img.shape)

5. the Agent class (the main algorithm as explained earlier)

Explanation of two important tools used below: ".detach()" and "with torch.no_grad()".
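.detach() returns a tensor with the same values but cut off from the computation graph, so no gradient flows back through it; optimize_model below detaches the target network's output so that only the actual network receives gradients. with torch.no_grad(): turns off graph building for everything inside the block, which saves memory and time when we only need a forward pass, as in select_action below. A tiny illustration of both:

x = torch.ones(2, requires_grad=True)

y = (x * 3).detach()     # same values as x * 3, but no gradient history
print(y.requires_grad)   # False

with torch.no_grad():    # autograd records nothing inside this block
    z = x * 3
print(z.requires_grad)   # False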

class Agent:
    def __init__(self,num_actions,mem_size ):
        self.BATCH_SIZE = 10
        self.GAMMA = 0.999
        self.EPS_START = 0.9
        self.EPS_END = 0.05
        self.EPS_DECAY = 200
        self.TARGET_UPDATE = 1
        self.num_actions = num_actions

        self.actual_net = DQN((3,50,100),num_actions).to(device)
        self.target_net = DQN((3,50,100),num_actions).to(device)
        self.target_net.load_state_dict(self.actual_net.state_dict())
        self.target_net.eval()

        self.optimizer = optim.RMSprop(self.actual_net.parameters())
        self.memory = ReplayMemory(mem_size+self.BATCH_SIZE)

        self.steps_done = 0

    def select_action(self,state):
        # epsilon-greedy: epsilon decays exponentially from EPS_START to EPS_END
        eps_threshold = self.EPS_END + (self.EPS_START - self.EPS_END\
            ) * math.exp(-1.*self.steps_done/self.EPS_DECAY)
        self.steps_done += 1
        if random.random() > eps_threshold:
            with torch.no_grad():
                # max(1) returns (values, indices); [1] picks the index of the best action
                return self.actual_net(state).max(1)[1].view(1,1)
        else:
            return torch.tensor([[random.randrange(self.num_actions)]],
                                device = device, dtype=torch.long)

    def optimize_model(self):
        transitions = self.memory.sample(self.BATCH_SIZE)
        # converts batch-array of Transitions to Transition of batch-arrays:
        batch = Transition(*zip(*transitions))

        state_batch = torch.cat(batch.state)
        action_batch = torch.cat(batch.action)
        reward_batch = torch.cat(batch.reward)
        next_state_batch = torch.cat(batch.next_state)

        # Q(s_j, a_j; theta): gather picks, for each row, the Q-value of the action taken
        Q = self.actual_net(state_batch).gather(1,action_batch)
        # max_a' Q_hat(s_{j+1}, a'; theta^-); detach() blocks gradients through the target network
        max_a_Q_hat = self.target_net(next_state_batch).max(1)[0].detach()

        # y_j; our game never terminates, so there is no separate terminal-state case here
        y = (reward_batch + self.GAMMA * max_a_Q_hat).unsqueeze(1)

        loss = F.smooth_l1_loss(Q, y)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

6. mygame class (this is totally up to you)

class mygame:
    def __init__(self):
        self.score_scr = (70,900,130,960)   # screen region showing the win/lose indicator
        self.win_range = [10,12]            # mean pixel values that count as a win
        self.lose_range = [20,22]           # mean pixel values that count as a loss
        self.action_list = ['up','down','left','right']
        self.num_actions = len(self.action_list)

    def step(self, action):
        # press the chosen key, then read the score region of the screen to compute a reward
        pag.typewrite([self.action_list[action]])
        result_screen = np.array(ImageGrab.grab(bbox=self.score_scr))
        result = int(np.mean(result_screen))
        if self.win_range[0] < result < self.win_range[1]:
            reward = 1
        elif self.lose_range[0] < result < self.lose_range[1]:
            reward = -1
        else:
            reward = 0.1
        return float(reward)

7. the main loop

game_scr = (0,300,900,1000)
game = mygame()      # name the instance differently so it does not shadow the class
num_actions = game.num_actions
mem_size = 10

agent = Agent(num_actions,mem_size)

last_scr = agent.actual_net.get_img(game_scr)
current_scr = agent.actual_net.get_img(game_scr)
state = current_scr - last_scr

# fill the replay memory completely so that sampling a batch is always safe
for t in range(mem_size + agent.BATCH_SIZE):
    action = agent.select_action(state)
    reward = game.step(action)
    reward = torch.tensor([reward],device = device)
    last_scr = current_scr
    current_scr = agent.actual_net.get_img(game_scr)
    next_state = current_scr-last_scr
    agent.memory.push(state, action, next_state, reward)
    state = next_state


for i in range(10):
    last_scr = agent.actual_net.get_img(game_scr)
    current_scr = agent.actual_net.get_img(game_scr)
    state = current_scr - last_scr
    for t in range(10):
        action = agent.select_action(state)
        reward = game.step(action)
        reward = torch.tensor([reward],device = device)
        last_scr = current_scr
        current_scr = agent.actual_net.get_img(game_scr)
        next_state = current_scr-last_scr
        agent.memory.push(state, action, next_state, reward)
        state = next_state

        agent.optimize_model()

    if i % agent.TARGET_UPDATE == 0:
        agent.target_net.load_state_dict(agent.actual_net.state_dict())

print("finished")

Code Rewrite

import torch; import torch.nn as nn; import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple
import math; import random;
import numpy as np; import matplotlib.pyplot as plt

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
Transition=namedtuple('Transition',
                      ('state','action','nextState','reward'))

class ReplayMemory:
  def __init__(self,memSize):
    self.memSize = memSize
    self.memory = []; self.position = 0

  def push(self,*args):
    if len(self.memory) < self.memSize: self.memory.append(None)
    self.memory[self.position] = Transition(*args)
    self.position = (self.position+1)%self.memSize

  def sample(self,batchSize):
    return random.sample(self.memory,batchSize)

  def __len__(self):
    return len(self.memory)

class Net(nn.Module):
  def __init__(self,inSize,outSize):
    super(Net,self).__init__()
    self.linear1 = nn.Linear(inSize,inSize*2)
    self.linear2 = nn.Linear(inSize*2,outSize)

  def forward(self,x):
    x = x.view(x.size(0),-1)
    x = F.relu(self.linear1(x))
    return self.linear2(x)

class Trainer:
  def __init__(self,args):
    self.batchSize = args['batchSize']

    self.env = Env()
    inSize,outSize = self.env.stateSize, self.env.nActions

    self.Qnet = Net(inSize,outSize); self.QhatNet = Net(inSize,outSize)
    self.QhatNet.load_state_dict(self.Qnet.state_dict())
    self.QhatNet.eval()
    self.optimizer = optim.RMSprop(self.Qnet.parameters())
    self.memory = ReplayMemory(10000)
    self.stepsDone = 0

  def selectAction(self,state):
    self.stepsDone+=1
    epsA,epsB,epsDecay = 0.9,0.05,200
    if random.random() > epsB+(epsA-epsB)*\
      math.exp(-1.*self.stepsDone/epsDecay):
      with torch.no_grad():
        # max returns (values, indices), so max(dim=1)[1] is the index of the best action
        return self.Qnet(state).max(dim=1)[1].item()
    else: return random.randrange(self.env.nActions)

  def optimize(self):
    gamma = 0.999
    if len(self.memory) < self.batchSize: return
    transitions = self.memory.sample(self.batchSize)
    batch = Transition(*zip(*transitions))
    states = torch.cat(batch.state)
    actions = torch.cat(batch.action)
    nextStates = torch.cat(batch.nextState)
    rewards = torch.cat(batch.reward)
    Q = self.Qnet(states).gather(dim=1,index=actions)
    # max returns (values, indices), so max()[0] are values
    y = rewards+gamma*self.QhatNet(nextStates).max(dim=1)[0].detach()
    loss = F.smooth_l1_loss(Q,y.unsqueeze(1))
    self.optimizer.zero_grad()
    loss.backward()
    for param in self.Qnet.parameters(): param.grad.data.clamp_(-1,1)
    self.optimizer.step()

  def train(self,nGames,nStepsPerGame,nUpdate):
    for i in range(nGames):
      self.env.reset()
      state = self.env.getState()
      for k in range(nStepsPerGame):
        state,action,nextState,reward = self.env.step(self.selectAction(state))
        self.memory.push(state,action,nextState,reward)
        state = nextState
        self.optimize()
      if i % nUpdate==0:
        self.QhatNet.load_state_dict(self.Qnet.state_dict())

class Env:
  def __init__(self):
    self.state = None        # define your own state representation here
    self.actions = None      # define your own list of actions here
    self.stateSize = self.state.shape[0]
    self.nActions = len(self.actions)

  def reset(self):
    pass                     # restart the game / environment

  def getState(self):
    pass                     # return the current state, e.g. a tensor of shape (1, stateSize)

  def step(self, action):
    pass                     # apply the action and return (state, action, nextState, reward)
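For concreteness, here is one entirely hypothetical way such an environment could look: a toy random walk on positions 0..4 whose state is just the position as a one-element tensor. It only illustrates the interface that Trainer expects, namely a (1, stateSize) state tensor and a (state, action, nextState, reward) tuple from step, with the action and reward wrapped as tensors so that optimize() can batch them with torch.cat.

class ToyWalkEnv:
# hypothetical stand-in for Env: a random walk on positions 0..4, actions: 0 = left, 1 = right
  def __init__(self):
    self.nActions = 2
    self.stateSize = 1
    self.reset()

  def reset(self):
    self.pos = 2                                   # start in the middle

  def getState(self):
    # shape (1, stateSize) so Net can consume it and torch.cat can batch it
    return torch.tensor([[float(self.pos)]])

  def step(self, action):
    state = self.getState()
    self.pos = max(0, min(4, self.pos + (1 if action == 1 else -1)))
    reward = 1.0 if self.pos == 4 else 0.0         # +1 for reaching the right end
    if self.pos == 4: self.reset()                 # restart the walk after a "win"
    nextState = self.getState()
    # action and reward are wrapped as tensors so optimize() can torch.cat and gather them
    return (state, torch.tensor([[action]], dtype=torch.long),
            nextState, torch.tensor([reward]))

If Env is implemented along these lines, Trainer({'batchSize': 32}).train(nGames=50, nStepsPerGame=30, nUpdate=5) runs end to end on the CPU.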