
Amazon Stock Forecasting in PyTorch with LSTM Neural Network (Time Series Forecasting) | Tutorial 3 (240415, Study Log Day 5)

by RiverWon 2024. 4. 16.

My vague dream: master financial time series and get rich.

The first step of the first step toward that is LSTM time series forecasting.

I actually gave this a try about a week ago, but I didn't even know what a tensor or a dimension was, so it's fair to say I barely got anywhere.. So now, armed with a little bit of PyTorch knowledge, I'm taking another shot at it.

 

Lecture reference: https://www.youtube.com/watch?v=q_HS4s1L8UI&list=PLKYEe2WisBTHmsTdhjaQydI58tI7pFCDT&index=3

 

Thank you for the lessons..

 

Colab notebook where I followed along with the lecture:

https://colab.research.google.com/drive/1YXWyVCD6LxNXdWIvqI6iODfcpiH6Mcmt?usp=sharing

 


 

 

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn

data = pd.read_csv('AMZN.csv')

data

Import the required libraries and load the data.

 

The data looks roughly like this:

 

The extreme differences in price across the history seem to be due to stock splits or consolidations. Later, everything gets normalized to between -1 and 1 with MinMaxScaler anyway.
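As a quick check on what that scaling will do, here is a minimal sketch (the tiny price array and names are made up, not the real data): MinMaxScaler maps the column minimum to -1 and the maximum to 1 with x_scaled = (x - x_min) / (x_max - x_min) * 2 - 1.

from sklearn.preprocessing import MinMaxScaler

# toy prices: min -> -1, midpoint -> 0, max -> 1
toy_prices = np.array([[10.0], [55.0], [100.0]])
toy_scaler = MinMaxScaler(feature_range=(-1, 1))
print(toy_scaler.fit_transform(toy_prices))
# [[-1.]
#  [ 0.]
#  [ 1.]]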


# Keep only the columns needed for stock forecasting
data = data[['Date', 'Close']]
data

# Check whether CUDA (GPU) is available, since we need it for training
device = "cuda:0" if torch.cuda.is_available() else 'cpu'
device

# Convert the string-typed 'Date' column to pandas datetime objects
data['Date'] = pd.to_datetime(data['Date'])
data.head()

# Plot the closing price
plt.plot(data['Date'], data['Close'])

Close Price

from copy import deepcopy as dc

def prepare_dataframe_for_lstm(df, n_steps):
  df = dc(df)

  # set_index: use 'Date' as the DataFrame index
  df.set_index('Date', inplace=True)

  for i in range(1, n_steps+1):
    df[f'Close(t-{i})'] = df['Close'].shift(i)

  # dropna: drop rows with missing values (the first n_steps rows get NaNs from the shift)
  df.dropna(inplace=True)

  return df

# How many days of past prices (the window) to use for prediction
lookback = 7
shifted_df = prepare_dataframe_for_lstm(data, lookback)
shifted_df
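To make the windowing concrete, here is a minimal sketch of what prepare_dataframe_for_lstm does on a made-up 5-day series with a lookback of 2 (the values and the name toy are arbitrary):

toy = pd.DataFrame({
    'Date': pd.date_range('2024-01-01', periods=5),
    'Close': [1.0, 2.0, 3.0, 4.0, 5.0],
}).set_index('Date')

# each shift(i) adds a column holding the close from i days earlier
for i in range(1, 3):
  toy[f'Close(t-{i})'] = toy['Close'].shift(i)

print(toy.dropna())
# every remaining row = today's Close plus the previous 2 closes;
# the first 2 rows are dropped because their lag columns are NaN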

# With inplace=False (the default), the method returns a new, modified DataFrame and leaves the original unchanged

shifted_df
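A quick illustration of that inplace flag on a throwaway frame (the frame and names are made up):

tmp = pd.DataFrame({'a': [1.0, None, 3.0]})

new_tmp = tmp.dropna()        # inplace=False (default): returns a new frame
print(len(tmp), len(new_tmp)) # 3 2  -> original untouched

tmp.dropna(inplace=True)      # modifies tmp itself and returns None
print(len(tmp))               # 2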

# Convert the DataFrame to a numpy array for scaling
shifted_df_as_np = shifted_df.to_numpy()
shifted_df_as_np

shifted_df_as_np.shape
#>>>(6509, 8)

from sklearn.preprocessing import MinMaxScaler

#(-1, 1) MinMaxScaling
scaler = MinMaxScaler(feature_range = (-1, 1))
shifted_df_as_np = scaler.fit_transform(shifted_df_as_np)
shifted_df_as_np

X = shifted_df_as_np[:, 1:]
y = shifted_df_as_np[:, 0]

X.shape, y.shape
#>>>((6509, 7), (6509,))
# For the LSTM input, the most recent values should come last in the sequence,
# so the window changes from t-1 -> t-2 -> ... -> t-7
# to t-7 -> t-6 -> ... -> t-1, putting the most recent price at the end
X = dc(np.flip(X, axis = 1))
X

"""
array([[-0.99969839, -0.99982128, -0.99983244, ..., -0.99998325,
        -1.        , -0.99995531],
       [-0.99982128, -0.99983244, -0.99987154, ..., -1.        ,
        -0.99994415, -0.99991063],
       [-0.99983244, -0.99987154, -0.99998325, ..., -0.99994415,
        -0.99989946, -0.99993855],
       ...,
       [ 0.05779984,  0.05158   ,  0.0506149 , ...,  0.07431453,
         0.09308121,  0.10690997],
       [ 0.05158   ,  0.0506149 ,  0.04203581, ...,  0.09308121,
         0.10691495,  0.09747299],
       [ 0.0506149 ,  0.04203581,  0.07431453, ...,  0.10691495,
         0.09747802,  0.11398769]])
"""
# Split into train and test sets
split_index = int(len(X)*0.95)
split_index
# 6183

X_train = X[:split_index]
X_test = X[split_index:]

y_train = y[:split_index]
y_test = y[split_index:]

X_train.shape, X_test.shape, y_train.shape, y_test.shape
# ((6183, 7), (326, 7), (6183,), (326,))
# reshape to (number of samples, lookback window length, 1 feature per time step)
X_train = X_train.reshape((-1, lookback, 1))
X_test = X_test.reshape((-1, lookback, 1))

y_train = y_train.reshape((-1, 1))
y_test = y_test.reshape((-1,1))

X_train.shape, X_test.shape, y_train.shape, y_test.shape
# ((6183, 7, 1), (326, 7, 1), (6183, 1), (326, 1))


X_train = torch.tensor(X_train).float()
X_test = torch.tensor(X_test).float()
y_train = torch.tensor(y_train).float()
y_test = torch.tensor(y_test).float()

X_train.shape, X_test.shape, y_train.shape, y_test.shape

"""
X_train = torch.tensor(X_train).float()
X_test = torch.tensor(X_test).float()
y_train = torch.tensor(y_train).float()
y_test = torch.tensor(y_test).float()

X_train.shape, X_test.shape, y_train.shape, y_test.shape
(torch.Size([6183, 7, 1]),
 torch.Size([326, 7, 1]),
 torch.Size([6183, 1]),
 torch.Size([326, 1]))
"""
from torch.utils.data import Dataset

class TimeSeriesDataset(Dataset):
  def __init__(self, X, y):
    self.X = X
    self.y = y

  def __len__(self):
    return len(self.X)

  def __getitem__(self, i):
    return self.X[i], self.y[i]

train_dataset = TimeSeriesDataset(X_train, y_train)
test_dataset = TimeSeriesDataset(X_test, y_test)

train_dataset
# <__main__.TimeSeriesDataset at 0x7eb9ef630be0>
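A quick sanity check (not in the lecture) that indexing the dataset returns one window and one target:

sample_x, sample_y = train_dataset[0]
sample_x.shape, sample_y.shape
# (torch.Size([7, 1]), torch.Size([1]))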

from torch.utils.data import DataLoader

batch_size = 16

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# batch_index isn't needed, so use an underscore
for _, batch in enumerate(train_loader):
  x_batch, y_batch = batch[0].to(device), batch[1].to(device)
  print(x_batch.shape, y_batch.shape)
  # check only the first batch
  break
# torch.Size([16, 7, 1]) torch.Size([16, 1])
# Define the LSTM class

class LSTM(nn.Module):
  def __init__(self, input_size, hidden_size, num_stacked_layers):
    super().__init__()

    # hidden_size and num_stacked_layers are stored as instance attributes because forward() uses them again
    self.hidden_size = hidden_size
    self.num_stacked_layers = num_stacked_layers

    self.lstm = nn.LSTM(input_size, hidden_size, num_stacked_layers,
                        batch_first = True)

    self.fc = nn.Linear(hidden_size, 1)

  # nn.LSTM expects the initial states h0, c0 to be shaped (num_layers, batch_size, hidden_size); see the shape check below the model
  def forward(self, x):

    batch_size = x.size(0)
    h0 = torch.zeros(self.num_stacked_layers, batch_size, self.hidden_size).to(device)
    c0 = torch.zeros(self.num_stacked_layers, batch_size, self.hidden_size).to(device)

    out, _ = self.lstm(x, (h0, c0))
    out = self.fc(out[:, -1, :])
    return out

model = LSTM(1,4,1)
model.to(device)
model
"""
LSTM(
  (lstm): LSTM(1, 4, batch_first=True)
  (fc): Linear(in_features=4, out_features=1, bias=True)
)
"""
# Train one Epoch
def train_one_epoch():
    model.train(True)
    print(f'Epoch: {epoch + 1}')
    running_loss = 0.0

    for batch_index, batch in enumerate(train_loader):
        x_batch, y_batch = batch[0].to(device), batch[1].to(device)

        output = model(x_batch)
        loss = loss_function(output, y_batch)
        running_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch_index % 100 == 99:  # print every 100 batches
            avg_loss_across_batches = running_loss / 100
            print('Batch {0}, Loss: {1:.3f}'.format(batch_index+1,
                                                    avg_loss_across_batches))
            running_loss = 0.0
    print()
def validate_one_epoch():
  model.train(False)
  running_loss = 0.0

  for batch_index, batch in enumerate(test_loader):
    x_batch, y_batch = batch[0].to(device), batch[1].to(device)

    with torch.no_grad():
        output = model(x_batch)
        loss = loss_function(output, y_batch)
        running_loss += loss.item()

  avg_loss_across_batches = running_loss / len(test_loader)

  print('Val Loss: {0:.3f}'.format(avg_loss_across_batches))
  print('***************************************************')
  print()
# Train the model
learning_rate = 0.001
num_epochs = 10
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    train_one_epoch()
    validate_one_epoch()
    
"""
Epoch: 1
Batch 100, Loss: 1.250
Batch 200, Loss: 0.170
Batch 300, Loss: 0.028

Val Loss: 0.018
***************************************************

Epoch: 2
Batch 100, Loss: 0.005
Batch 200, Loss: 0.005
Batch 300, Loss: 0.004

Val Loss: 0.013
***************************************************

Epoch: 3
Batch 100, Loss: 0.004
Batch 200, Loss: 0.004
Batch 300, Loss: 0.003

Val Loss: 0.011
***************************************************

Epoch: 4
Batch 100, Loss: 0.002
Batch 200, Loss: 0.003
Batch 300, Loss: 0.002

Val Loss: 0.011
***************************************************

Epoch: 5
Batch 100, Loss: 0.002
Batch 200, Loss: 0.002
Batch 300, Loss: 0.002

Val Loss: 0.010
***************************************************

Epoch: 6
Batch 100, Loss: 0.001
Batch 200, Loss: 0.001
Batch 300, Loss: 0.001

Val Loss: 0.009
***************************************************

Epoch: 7
Batch 100, Loss: 0.001
Batch 200, Loss: 0.001
Batch 300, Loss: 0.001

Val Loss: 0.007
***************************************************

Epoch: 8
Batch 100, Loss: 0.000
Batch 200, Loss: 0.000
Batch 300, Loss: 0.000

Val Loss: 0.006
***************************************************

Epoch: 9
Batch 100, Loss: 0.000
Batch 200, Loss: 0.000
Batch 300, Loss: 0.000

Val Loss: 0.005
***************************************************

Epoch: 10
Batch 100, Loss: 0.000
Batch 200, Loss: 0.000
Batch 300, Loss: 0.000

Val Loss: 0.005
***************************************************

"""
with torch.no_grad():
  predicted = model(X_train.to(device)).to('cpu').numpy()

plt.plot(y_train, label='Actual Close')
plt.plot(predicted, label='Predicted Close')
plt.xlabel('Day')
plt.ylabel('Close')
plt.legend()
plt.show()

Predicted values are shown in orange.

It seems to predict well, but! the values are still scaled, so we can't read actual prices off the plot. Now let's undo the scaling. One catch: the scaler was fit on all 8 columns (Close plus the 7 lag columns), so to invert a single column we pad it with zeros out to 8 columns, call inverse_transform, and keep only column 0.

 

train_predictions = predicted.flatten()

# The scaler was fit on 8 columns, so pad the predictions out to 8 columns with zeros,
# inverse-transform, and keep only column 0 (the Close column)
dummies = np.zeros((X_train.shape[0], lookback+1))
dummies[:, 0] = train_predictions
dummies = scaler.inverse_transform(dummies)

train_predictions = dc(dummies[:, 0])
train_predictions
"""array([  0.47217395,   0.4699507 ,   0.46879461, ..., 169.15940866,
       168.86965258, 169.08869821])"""
       
# Same padding trick to un-scale the actual y_train values
dummies = np.zeros((X_train.shape[0], lookback+1))
dummies[:, 0] = y_train.flatten()
dummies = scaler.inverse_transform(dummies)

new_y_train = dc(dummies[:, 0])
new_y_train
"""array([7.91646265e-02, 7.65634249e-02, 7.52572660e-02, ...,
       1.69091505e+02, 1.73315001e+02, 1.68871003e+02])"""
       
plt.plot(new_y_train, label='Actual Close')
plt.plot(train_predictions, label='Predicted Close')
plt.xlabel('Day')
plt.ylabel('Close')
plt.legend()
plt.show()

# Same procedure for the test-set predictions and actual values
test_predictions = model(X_test.to(device)).detach().cpu().numpy().flatten()

dummies = np.zeros((X_test.shape[0], lookback+1))
dummies[:, 0] = test_predictions
dummies = scaler.inverse_transform(dummies)

test_predictions = dc(dummies[:, 0])
test_predictions



dummies = np.zeros((X_test.shape[0], lookback+1))
dummies[:, 0] = y_test.flatten()
dummies = scaler.inverse_transform(dummies)

new_y_test = dc(dummies[:, 0])
new_y_test



plt.plot(new_y_test, label='Actual Close')
plt.plot(test_predictions, label='Predicted Close')
plt.xlabel('Day')
plt.ylabel('Close')
plt.legend()
plt.show()

 

 

I followed along again after taking a short PyTorch course, and this time it definitely felt like I could keep up much better than before. That said, I don't think I fully understand things like setting up the dimensions, undoing the scaling, and a few of the methods yet. I plan to fill in those gaps while reviewing the code.

 

One more thing: I still haven't designed a model entirely on my own. My current level is opening the reference Colab, glancing at it, finishing the rest of the code, peeking at the next step, writing a bit more, and so on. The overall structure of model training and validation won't differ much, so I'll keep studying with this code and try adding other factors as I work on stock forecasting.