Awesome Stock AI

Date: 22 Oct 2019

from google.colab import drive
drive.mount('/content/drive')
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
import numpy as np; import random; import matplotlib.pyplot as plt
import pandas_datareader.data as reader
SOS=0; EOS=1 # indices 0 and 1 are reserved for the start/end-of-sequence tokens

class DataToIndex:
  def __init__(self,rawData,numEmbed,method='ratio'):
    # 'raw': prices as-is, 'diff': day-over-day differences, 'ratio': day-over-day ratios
    if method=='raw': self.data=np.asarray(rawData)
    elif method=='diff': self.data=np.asarray(rawData[1:])-np.asarray(rawData[:-1])
    elif method=='ratio': self.data=np.asarray(rawData[1:])/np.asarray(rawData[:-1])
    else: raise ValueError("method must be 'raw', 'diff' or 'ratio'")
    # linear map from [min(data),max(data)] onto integer indices [2,numEmbed],
    # leaving 0 and 1 free for the SOS/EOS tokens
    self.k1=(numEmbed-2)/2; self.b1=(2+numEmbed)/2
    self.k2=(max(self.data)-min(self.data))/2
    self.b2=(max(self.data)+min(self.data))/2
    self.indices=((self.data-self.b2)/self.k2*self.k1+self.b1).astype(int)

  def plot(self,components=[10,100,1000]):
    # low-pass views of the index series: keep only the first/last c FFT components
    fft=np.fft.fft(self.indices)
    plt.figure(figsize=(8,4),dpi=100)
    for c in components:
      copy=np.copy(fft); copy[c:-c]=0
      plt.plot(np.fft.ifft(copy).real,label="fft:{}".format(c)) # .real avoids the ComplexWarning
    plt.plot(self.indices,label="real"); plt.legend(); plt.show()

  def toData(self,index):
    # inverse of the index mapping; [:-1] drops the trailing EOS token
    return ((np.array(index)-self.b1)/self.k1*self.k2+self.b2)[:-1].tolist()

  def sample(self,inputSeqLen,targetSeqLen,sampleRange):
    # draw one random (input,target) window from the chosen slice, each EOS-terminated
    data=self.indices[sampleRange[0]:sampleRange[1]]
    while True:
      inputBegin=random.randrange(data.shape[0])
      inputEnd=inputBegin+inputSeqLen
      if inputEnd+targetSeqLen<data.shape[0]:
        input=data[inputBegin:inputEnd].tolist()
        input.append(EOS)
        target=data[inputEnd:inputEnd+targetSeqLen].tolist()
        target.append(EOS)
        return input,target

rawData=reader.get_data_yahoo('AAPL','2010-10-10','2019-10-10')['Close']
dataToIndex=DataToIndex(rawData,256,'ratio')
dataToIndex.plot(); dataToIndex.sample(3,3,[0,-1])
dataToIndex.toData([18, 19, 20, 1])
../_images/Awesome-Stock-Ai-output_1_1.webp
[0.8898152303867198, 0.890651054511195, 0.8914868786356702]
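
As a quick sanity check (not part of the original run), the quantization can be verified to round-trip: mapping the full index sequence back through toData should recover the ratio series to within one quantization step. A minimal sketch, reusing the dataToIndex object built above:

# hedged sketch: verify the index -> data round-trip for the object above
indices=dataToIndex.indices.tolist(); indices.append(EOS) # toData drops the last element
recovered=np.array(dataToIndex.toData(indices))
step=dataToIndex.k2/dataToIndex.k1 # quantization step: (max-min)/(numEmbed-2)
print(np.abs(recovered-dataToIndex.data).max()<=step) # expect True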

import torch; import torch.nn as nn; from torch import optim
import torch.nn.functional as F

class Encoder(nn.Module):
  def __init__(self,numEmbed,hiddenSize):
    super(Encoder,self).__init__()
    self.hiddenSize=hiddenSize
    # numEmbed+1 rows so that index numEmbed (the top of the data range) is valid
    self.embedding=nn.Embedding(num_embeddings=numEmbed+1,
                                embedding_dim=hiddenSize)
    self.gru=nn.GRU(input_size=hiddenSize,
                    hidden_size=hiddenSize)
  def forward(self,input,hidden):
    embedded=self.embedding(input).view([1,1,-1])
    return self.gru(embedded,hidden) # output,hidden
  def initHidden(self):
    return torch.zeros([1,1,self.hiddenSize]).to(device)

class AttentionDecoder(nn.Module):
  def __init__(self,numEmbed,hiddenSize,dropoutProb):
    super(AttentionDecoder,self).__init__()
    self.embedding=nn.Embedding(num_embeddings=numEmbed+1,
                                embedding_dim=hiddenSize)
    # attention scores over encoder positions; numEmbed doubles as the max input length
    self.attention=nn.Linear(hiddenSize*2,numEmbed)
    self.attentionCombine=nn.Linear(hiddenSize*2,hiddenSize)
    self.dropout=nn.Dropout(dropoutProb)
    self.gru=nn.GRU(input_size=hiddenSize,
                    hidden_size=hiddenSize)
    self.outLinear=nn.Linear(hiddenSize,numEmbed+1)
  def forward(self,input,hidden,encoderOutputs):
    embedded=self.embedding(input).view([1,1,-1])
    embedded=self.dropout(embedded)
    attentionWeights=F.softmax(self.attention(
        torch.cat((embedded[0],hidden[0]),dim=1)
    ),dim=1)
    attentionApplied=torch.bmm(attentionWeights.unsqueeze(0),
                               encoderOutputs.unsqueeze(0))
    output=torch.cat((embedded[0],attentionApplied[0]),dim=1)
    output=F.relu(self.attentionCombine(output).unsqueeze(0))
    output,hidden=self.gru(output,hidden)
    output=F.log_softmax(self.outLinear(output[0]),dim=1)
    return output,hidden,attentionWeights
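
Before wiring these modules into a trainer, a quick shape check on dummy tensors can catch dimension mistakes. The sketch below is illustrative only (not from the original notebook) and uses the same defaults as Trainer (numEmbed=256, hiddenSize=256):

# hypothetical smoke test; device is defined the same way as in the final cell
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
enc=Encoder(256,256).to(device); dec=AttentionDecoder(256,256,0.1).to(device)
enOutput,enHidden=enc(torch.tensor([5]).to(device),enc.initHidden())
encoderOutputs=torch.zeros([256,256]).to(device) # one row per encoder position, as in trainOne
deOutput,deHidden,attn=dec(torch.tensor([[SOS]]).to(device),enHidden,encoderOutputs)
print(deOutput.shape,attn.shape) # torch.Size([1, 257]) torch.Size([1, 256])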

class Trainer:
  def __init__(self,rawData,savePath,method,
               numEmbed=256,hiddenSize=256,learnRate=0.01,
               teachProb=0.5,dropoutProb=0.1):
    self.savePath=savePath; self.teachProb=teachProb
    self.numEmbed=numEmbed; self.hiddenSize=hiddenSize

    self.dataToIndex=DataToIndex(rawData,numEmbed,method)
    #self.dataToIndex.plot()
    self.encoder=Encoder(numEmbed,hiddenSize).to(device)
    self.decoder=AttentionDecoder(numEmbed,hiddenSize,dropoutProb).to(device)
    self.encoderOptimizer=optim.SGD(self.encoder.parameters(),lr=learnRate)
    self.decoderOptimizer=optim.SGD(self.decoder.parameters(),lr=learnRate)
    self.criterion=nn.NLLLoss()

  def train(self,nIters,inputSeqLen,targetSeqLen,saveEvery,sampleRange):
    self.losses=[];lossTotal=0
    for iter in range(nIters):
      input,target=self.dataToIndex.sample(inputSeqLen,targetSeqLen,sampleRange)
      inputTensor=torch.tensor(input,dtype=torch.long).view(-1, 1).to(device)
      targetTensor=torch.tensor(target,dtype=torch.long).view(-1, 1).to(device)
      teach=random.random()<self.teachProb
      prediction,loss=self.trainOne(inputTensor,targetTensor,teach)
      lossTotal+=loss
      if iter%saveEvery==0 and iter>0:
        # record the summed loss over the last saveEvery iterations, then checkpoint
        self.losses.append(lossTotal); lossTotal=0
        plt.title("Progress {:2.2%}".format(iter/nIters))
        plt.plot(self.losses); plt.savefig(self.savePath+'_loss'+'.webp'); plt.show()
        self.save()

  def trainOne(self,input,target,teach,evaluate=False):
    prediction=[]; loss=0
    if not evaluate:
      self.encoderOptimizer.zero_grad(); self.decoderOptimizer.zero_grad()
    # numEmbed doubles as the maximum input length, so inputSeqLen+1 (with EOS) must fit in it
    enOutputs=torch.zeros([self.numEmbed,self.hiddenSize]).to(device)
    enHidden=self.encoder.initHidden()
    for i in range(input.size(0)):
      enOutput,enHidden=self.encoder(input[i],enHidden)
      enOutputs[i]=enOutput[0,0]
    #===============================================
    deInput=torch.tensor([[SOS]]).to(device)
    deHidden=enHidden
    for i in range(target.size(0)):
      deOutput,deHidden,deAttentionWeights=self.decoder(
        deInput,deHidden,enOutputs)
      if not evaluate:
        loss+=self.criterion(deOutput,target[i])
      if teach: deInput=target[i] # target as input
      else: # own prediction as input
        topValue,topIndex=deOutput.topk(1)
        deInput=topIndex.squeeze().detach() # detach from graph
        prediction.append(deInput.item())
        if deInput.item()==EOS: break
    #===============================================
    if evaluate: return prediction
    loss.backward();self.encoderOptimizer.step();self.decoderOptimizer.step()
    return prediction,loss.item()/target.size(0)

  def evaluate(self,input):
    with torch.no_grad():
      # dummy target: only its length matters (caps decoding at numEmbed steps)
      target=np.zeros(self.numEmbed)
      inputTensor=torch.tensor(input,dtype=torch.long).view(-1, 1).to(device)
      targetTensor=torch.tensor(target,dtype=torch.long).view(-1, 1).to(device)
      prediction=self.trainOne(inputTensor,targetTensor,teach=False,evaluate=True)
      return prediction

  def load(self,path):
    try:
      checkPoint=torch.load(path+'.pth')
      self.encoder.load_state_dict(checkPoint['encoder']); self.encoder.eval()
      self.decoder.load_state_dict(checkPoint['decoder']); self.decoder.eval()
      self.encoderOptimizer.load_state_dict(checkPoint['encoderOptimizer'])
      self.decoderOptimizer.load_state_dict(checkPoint['decoderOptimizer'])
      print('Loaded!')
    except FileNotFoundError:
      print("Model Not Found: "+path+'.pth')
  def save(self):
    torch.save({
        'encoder':self.encoder.state_dict(),
        'decoder':self.decoder.state_dict(),
        'encoderOptimizer':self.encoderOptimizer.state_dict(),
        'decoderOptimizer':self.decoderOptimizer.state_dict(),
    },self.savePath+'.pth'); print("Saved!")
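
For reference, a standalone run of Trainer outside BenchMarker could look like the sketch below; the save path, iteration count, and sequence lengths are illustrative placeholders, not the benchmark settings used later:

# hypothetical usage sketch; hyperparameters here are placeholders
device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
rawData=reader.get_data_yahoo('AAPL','2010-01-01','2019-10-10')['Close']
trainer=Trainer(rawData,savePath='/tmp/stock_ai_demo',method='ratio')
trainer.train(nIters=500,inputSeqLen=30,targetSeqLen=7,saveEvery=100,
              sampleRange=[0,-365]) # hold out the last year for evaluation
input,_=trainer.dataToIndex.sample(30,7,sampleRange=[-365,-1])
print(trainer.dataToIndex.toData(trainer.evaluate(input)))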

import json
class BenchMarker:
  def __init__(self,nIters,configs):
    self.nIters=nIters; self.configs=configs
    self.rawData=reader.get_data_yahoo('AAPL','2000-01-01','2019-10-19')['Close']
  def run(self,action):
    log=[]
    for method in self.configs["method"]:
      for inputSeqLen in self.configs["inputSeqLen"]:
        for targetSeqLen in self.configs["targetSeqLen"]:
          savePathDir='/content/drive/My Drive/Colab Notebooks/Machine Learning/'
          fileName=method+'_'+str(inputSeqLen)+'_'+str(targetSeqLen)
          savePath=savePathDir+fileName
          trainer=Trainer(self.rawData,savePath,method)
          trainer.load(savePath)
          if action=='train':
            trainer.train(self.nIters,inputSeqLen,targetSeqLen,saveEvery=100,sampleRange=[0,-365])
          elif action=='predict':
            input,target=trainer.dataToIndex.sample(inputSeqLen,targetSeqLen,sampleRange=[-365,-1])
            prediction=trainer.evaluate(input)
            target=trainer.dataToIndex.toData(target)
            prediction=trainer.dataToIndex.toData(prediction)
            logItem={
              'fileName':fileName,
              'method':method,'inputSeqLen':inputSeqLen,'targetSeqLen':targetSeqLen,
              'input':input,'target':target,'prediction':prediction,
            }
            self.plot(logItem,savePath); log.append(logItem)
            with open(savePathDir+'StockAiBenchMark.json', 'w', encoding='utf-8') as f:
              json.dump(log, f, ensure_ascii=False, indent=4)
  def plot(self,logItem,savePath):
    plt.title(logItem['fileName'])
    plt.plot(logItem['target'],label='target')         # toData already dropped the EOS entry
    plt.plot(logItem['prediction'],label='prediction')
    plt.legend(); plt.savefig(savePath+'_predict'+'.webp'); plt.show() # legend before savefig so it appears in the file
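
Since the benchmark log is written as plain JSON, past runs can be reloaded for later comparison. A minimal sketch, assuming the same Drive path used above:

# hedged sketch: reload the log written by BenchMarker.run(action='predict')
import json
path='/content/drive/My Drive/Colab Notebooks/Machine Learning/StockAiBenchMark.json'
with open(path,encoding='utf-8') as f:
  log=json.load(f)
for item in log:
  print(item['fileName'],'->',len(item['prediction']),'predicted values')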

device=torch.device('cuda' if torch.cuda.is_available() else 'cpu')
configs={
  'method':['ratio'],
  'inputSeqLen':[255],
  'targetSeqLen':[7],
}
benchMarker=BenchMarker(10000,configs)
#benchMarker.run(action='train')
benchMarker.run(action='predict')
Loaded!
../_images/Awesome-Stock-Ai-output_5_1.webp
