pytorch

[pytorch] LSTM(Long Short-Term Memory)이란? / LSTM을 Pytorch로 구현 실습 코드

독립성이 강한 ISFP 2024. 9. 26. 02:16
728x90
반응형

RNN(순환신경망)의 한계점

1. 장기 의존성 문제 (Long-term Dependency Problem)
   - RNN은 이전 시점의 정보를 현재 시점에 반영하는 구조이기 때문에, 과거의 중요한 정보를 오랫동안 기억하는 데 어려움이 있습니다.
   - 시간이 길어질수록 (시퀀스가 길어질수록), RNN은 초기 시점의 정보를 잊어버리기 쉬워집니다.
   - 예를 들어, \( h_{100} \)는 \( h_1 \)부터 \( h_{99} \)까지의 은닉 상태에 의존하고 있지만, 시간이 길어질수록 \( h_1 \)의 정보는 거의 전달되지 않게 됩니다.

2. 기울기 소실 문제 (Vanishing Gradient Problem)


RNN에서 은닉 상태의 가중치 \( W_{hh} \)는 매 타임스텝의 은닉 상태를 업데이트하는 데 사용됩니다. 기울기 \( \frac{\partial Loss}{\partial W_{hh}} \)는 손실 함수 \( Loss \)가 \( W_{hh} \)에 의해 얼마나 영향을 받는지를 계산하는 값입니다. 이를 통해 가중치가 어떻게 업데이트되어야 할지를 결정할 수 있습니다.

\[
W_{hh}^{new} = W_{hh}^{old} - \eta \times \frac{\partial Loss}{\partial W_{hh}}
\]

기울기 계산은 타임스텝 \( t \)에서 \( t-1 \)까지의 모든 기울기의 연쇄적인 곱으로 계산됩니다.

\[
\frac{\partial Loss}{\partial W_{hh}} = \frac{\partial Loss}{\partial \hat{y}_t} \times \frac{\partial \hat{y}_t}{\partial h_t} \times \frac{\partial h_t}{\partial h_{t-1}} \times \cdots \times \frac{\partial h_2}{\partial h_1} \times \frac{\partial h_1}{\partial W_{hh}}
\]

여기서 중요한 점은, 시간이 거슬러 올라갈수록 기울기가 계속 곱해지며, 이는 기울기 소실 문제를 초래할 수 있습니다. \( h_1 \)에서 \( h_t \)까지의 은닉 상태 변화에 대한 기울기는 각 타임스텝마다 곱해지므로, 은닉 상태 \( h_1 \)에서의 기울기는 거의 0에 가까워질 수 있습니다.

 

어떻게 0에 가까워지는 걸까요?

RNN에서는 보통 비선형 활성화 함수로 \( \tanh \)를 사용합니다. 은닉 상태 \( h_t \)는 다음과 같이 계산됩니다.

\[
h_t = \tanh(W_{xh} x_t + W_{hh} h_{t-1})
\]

여기서 \( \tanh \) 함수는 입력 값을 -1에서 1 사이로 변환하는 비선형 함수입니다.

하이퍼볼릭 탄젠트 함수 \( \tanh \)

- 이 함수는 입력이 큰 양수일 때 1에 수렴하고, 큰 음수일 때 -1에 수렴합니다. 이 비선형성 덕분에 은닉 상태는 적절한 스케일로 변환됩니다.

- 입력 값이 0에 가까울 때 미분 값은 최대값인 1에 가까워지고, 입력 값이 크거나 작아지면 \( \tanh(x) \)의 미분 값이 0에 가까워집니다.

 


\[
\tanh(x) = \frac{e^x - e^{-x}}{e^x + e^{-x}}
\]

\[
\frac{d}{dx} \tanh(x) = 1 - \tanh^2(x)
\]

 

기울기 소실 문제
기울기 소실 문제는 역전파 과정에서 발생합니다. RNN에서 출력에 대한 기울기를 계산할 때, 은닉 상태의 가중치 \( W_{hh} \)에 대한 기울기는 체인 룰을 통해 계산됩니다. 수식적으로 표현하면, \( \frac{\partial Loss}{\partial W_{hh}} \)는 시간에 따라 누적된 기울기들의 곱으로 계산됩니다. 

 

이러한 \( \tanh \) 함수의 특성 때문에, 입력 값이 커질수록 미분 값이 0에 가까워집니다. 그 결과, 역전파 중에 기울기가 계속해서 작아지며 기울기 소실이 발생할 수 있습니다. 이것은 장기 의존성 문제로 이어지며, LSTM이나 GRU 같은 개선된 RNN 구조가 등장한 이유입니다. 


LSTM(Long Short-Term Memory)

RNN은 장기 의존성 문제를 겪는데, 이를 해결하기 위해 LSTM은 세 가지 게이트(Forget Gate, Input Gate, Output Gate)와 셀 상태(Cell State)를 도입한 구조입니다.

1. RNN의 기본 구조 

- RNN에서는 현재 시점의 입력 \( x_t \)와 이전 시점의 은닉 상태 \( h_{t-1} \)를 이용하여 현재 시점의 은닉 상태 \( h_t \)를 계산하고, 이를 기반으로 최종 출력 \( y_t \)를 계산합니다.
- RNN의 수식은 다음과 같습니다
  \[
  h_t = f(W_{xh} x_t + W_{hh} h_{t-1})
  \]

이 구조에서 은닉 상태 \( h_t \)가 이전 시점의 은닉 상태 \( h_{t-1} \)에 의존하므로, 과거 정보를 오래 유지하는 데 한계가 발생할 수 있습니다. 이로 인해 장기 의존성을 학습하는 것이 어렵고, 기울기 소실 문제(Vanishing Gradient Problem)가 발생하게 됩니다.

2. LSTM 구조 


- LSTM은 이러한 RNN의 문제를 해결하기 위해 셀 상태와 게이트를 추가하여 정보를 더 오래 유지하고, 선택적으로 정보를 기억하거나 잊을 수 있도록 설계되었습니다.


LSTM의 핵심 요소

하이퍼볼릭 탄젠트(tanh) : 정보 생성 - "어떤 새로운 정보를 고려해야 할까?"

시그모이드(sigmoid): 정보 흐름 조절 - "어떤 새로운 정보를 통과(조절)해야 할까?"

✅ Cell State

Cell State는 시간에 따라 변하지 않고 정보를 유지하는 경로입니다.

 

1. cell state의 역할
   - RNN은 hidden state를 통해 정보가 전달되지만, LSTM은 hidden state뿐만 아니라 cell state를 추가로 사용하여 정보를 전달합니다. 하지만 cell state는 출력으로 직접 반영되지 않고, 내부에서만 순환되며 정보를 저장합니다.

2. 게이트의 역할
   - LSTM에서는 3개의 게이트(forget gate, input gate, output gate)가 사용되며, 이 게이트들은 시그모이드 함수를 통해 0과 1 사이의 값을 취합니다. 각 게이트는 수문 또는 밸브와 같은 역할을 하여 정보를 얼마나 잊고, 저장하고, 출력할지를 결정합니다.
   - forget gate는 기존 cell state의 정보를 얼마나 잊을지를 결정하고, input gate는 새로운 정보를 cell state에 얼마나 추가할지를 결정합니다. 마지막으로, output gate는 새로 얻은 정보를 hidden state로 얼마나 출력할지를 결정합니다.

3. cell state 업데이트 과정
   - 기존 cell state는 forget gate를 통해 얼마나 잊을지를 결정하고, input gate를 통해 얼마나 새로운 정보를 추가할지 결정됩니다. 이러한 과정을 통해 cell state는 기억해야 할 중요한 정보를 유지하며 불필요한 정보는 잊어버립니다.


 forget gate

forget gate는 현재 시점에서 이전 cell state에 저장된 정보 중 어떤 부분을 잊어야 할지 결정하는 역할을 합니다. 이를 통해 불필요하거나 중요하지 않은 정보는 삭제하고, 필요한 정보는 유지할 수 있습니다.

 

1. forget gate의 역할
   - forget gate는 이전 cell state에 있는 정보 중 일부를 잊어야 할지를 결정합니다.
   - 입력 데이터와 이전 hidden state를 사용해 affine 변환을 한 후, 이를 시그모이드 함수를 통해 처리합니다. 이때, 시그모이드 함수는 0에서 1 사이의 값을 출력합니다.
     - 시그모이드 값이 0에 가까울수록 해당 정보는 잊히게 되고, 1에 가까울수록 해당 정보는 유지됩니다.

2. 수식



   - forget gate의 값 \( f_t \)는 시그모이드 함수 \( \sigma \)를 통해 계산됩니다.
   \[
   f_t = \sigma(W_x^{(f)} x_t + W_h^{(f)} h_{t-1} + b^{(f)})
   \]
   - 여기서 \( W_x^{(f)} \)와 \( W_h^{(f)} \)는 forget gate의 가중치, \( b^{(f)} \)는 편향입니다. 이 값은 현재 입력 \( x_t \)와 이전 hidden state \( h_{t-1} \)를 기반으로 계산됩니다.

3. Hadamard Product
   - forget gate에서 계산된 \( f_t \)는 이전 cell state \( c_{t-1} \)와 Hadamard Product(요소별 곱셈)를 수행하여, 잊어야 할 정보와 기억해야 할 정보를 구분합니다.
   \[
   c_t = f_t \circ c_{t-1}
   \]
   - 이 과정에서 forget gate의 값이 1에 가까운 위치는 정보를 유지하고, 0에 가까운 위치는 정보를 잊게 됩니다.


 input gate

input gate는 새로운 정보를 cell state에 얼마나 추가할지를 결정하는 중요한 역할을 합니다. 이 과정을 통해 LSTM은 새로운 정보가 얼마나 중요한지 판단하고, cell state에 적절하게 반영할 수 있습니다.

1. input gate의 역할
   - input gate는 forget gate와 함께 cell state를 조정하는데 사용됩니다. forget gate가 기존 정보를 얼마나 잊을지를 결정했다면, input gate는 새로운 정보를 얼마나 추가할지를 결정합니다.
   - 입력 데이터와 이전 hidden state를 기반으로 affine 변환을 거친 후, 시그모이드 함수와 하이퍼볼릭 탄젠트 함수를 이용해 새로운 정보를 계산합니다.

2. 수식


input gate에서 계산되는 값은 두 가지입니다.
     1. 새로운 정보 \( g \)는 하이퍼볼릭 탄젠트 함수 \( \tanh \)를 사용해 계산됩니다.

     - 이때, \( W_x^{(g)} \)와 \( W_h^{(g)} \)는 가중치, \( b^{(g)} \)는 편향입니다.
     \[
     g = \tanh(W_x^{(g)} x_t + W_h^{(g)} h_{t-1} + b^{(g)})
     \]

     2. 시그모이드 함수 \( \sigma \)는 새로운 정보가 얼마나 중요한지 결정하는 값을 계산합니다.

     - 시그모이드 값 \( i \)는 0에서 1 사이의 값을 가지며, 이 값이 1에 가까울수록 해당 정보가 중요하다고 판단해 cell state에 더 많이 반영됩니다.
     \[
     i = \sigma(W_x^{(i)} x_t + W_h^{(i)} h_{t-1} + b^{(i)})
     \]


 cell update  

cell update는 LSTM에서 forget gateinput gate를 사용해 cell state를 업데이트하는 과정을 설명합니다. 이 과정은 이전 시점의 cell state를 수정하고 새로운 정보를 추가해 현재 시점에서의 cell state를 만들어내는 중요한 단계입니다.

1. cell update 과정
   - forget gate와 input gate는 각각 이전 cell state의 정보 중 어떤 부분을 잊고, 새로운 정보를 얼마나 추가할지 결정합니다.
   - 이전 시점의 cell state \( c_{t-1} \)는 forget gate \( f_t \)에 의해 중요한 정보를 유지하고, 불필요한 정보를 잊게 됩니다.
   - input gate는 새로운 정보 \( g_t \)와 그 정보를 얼마나 추가할지를 결정하는 시그모이드 값 \( i_t \)를 통해 cell state에 새로운 정보를 더합니다.

2. 수식


cell state 업데이트는 다음 수식으로 표현됩니다.
   \[
   c_t = f_t \circ c_{t-1} + g_t \circ i_t
   \]
   - 여기서 \( \circ \)는 Hadamard Product(요소별 곱셈)를 의미합니다.
   - 첫 번째 항인 \( f_t \circ c_{t-1} \)는 forget gate를 통해 이전 cell state에서 잊지 않을 정보만 남기는 역할을 합니다.
   - 두 번째 항인 \( g_t \circ i_t \)는 input gate를 통해 새로운 정보를 cell state에 추가하는 과정입니다.

3. 결과
   - 이 과정을 통해 cell state는 잊어야 할 정보는 제거되고, 필요한 새로운 정보는 추가되면서 최신 상태로 업데이트됩니다. 이는 이후 시점에서의 계산에 필요한 중요한 정보를 계속해서 유지하는 데 도움을 줍니다.


  output gate

Output gate는 LSTM에서 현재 시점의 hidden state를 결정하는 데 중요한 역할을 합니다. 이는 다음 단계로 전달되는 출력 정보를 선택적으로 조절하는 메커니즘입니다. Output gate는 LSTM의 계산된 정보를 기반으로 최종 출력값을 만들어냅니다.

1. Output gate의 역할
   - Output gate는 현재 시점에서 cell state의 정보를 기반으로 최종적인 hidden state \( h_t \)를 결정합니다. 이 hidden state는 LSTM의 출력이자 다음 시점으로 전달되는 정보입니다.
   - Output gate는 시그모이드 함수 \( \sigma \)를 사용하여 cell state의 어떤 부분을 출력할지를 결정합니다. 시그모이드 함수는 0에서 1 사이의 값을 출력하며, 이 값이 1에 가까울수록 해당 정보가 많이 출력됩니다.

2. 수식



Output gate의 계산은 다음과 같습니다.

여기서 \( W_o \)는 가중치, \( b_o \)는 편향, \( h_{t-1} \)은 이전 시점의 hidden state, \( x_t \)는 현재 입력입니다.

\[
o_t = \sigma(W_x^{(o)} x_t + W_h^{(o)} h_{t-1} + b^{(o)})
\]

최종적으로, 현재 시점의 hidden state \( h_t \)는 다음과 같이 계산됩니다.

여기서 \( C_t \)는 현재 시점의 cell state이고, \( \tanh(C_t) \)는 cell state의 값을 -1에서 1 사이로 스케일링합니다. Output gate의 값 \( o_t \)와 Hadamard Product(요소별 곱셈)를 통해 최종 hidden state가 결정됩니다.


     \[
     h_t = o_t \times \tanh(C_t)
     \]

3. 작동 원리
   - Output gate는 시그모이드 함수로 cell state의 정보 중 어떤 부분이 출력될지를 조절합니다. 이때, cell state의 전체 값을 그대로 사용하는 것이 아니라, \( \tanh \) 함수를 적용해 적절히 스케일링된 값을 사용합니다.
   - 이 과정을 통해 LSTM은 현재 시점에서 필요한 정보만을 선택적으로 출력할 수 있습니다.

요약하자면, Output gate는 LSTM에서 최종 hidden state \( h_t \)를 결정하는 역할을 하며, cell state의 중요한 정보를 기반으로 다음 단계로 전달될 정보를 선택적으로 출력합니다.


LSTM(Long Short-Term Memory) 구조 정리

f, i, o는 모두 0과 1사이의 값을 갖는 벡터임. (시그모이드를 지났기 때문에)


Pytorch 로 LSTM 구현

실습에 활용한 데이터셋 -> (730, 4)

# device 설정
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 난수 시드 설정
def set_seed(seed_value):
    random.seed(seed_value)  # 파이썬 난수 생성기
    np.random.seed(seed_value)  # Numpy 난수 생성기
    torch.manual_seed(seed_value)  # PyTorch 난수 생성기

    # CUDA 환경에 대한 시드 설정 (GPU 사용 시)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed_value)
        torch.cuda.manual_seed_all(seed_value)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False


# 시퀀스 데이터 생성 함수
def build_sequence_dataset(df, seq_length):
    dataX = []
    dataY = []
    for i in range(0, len(df) - seq_length):
        _x = df.iloc[i:i + seq_length].values  # 시퀀스 데이터
        _y = df.iloc[i + seq_length]['temperature']  # 다음 포인트의 기온을 레이블로 사용
        dataX.append(_x)
        dataY.append(_y)
    return np.array(dataX), np.array(dataY)


class LSTMModel(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super(LSTMModel, self).__init__()

        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)  # 출력 크기 조정을 위한 선형 레이어

    def forward(self, x):
        batch_size = x.size(0)

        h0, c0 = self.init_hidden(batch_size, x.device)
        out, _ = self.lstm(x, (h0, c0))
        out = self.fc(out[:, -1, :])  # 마지막 타임 스텝의 출력만 사용
        return out

    def init_hidden(self, batch_size, device):
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(device)
        return h0, c0


def train(model, train_loader, optimizer, criterion):
    model.train()  # 모델을 학습 모드로 설정
    total_loss = 0
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(train_loader)


def validate_model(model, test_loader, criterion):
    model.eval()  # 모델을 평가 모드로 설정
    total_loss = 0
    actuals = []
    predictions = []

    with torch.no_grad():  # 그라디언트 계산을 비활성화
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target.view(-1, 1))
            total_loss += loss.item()

            # 예측값과 실제값을 리스트에 저장
            actuals.extend(target.squeeze(1).tolist())
            predictions.extend(output.squeeze(1).tolist())

    # 손실 계산
    avg_loss = total_loss / len(test_loader)

    return avg_loss, actuals, predictions

 

#  데이터 로드
df_weather = pd.read_csv('sample_weather_data.csv')

#  피처 선택
features = ['humidity','rainfall','wspeed','temperature']
df_weather = df_weather[features] 

#  파라미터 설정
seed_value = 42
set_seed(seed_value)  # 위에서 정의한 함수 호출로 모든 시드 설정

# model parameter
num_output = 1
num_hidden = 10
num_features = len(features)

# hyper parameter
seq_length = 6  # 과거 6일의 데이터를 기반으로 다음날의 기온을 예측
batch_size = 32
learning_rate = 0.01

# 정규화를 위한 스케일러 초기화 및 적용
scaler = StandardScaler()
weather_scaled_arr = scaler.fit_transform(df_weather)
df_weather_scaled = pd.DataFrame(weather_scaled_arr, columns=features)

# 시퀀스 데이터 생성
sequence_dataX, sequence_dataY = build_sequence_dataset(df_weather_scaled, seq_length)

# train - test 데이터 분할
# sequence_dataX와 sequence_dataY를 사용하여 데이터를 학습 세트와 테스트 세트로 분할
train_X, test_X, train_Y, test_Y = train_test_split(
    sequence_dataX, sequence_dataY, test_size=0.2, shuffle = False
)


# 텐서로 데이터 변환
train_X_tensor = torch.tensor(train_X, dtype=torch.float32)
train_Y_tensor = torch.tensor(train_Y.reshape(-1, 1), dtype=torch.float32)
test_X_tensor = torch.tensor(test_X, dtype=torch.float32)
test_Y_tensor = torch.tensor(test_Y.reshape(-1, 1), dtype=torch.float32)

# TensorDataset 생성
train_dataset = TensorDataset(train_X_tensor, train_Y_tensor)
test_dataset = TensorDataset(test_X_tensor, test_Y_tensor)

# DataLoader 설정
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# 모델 인스턴스 생성
model_lstm = LSTMModel(input_size=num_features, hidden_size=num_hidden, num_layers=1, output_size=num_output).to(device)

optimizer_lstm = torch.optim.Adam(model_lstm.parameters(), lr=learning_rate)
criterion_lstm = nn.MSELoss()

# 학습
# 각 에포크의 평균 손실을 저장할 리스트 초기화
train_loss_lst = []
test_loss_lst = []

max_epochs = 200
for epoch in range(max_epochs):
    train_loss = train(model_lstm, train_loader, optimizer_lstm, criterion_lstm)
    test_loss, actuals, predictions = validate_model(model_lstm, test_loader, criterion_lstm)

    train_loss_lst.append(train_loss)  # 손실 기록
    test_loss_lst.append(test_loss)  # 손실 기록

    if (epoch+1) % 10 == 0:
        print(f"epoch {epoch+1}: train loss(mse) = {train_loss:.4f}  test loss(mse) = {test_loss:.4f}")

print(f"학습 완료 : 총 {epoch+1} epoch")
728x90
반응형