PyTorch Tutorial DQN
=========================

The Theory
----------

The machine (agent) performs an action :math:`a_t` at time :math:`t` on the environment (game), and the environment tells the machine two consequences:

1. The new state of the game, represented by the resulting game screen :math:`x_t`
2. The reward :math:`r_t` the machine gets

It is impossible to fully understand the current situation from the current screen :math:`x_t` alone. We therefore consider sequences of actions and observations, :math:`s_t = x_1, a_1, x_2, \ldots, a_{t-1}, x_t`, and use this instead of :math:`x_t` as the state of the game at time :math:`t`.

The machine adjusts itself in order to maximize future rewards. We define the future discounted return as

:math:`R_{t}=\sum\limits_{x=t}^{T} \gamma^{x-t} r_x = r_t + \gamma R_{t'} \;\;\;\; (1)`

where :math:`\gamma \in (0,1)` is the discount: the larger the discount, the less the future matters. We assume time is discrete, so :math:`t'=t+1`, and :math:`T` is the time when the game terminates.

A policy :math:`\pi` is a set of rules to decide which action to take when a sequence is observed; mathematically, it is a map :math:`a=\pi(s)` or a probability distribution :math:`a\sim\pi(s)`.

We define the optimal action-value function :math:`Q^*(s,a)` as the maximum expected return achievable by following any strategy, given that some sequence :math:`s` is seen and some action :math:`a` is taken:

:math:`Q^*(s, a) = \max\limits_{\pi} \Bbb{E} [R_t|s_t = s, a_t = a, \pi]`

:math:`Q^*` obeys the Bellman equation:

:math:`Q^*(s, a) = \Bbb{E}_{s'}[r + \gamma \max\limits_{a'} Q^*(s' , a')| s, a]`

The basic idea behind many reinforcement learning algorithms is to estimate :math:`Q^*` by using the Bellman equation as an iterative update:

:math:`Q_i(s, a) = \Bbb{E}_{s'}[r + \gamma \max\limits_{a'} Q_{i-1}(s' , a')| s, a] \;\;\;\; (2)`

It is proved in *Reinforcement Learning: An Introduction* that :math:`Q_i \to Q^*` as :math:`i \to \infty`.

When the state and action spaces are large, it is better to use a function with parameters :math:`\theta` to approximate :math:`Q^*`: :math:`Q(s,a;\theta)\approx Q^*(s,a)`. Since neural networks are essentially function approximators, we use them!

In this case, the target function we want to approximate with our neural network is itself based on another network, as indicated by equation :math:`(2)`: our network represents :math:`Q_i(s, a)`, while the target function is :math:`\Bbb{E}_{s'}[r + \gamma \max\limits_{a'}\hat{Q}_{i-1}(s' , a')| s, a]`, represented by the network :math:`\hat{Q}_{i-1}(s' , a')`. We call :math:`Q_i(s, a)` the actual network (officially the :math:`Q` network), since this is the one we actually perform gradient descent on, whilst :math:`\hat{Q}_{i-1}(s' , a')` is called the target network; it is kept fixed except when we update it from the actual network. Read the algorithm below for clarification. We further distinguish the actual network, with its parameters (weights and biases) :math:`\theta_i`, from the target network, with its parameters :math:`\theta_i^-`.

In this case the loss function at each iteration :math:`i` is defined as

:math:`L_i(\theta_i)=\Bbb{E}_{(s,a,r)}[(y_i-Q(s,a;\theta_i))^2]`

where :math:`y_i=\Bbb{E}_{s'}[r+\gamma \max\limits_{a'} \hat{Q} (s',a';\theta_i^-)|s,a]` is the target function we want to approximate at iteration :math:`i`. Again, when adjusting the parameters :math:`\theta_i` using gradient descent, :math:`\theta_i^-` are kept fixed.
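To make the loss concrete, here is a minimal PyTorch sketch (not part of the tutorial code) of evaluating :math:`L_i(\theta_i)` on a single transition, using two hypothetical toy linear networks ``q_net`` and ``target_net``; the target value is computed under ``torch.no_grad()`` so that :math:`\theta_i^-` stays fixed.

.. code:: python

    import torch
    import torch.nn as nn

    # hypothetical toy networks: 4-dimensional state, 2 actions
    q_net = nn.Linear(4, 2)        # Q(s, a; theta), trained by gradient descent
    target_net = nn.Linear(4, 2)   # Q_hat(s, a; theta^-), held fixed
    target_net.load_state_dict(q_net.state_dict())

    gamma = 0.999
    s = torch.randn(1, 4)          # current state s
    a = torch.tensor([[1]])        # action a that was taken
    r = torch.tensor([0.5])        # reward r
    s_next = torch.randn(1, 4)     # next state s'

    q_sa = q_net(s).gather(1, a)                       # Q(s, a; theta)
    with torch.no_grad():                              # theta^- is not updated here
        y = r + gamma * target_net(s_next).max(1)[0]   # r + gamma * max_a' Q_hat(s', a'; theta^-)
    loss = (y.unsqueeze(1) - q_sa).pow(2).mean()       # (y - Q(s, a; theta))^2
    loss.backward()                                    # gradients flow only into q_net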
The target network parameters :math:`\theta_i^-` are only updated from the actual network parameters :math:`\theta_i` every :math:`C` iterations and are held fixed between individual updates. (Yes, we are not strictly following the iteration formula :math:`(2)`, which updates the target network every iteration; instead we only update it every :math:`C` iterations, for stability!)

The gradient is simply:

:math:`\nabla_{\theta_i}L_i(\theta_i)=\Bbb{E}_{(s,a,r,s')}\{-2[ r+\gamma \max\limits_{a'} \hat{Q} (s',a';\theta_i^-)- Q(s,a;\theta_i) ] \nabla_{\theta_i} Q(s,a;\theta_i) \}`

We define a transition by the tuple :math:`e_t=(s_t,a_t,r_t,s_{t+1})`, and we store all transitions in a replay memory :math:`D_t=\{e_1, \ldots, e_t\}`. In the expectation above, :math:`(s,a,r,s')\sim U(D)`, i.e., :math:`(s,a,r,s')` is sampled from a uniform distribution over :math:`D`.

The Algorithm
-------------

In the following, the image preprocessing function :math:`\phi` of the original algorithm is omitted.

- Initialize replay memory :math:`D` to capacity :math:`N`
- Initialize the actual network :math:`Q` with random weights :math:`\theta`
- Initialize the target network :math:`\hat{Q}` with weights :math:`\theta^-=\theta`
- For episodes :math:`1` to :math:`M`:

  - Initialize sequence :math:`s_1=(x_1)`
  - For :math:`t=1` to :math:`T`:

    - With probability :math:`\epsilon` perform a random action :math:`a_t`, and with probability :math:`1-\epsilon` perform :math:`a_t=\arg\max\limits_a Q(s_t,a;\theta)`
    - Get reward :math:`r_t` and game image :math:`x_{t+1}`
    - Set :math:`s_{t+1}=(s_t,a_t,x_{t+1})`
    - Store transition :math:`(s_t,a_t,r_t,s_{t+1})` in :math:`D`
    - Sample a random minibatch of transitions :math:`(s_j,a_j,r_j,s_{j+1})` from :math:`D`
    - If the episode terminates at step :math:`j+1`:

      - Set :math:`y_j=r_j`

    - else:

      - Set :math:`y_j=r_j+\gamma \max\limits_{a'}\hat{Q}(s_{j+1},a';\theta^-)`

    - Perform a gradient descent step on :math:`[y_j-Q(s_j,a_j;\theta)]^2` w.r.t. :math:`\theta`
    - Every :math:`C` steps update :math:`\hat{Q}=Q`

The PyTorch code
----------------

1. imports
~~~~~~~~~~

.. code:: python

    import math  # used by Agent.select_action
    import random
    import numpy as np
    from collections import namedtuple
    from PIL import ImageGrab
    import cv2

    import torch
    import torch.nn as nn
    import torch.optim as optim
    import torch.nn.functional as F
    import torchvision.transforms as T
    import torchvision.transforms.functional as TTF

    import pyautogui as pag

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

2. using namedtuple
~~~~~~~~~~~~~~~~~~~

.. code:: python

    Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))
    a = Transition(1, 2, 3, 4)

3. the ReplayMemory class
~~~~~~~~~~~~~~~~~~~~~~~~~

.. code:: python

    class ReplayMemory:  # implicitly inherits from the 'object' class in Python 3

        def __init__(self, mem_size):
            self.mem_size = mem_size
            self.memory = [None for i in range(mem_size)]
            self.position = 0

        def push(self, *args):
            # store a Transition at the current position, overwriting the oldest entry
            self.memory[self.position] = Transition(*args)
            self.position = (self.position + 1) % self.mem_size

        def sample(self, batch_size):
            return random.sample(self.memory, batch_size)

    mem = ReplayMemory(5)
    print(len(mem.memory))
    mem.push(1, 2, 3, 4)
    mem.push(5, 6, 7, 8)
    print(mem.memory)
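Later, in ``optimize_model`` and again in the code rewrite, a sampled list of ``Transition`` tuples is converted into a single ``Transition`` of batches via ``Transition(*zip(*transitions))``. A small sketch of that trick on toy values (it reuses the ``Transition`` namedtuple defined above; the numbers are only illustrative):

.. code:: python

    # a sampled minibatch: a list of Transition tuples
    transitions = [Transition(1, 2, 3, 4), Transition(5, 6, 7, 8)]

    # zip(*transitions) groups the fields: (1, 5), (2, 6), (3, 7), (4, 8)
    batch = Transition(*zip(*transitions))

    print(batch.state)   # (1, 5)
    print(batch.reward)  # (4, 8)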
4.1 the neural network class (finding the parameter for the fully connected layer)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code:: python

    class DQN(nn.Module):  # inherits from the 'nn.Module' class

        def __init__(self, chw, num_actions):
            assert chw == (3, 50, 100), "Resize your image! channel:3, height:50, width:100"
            super(DQN, self).__init__()
            self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
            self.bn1 = nn.BatchNorm2d(16)
            self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
            self.bn2 = nn.BatchNorm2d(32)
            self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
            self.bn3 = nn.BatchNorm2d(32)
            # a (3, 50, 100) input gives a (32, 3, 9) feature map after the three
            # convolutions, i.e. 32*3*9 = 864 flattened features
            self.fc1 = nn.Linear(864, num_actions)

        def forward(self, x):
            x = F.relu(self.bn1(self.conv1(x)))
            x = F.relu(self.bn2(self.conv2(x)))
            x = F.relu(self.bn3(self.conv3(x)))
            return self.fc1(x.view(x.size(0), -1))

    print('dqn:')
    dqn = DQN((3, 50, 100), 6)
    x = torch.randn(3, 3, 50, 100)
    x = dqn.forward(x)
    print(x)

    print('dqn2:')
    dqn2 = DQN((2, 50, 100), 6)  # wrong shape: this raises the AssertionError above
    x = torch.randn(3, 3, 50, 100)
    x = dqn.forward(x)
    print(x)

4.3 the neural network class: image processor added
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code:: python

    class DQN(nn.Module):  # inherits from the 'nn.Module' class

        def __init__(self, chw, num_actions):
            super(DQN, self).__init__()
            self.chw = (3, 50, 100)
            assert chw == self.chw, "Resize your image! channel:3, height:50, width:100"
            self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)
            self.bn1 = nn.BatchNorm2d(16)
            self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)
            self.bn2 = nn.BatchNorm2d(32)
            self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)
            self.bn3 = nn.BatchNorm2d(32)
            self.fc1 = nn.Linear(864, num_actions)
            self.ToTensor = T.Compose([T.ToTensor()])

        def forward(self, x):
            x = F.relu(self.bn1(self.conv1(x)))
            x = F.relu(self.bn2(self.conv2(x)))
            x = F.relu(self.bn3(self.conv3(x)))
            return self.fc1(x.view(x.size(0), -1))

        def get_img(self, scr_region, display=True):
            # input: screen region, output: PyTorch tensor of shape (1, 3, 50, 100)
            img = ImageGrab.grab(bbox=scr_region)
            img = TTF.resize(img, self.chw[1:])
            if display:
                img2 = np.array(img)  # note: PIL gives RGB while OpenCV displays BGR
                cv2.imshow('window', img2)
                cv2.waitKey(5)
            img = self.ToTensor(img).unsqueeze(0).to(device)
            return img

    dqn = DQN((3, 50, 100), 4)
    game_scr = (0, 300, 900, 1000)
    while True:  # screen-grabbing demo; interrupt with Ctrl+C to stop
        img = dqn.get_img(game_scr)
        print(img.shape)

5. the Agent class (the main algorithm as explained earlier)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Explanation of some important methods used below: ``.detach()`` and ``with torch.no_grad()``.
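A minimal sketch (toy tensors only, not part of the agent) of the difference between the two: ``with torch.no_grad()`` disables gradient tracking for everything computed inside the block, while ``.detach()`` returns a tensor that is cut off from the computation graph, so no gradients flow back through it.

.. code:: python

    import torch

    x = torch.ones(2, requires_grad=True)

    with torch.no_grad():
        y = x * 2          # nothing inside the block is tracked

    z = (x * 2).detach()   # same values, but detached from the graph
    w = x * 2              # still tracked: gradients can flow back to x

    print(y.requires_grad, z.requires_grad, w.requires_grad)  # False False True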
.. code:: python

    class Agent:

        def __init__(self, num_actions, mem_size):
            self.BATCH_SIZE = 10
            self.GAMMA = 0.999
            self.EPS_START = 0.9
            self.EPS_END = 0.05
            self.EPS_DECAY = 200
            self.TARGET_UPDATE = 1
            self.num_actions = num_actions
            self.actual_net = DQN((3, 50, 100), num_actions).to(device)
            self.target_net = DQN((3, 50, 100), num_actions).to(device)
            self.target_net.load_state_dict(self.actual_net.state_dict())
            self.target_net.eval()
            self.optimizer = optim.RMSprop(self.actual_net.parameters())
            self.memory = ReplayMemory(mem_size + self.BATCH_SIZE)
            self.steps_done = 0

        def select_action(self, state):
            # epsilon decays exponentially from EPS_START towards EPS_END
            eps_threshold = self.EPS_END + (self.EPS_START - self.EPS_END) \
                * math.exp(-1. * self.steps_done / self.EPS_DECAY)
            self.steps_done += 1
            if random.random() > eps_threshold:
                with torch.no_grad():
                    return self.actual_net(state).max(1)[1].view(1, 1)
            else:
                return torch.tensor([[random.randrange(self.num_actions)]],
                                    device=device, dtype=torch.long)

        def optimize_model(self):
            transitions = self.memory.sample(self.BATCH_SIZE)
            # converts a batch-array of Transitions to a Transition of batch-arrays:
            batch = Transition(*zip(*transitions))
            print(batch)
            state_batch = torch.cat(batch.state)
            action_batch = torch.cat(batch.action)
            reward_batch = torch.cat(batch.reward)
            next_state_batch = torch.cat(batch.next_state)
            Q = self.actual_net(state_batch).gather(1, action_batch)
            max_a_Q_hat = self.target_net(next_state_batch).max(1)[0].detach()
            y = (reward_batch + self.GAMMA * max_a_Q_hat).unsqueeze(1)
            loss = F.smooth_l1_loss(Q, y)
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

6. mygame class (this is totally up to you)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. code:: python

    class mygame:

        def __init__(self):
            self.score_scr = (70, 900, 130, 960)
            self.win_range = [10, 12]
            self.lose_range = [20, 22]
            self.action_list = ['up', 'down', 'left', 'right']
            self.num_actions = len(self.action_list)

        def step(self, action):
            # action may arrive as a 1-element tensor from Agent.select_action
            pag.typewrite([self.action_list[int(action)]])
            result_screen = np.array(ImageGrab.grab(bbox=self.score_scr))
            result = int(np.mean(result_screen))
            if result > self.win_range[0] and result < self.win_range[1]:
                reward = 1
            elif result > self.lose_range[0] and result < self.lose_range[1]:
                reward = -1
            else:
                reward = 0.1
            return reward * 1.

7. the main loop
~~~~~~~~~~~~~~~~

.. code:: python

    game_scr = (0, 300, 900, 1000)
    mygame = mygame()
    num_actions = mygame.num_actions
    mem_size = 10
    agent = Agent(num_actions, mem_size)

    last_scr = agent.actual_net.get_img(game_scr)
    current_scr = agent.actual_net.get_img(game_scr)
    state = current_scr - last_scr

    # make sure the replay memory is initialized!
    for t in range(mem_size + agent.BATCH_SIZE):
        action = agent.select_action(state)
        reward = mygame.step(action)
        reward = torch.tensor([reward], device=device)
        last_scr = current_scr
        current_scr = agent.actual_net.get_img(game_scr)
        next_state = current_scr - last_scr
        agent.memory.push(state, action, next_state, reward)
        state = next_state

    for i in range(10):
        last_scr = agent.actual_net.get_img(game_scr)
        current_scr = agent.actual_net.get_img(game_scr)
        state = current_scr - last_scr
        for t in range(10):
            action = agent.select_action(state)
            reward = mygame.step(action)
            reward = torch.tensor([reward], device=device)
            last_scr = current_scr
            current_scr = agent.actual_net.get_img(game_scr)
            next_state = current_scr - last_scr
            agent.memory.push(state, action, next_state, reward)
            state = next_state
            agent.optimize_model()
        if i % agent.TARGET_UPDATE == 0:
            agent.target_net.load_state_dict(agent.actual_net.state_dict())

    print("finished")
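The algorithm above sets :math:`y_j=r_j` when the episode terminates at step :math:`j+1`, whereas the ``Agent`` here always bootstraps from the target network. A minimal, self-contained sketch of how a ``done`` mask (a hypothetical extra field one could store with each transition) would recover the terminal case:

.. code:: python

    import torch

    gamma = 0.999
    # toy batch of 3 transitions; 'done' marks transitions that ended the episode
    reward_batch = torch.tensor([1.0, -1.0, 0.1])
    max_a_Q_hat = torch.tensor([0.5, 0.7, 0.2])   # max_a' Q_hat(s_{j+1}, a'; theta^-)
    done_batch = torch.tensor([0.0, 1.0, 0.0])    # 1.0 where the episode terminated

    # y_j = r_j for terminal transitions, r_j + gamma * max_a' Q_hat otherwise
    y = reward_batch + gamma * max_a_Q_hat * (1. - done_batch)
    print(y)  # tensor([1.4995, -1.0000, 0.2998])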
Code Rewrite
------------

.. code:: python

    import math
    import random
    from collections import namedtuple

    import numpy as np
    import matplotlib.pyplot as plt

    import torch
    import torch.nn as nn
    import torch.optim as optim
    import torch.nn.functional as F

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    Transition = namedtuple('Transition', ('state', 'action', 'nextState', 'reward'))


    class ReplayMemory:

        def __init__(self, memSize):
            self.memSize = memSize
            self.memory = []
            self.position = 0

        def push(self, *args):
            if len(self.memory) < self.memSize:
                self.memory.append(None)
            self.memory[self.position] = Transition(*args)
            self.position = (self.position + 1) % self.memSize

        def sample(self, batchSize):
            return random.sample(self.memory, batchSize)

        def __len__(self):
            return len(self.memory)


    class Net(nn.Module):

        def __init__(self, inSize, outSize):
            super(Net, self).__init__()
            self.linear1 = nn.Linear(inSize, inSize * 2)
            self.linear2 = nn.Linear(inSize * 2, outSize)

        def forward(self, x):
            x = x.view(x.size(0), -1)
            x = F.relu(self.linear1(x))
            return self.linear2(x)


    class Trainer:

        def __init__(self, args):
            self.batchSize = args['batchSize']
            self.env = Env()
            inSize, outSize = self.env.stateSize, self.env.nActions
            self.Qnet = Net(inSize, outSize)
            self.QhatNet = Net(inSize, outSize)
            self.QhatNet.load_state_dict(self.Qnet.state_dict())
            self.QhatNet.eval()
            self.optimizer = optim.RMSprop(self.Qnet.parameters())
            self.memory = ReplayMemory(10000)
            self.stepsDone = 0

        def selectAction(self, state):
            self.stepsDone += 1
            epsA, epsB, epsDecay = 0.9, 0.05, 200
            if random.random() > epsB + (epsA - epsB) * \
                    math.exp(-1. * self.stepsDone / epsDecay):
                with torch.no_grad():
                    # max returns (values, indices), so max()[1] are indices
                    return self.Qnet(state).max(dim=1)[1].view([1, 1]).item()
            else:
                return torch.tensor([[random.randrange(self.env.nActions)]],
                                    dtype=torch.long).item()

        def optimize(self):
            gamma = 0.999
            if len(self.memory) < self.batchSize:
                return
            transitions = self.memory.sample(self.batchSize)
            batch = Transition(*zip(*transitions))
            states = torch.cat(batch.state)
            actions = torch.cat(batch.action)
            nextStates = torch.cat(batch.nextState)
            rewards = torch.cat(batch.reward)
            Q = self.Qnet(states).gather(dim=1, index=actions)
            # max returns (values, indices), so max()[0] are values
            y = rewards + gamma * self.QhatNet(nextStates).max(dim=1)[0].detach()
            loss = F.smooth_l1_loss(Q, y.unsqueeze(1))
            self.optimizer.zero_grad()
            loss.backward()
            for param in self.Qnet.parameters():
                param.grad.data.clamp_(-1, 1)
            self.optimizer.step()

        def train(self, nGames, nStepsPerGame, nUpdate):
            for i in range(nGames):
                self.env.reset()
                state = self.env.getState()
                for k in range(nStepsPerGame):
                    state, action, nextState, reward = self.env.step(self.selectAction(state))
                    self.memory.push(state, action, nextState, reward)
                    state = nextState
                    self.optimize()
                if i % nUpdate == 0:
                    self.QhatNet.load_state_dict(self.Qnet.state_dict())


    class Env:
        # the game-specific environment is up to you: fill in the blanks

        def __init__(self):
            self.state = ...                        # the current game state (a tensor)
            self.actions = ...                      # the list of available actions
            self.stateSize = self.state.shape[0]
            self.nActions = len(self.actions)

        def reset(self):
            ...                                     # restart the game

        def getState(self):
            ...                                     # return the current state

        def step(self, action):
            ...                                     # perform the action
            # expected to return (state, action, nextState, reward)
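As a usage sketch, here is one possible way to fill in the ``Env`` skeleton: a purely illustrative toy game (made-up dynamics and reward, hypothetical values) in which the agent walks along a line toward a target, followed by a short training call. It assumes the code block above has already been run, since it reuses ``Trainer`` and redefines ``Env``.

.. code:: python

    # a purely illustrative Env: the agent sits at `pos` and should walk toward
    # `target` on a 1-D line; action 0 moves left, action 1 moves right
    class Env:

        def __init__(self):
            self.actions = [0, 1]                   # left, right
            self.state = torch.tensor([0., 5.])     # [pos, target]
            self.stateSize = self.state.shape[0]    # 2
            self.nActions = len(self.actions)       # 2

        def reset(self):
            self.state = torch.tensor([0., 5.])

        def getState(self):
            return self.state.unsqueeze(0)          # shape (1, stateSize)

        def step(self, action):
            # `action` arrives as a plain int, because selectAction calls .item()
            before = self.getState()
            pos, target = float(self.state[0]), float(self.state[1])
            newPos = pos - 1. if action == 0 else pos + 1.
            self.state = torch.tensor([newPos, target])
            reward = 1. if abs(newPos - target) < abs(pos - target) else -1.
            # tensors shaped the way Trainer.optimize expects to torch.cat them
            return (before,
                    torch.tensor([[action]], dtype=torch.long),
                    self.getState(),
                    torch.tensor([reward]))

    trainer = Trainer({'batchSize': 32})
    trainer.train(nGames=10, nStepsPerGame=50, nUpdate=2)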