728x90
반응형
728x90

안녕하세요! 오늘은 기존에 작성한 cart pole 문제DDQN(Double Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.

  • DDQN (Double Deep Q-Network) 이란?

강화 학습의 한 방법으로, DQN과 비슷하지만 학습할 때 Neural Network(인공 신경망)를 두 개 사용하는 학습 방법입니다.

DQN에서 Q-Network가 두 개 사용되었다고 보시면 될 것 같아요!

 

DQN에 대해 궁금하신 분들은 cart pole를 DQN으로 진행하는 아래 포스팅을 참고 해주시면 됩니다.

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-DQN

 

[RL] gymnasium cart pole 강화 학습 - DQN

안녕하세요! 오늘은 기존에 작성한 cart pole 문제를 DQN(Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.DQN (Deep Q-Network) 이란?강화 학습의 한 방법으로, Q-learning에서 Q-table 대신 Neural Netw

yhj9855.com

 

cart pole에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!!

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium cart pole 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 cart pole 문제 설명에 대한 포스팅 진행하겠습니다. 저는 cart pole 문제를 DQN과 DDQN 두 가지 방법으로 풀어보았는데요!각각 전체 코드는 포스팅 가장 아래

yhj9855.com


그럼 본격적으로 DDQN으로 cart pole 문제를 풀어보도록 하겠습니다.

Neural Network 구현

우선 먼저 DDQN에서 Neural Network을 먼저 구현합니다.

Neural Network을 구현하는 코드는 아래와 같습니다.

class DDQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DDQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

레이어 층을 몇 개 사용할 것인지, 활성 함수는 어떤 것을 사용할 것인지는 모두 하이퍼 파라미터 입니다!

저는 3개의 층을 사용했고, 모두 Relu 함수를 사용했습니다.

보통 어떤 활성 함수를 사용하면 좋을지 모를 때, Relu 함수를 많이 사용하는데요, 그래도 다른 활성 함수도 사용해보시는 걸 추천 드려요!!

저는 tanh 함수를 사용했지만, 학습이 너무 진행되지 않아서 포기했었습니다ㅜㅜ

Replay Memory 구현

다음은 Replay Memory를 구현합니다.

  • Replay Memory란?

DDQN처럼 강화 학습과 딥러닝이 혼합된 알고리즘을 사용할 때, 더 안정적이고 효율적인 학습을 하기 위한 방법입니다.

환경에서 만들어진 에피소드를 저장해두었다가, 랜덤으로 샘플링해서 학습하는 것이 핵심 아이디어 입니다.

 

매 순간순간 에피소드를 학습하게 되면, 비슷한 경험으로 학습이 되지 않고, 편향적으로 학습될 가능성이 있습니다.

Replay Memory 방법을 활용하면 해당 단점을 완화할 수 있어, 자주 사용되는 방법입니다.

 

Replay Memory를 코드로 구현하는 방법은 아래와 같습니다.

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

행동 선택하기

다음은 Q-learning과 비슷하게 Q값을 바탕으로 행동을 선택하는 것을 구현합니다.

저는 여기서 행동을 선택하는 방법을 두 가지 소개 드리려고 합니다!

 

1.  ϵ-greedy로 행동 선택하기

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    if random.random() < epsilon:
        return random.randint(0, action_dim - 1)
    else:
        return target_net(state).argmax().item()

여기서 행동 값을 target_net을 바탕으로 진행을 하고 있는데요.

학습이 진행 중인 policy_net에서 행동을 선택할 경우, 학습이 불안정할 가능성이 높아 target_net에서 진행하는 것이 더 좋습니다.

 

2. 확률로 행동 선택하기

해당 방법은 Q값을 확률로 변경한 다음, 확률대로 행동을 선택하게 하는 것을 의미합니다.

이 방법을 사용할 경우, 확률적으로 행동을 선택하기 때문에 어느 정도 학습이 진행되어도 탐험을 보장한다는 것과 ϵ을 따로 세팅해주지 않아도 되는 것이 장점이라고 볼 수 있습니다!

 

확률로 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    q_value = target_net(state)
    # Q 값을 확률 값으로 바꾸는 과정
    p = F.softmax(q_value, dim=0).tolist()
    # 부동소수점 오차로 인해 합이 1이 안되는 문제 해결
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

DDQN 공식을 사용해서 업데이트

이제 DDQN 공식을 사용해서 네트워크를 업데이트 하는 것을 구현해보도록 하겠습니다.

코드로 들어가지 전에 먼저 DDQN 공식을 먼저 살펴보겠습니다.

기본적으로는 DQN과 비슷한 공식입니다!

DQN과 Q가 두 번 사용된 것을 보실 수 있습니다.

 

DQN과 마찬가지로 DDQN은 딥러닝을 사용하기 때문에 자체적으로 옵티마이저와 learning rate가 들어가게 됩니다.

 두 개가 DDQN의 α로 작용을 하기 때문에 실제로 사용하는 공식에는 α가 사라져 아래와 같은 공식이 됩니다!

θ와 θ ̄ 는 α 대신 네트워크가 작동하는 부분이라고 생각하시면 됩니다.

 

이제 해당 공식을 바탕으로 DDQN을 진행하는 코드는 아래와 같습니다.

def optimize_model(memory, policy_net, target_net, optimizer):
	# batch_size만큼 데이터가 메모리에 쌓였을 때만 학습 진행
    if len(memory) < batch_size:
        return

    # transitions = (state, action, reward, next_state, done)
    transitions = memory.sample(batch_size)
    # state, action, reward, next_state, done을 각각 묶어서 list의 형태로 만드는 작업
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DDQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    # (1-done_batch)을 통해 에피소드가 끝났는지 아닌지를 판단
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

policy_net에서 가져온 next_action을 바탕으로 target_net에서 next_q_values 값을 구하고 있습니다.

이처럼 policy_net, target_net 두 가지에서 가져온 값으로 계산을 진행하기 때문에 Double Deep Q-Network가 된 것입니다.

모델 학습

이제 본격적으로 학습을 진행해보도록 하겠습니다.

한 가지 중요한 점은 Cart Pole 환경이 500점을 달성해도, 에피소드 완료라고 판단하지 않기 때문에 저희가 직접 판단해줘야 합니다.

 

학습을 진행하는 코드는 아래와 같습니다.

# 초기 세팅
policy_net = DDQN(state_dim, action_dim)
target_net = DDQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 모델 학습
for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

리워드 시각화 및 모델 테스트

마지막으로 저희가 학습한 모델이 잘 학습되었는지 확인하기 위해, 리워드를 시각화하고 모델을 테스트해보겠습니다.

먼저 리워드 시각화 하는 코드는 아래와 같습니다

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

저는 위와 같은 결과 값이 나왔는데요, 한결 같은 값을 가지는 것은 아니지만 전체적으로 점점 리워드가 상승하는 것을 볼 수 있습니다.

이처럼 여러분의 리워드도 전체적으로 상승하는 형상을 보이고 있다면, 학습이 잘 된 것으로 보실 수 있습니다.

다음은 모델 테스트를 진행해보겠습니다.

저는 모든 모델을 테스트 한 것은 아니도, 500점 이상을 달성한 모델만 따로 저장하여 모델 테스트를 진행해보았습니다.

리워드 그래프에서도 보셨듯, 전체적으로 리워드가 상승하는 것이지 모든 모델이 좋은 모델이라고는 볼 수 없기 때문에 최대치의 리워드를 달성한 모델로 테스트를 진행하였어요!

 

모델 테스트를 진행하는 코드는 아래와 같습니다.

# 테스트 시, render 활성화 필요
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

특정 모델은 500점 리워드를 달성하지 못하는 경우도 있었지만, 대부분은 500점 이상을 달성하는 것을 볼 수 있었습니다.

모델마다 다른 방식으로 500점을 달성하는데, 한 번 구경해보시는 것도 좋을 것 같아요!

전체 코드

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.0005
batch_size = 100
memory_size = 5000
episodes = 5000
# ϵ-greedy 사용 시, 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995

class DDQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 32)
        self.fc2 = nn.Linear(32, 32)
        self.fc3 = nn.Linear(32, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# ϵ-greedy
	# if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
	if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DDQN(state_dim, action_dim)
target_net = DDQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0
save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 100 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/100}")
    if episode % 100 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)

        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)

        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
        model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
        torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    
    # ϵ-greedy 사용 시, 필요
  	# if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

# 테스트 진행
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

 

사실 cart pole 문제는 간단한 편이라서, DQN이랑 DDQN이랑 성능이 거의 차이가 나지 않았습니다!

다만 그래도 Q-Network를 두 개를 사용한다는 개념이 헷갈릴 수 있기 때문에 한 번은 구현해보시는 걸 추천드려요:)

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형

728x90

안녕하세요! 오늘은 gymnasium에서 제공하는 cart pole 문제 설명에 대한 포스팅 진행하겠습니다.

 

저는 cart pole 문제를 DQNDDQN 두 가지 방법으로 풀어보았는데요!

각각 전체 코드는 포스팅 가장 아래에 있습니다.


  • cart pole 규칙

우선 gymnasium에서 제공하는 cart pole 문제는 아래 링크에서 자세하게 보실 수 있습니다.

https://gymnasium.farama.org/environments/classic_control/cart_pole/

 

Gymnasium Documentation

A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym)

gymnasium.farama.org

 

위의 사진처럼 생긴 막대를 지속적으로 세우는 것이 cart pole의 가장 큰 목표 입니다.

cart pole의 자세한 규칙은 아래와 같습니다.

  1. 이동 방향좌, 우만 존재
  2. 현재 막대의 상태는 막대 위치, 속도, 각도, 각속도 총 4가지로 표현
  3. 하나의 step이 에피소드 종료되지 않으면 보상 1을 획득
  4. 각도가 좌우 12 º 이상 벗어나거나, 막대가 화면을 벗어나거나, 보상 500(혹은 200) 달성하면 에피소드 종료

저희는 위의 규칙을 잘 생각하여, 막대가 특정 각도를 넘어가거나 화면에 벗어나지 않게 500 에피소드 이상 버티는 것을 학습해주어야 합니다.

  • 필요한 라이브러리 설치

frozen lake는 gymnasium에서 제공하고 있기 때문에 가장 먼저 gymnasium을 설치해주시면 됩니다!

그리고 render 모드 활성화를 진행하기 위해 gymnasium[toy-text] 설치도 같이 진행해주세요.

두 패키지 모두 pip install로 설치해주시면 금방 설치할 수 있습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np
  • cart pole 환경 세팅

필요한 라이브러리를 설치한 후에는 cart pole 환경을 세팅해봅니다.

우선 환경 세팅을 하는 코드는 아래와 같습니다.

env = gym.make("CartPole-v1")

cart pole은 학습을 진행해주지 않으면 바로 에피소드가 종료가 되기 때문에 테스트를 진행해보기가 어렵습니다ㅠㅠ

어떤 식으로 진행되는지 궁금하신 분들은 전체 코드 복사 후 실행해보시면, cart pole이 어떻게 진행되는지 아실 수 있습니다!


여기까지가 cart pole 환경 세팅이었습니다.

cart pole 환경 세팅은 그렇게 어렵지 않아서, 금방 하실 수 있어요!

 

이번 포스팅에서는 DQN과 DDQN에 관한 전체 코드만 업로드하고, 자세한 포스팅은 추후에 진행하도록 하겠습니다.

두 가지 모두 500 에피소드에 달성한 모델이 다수 존재했습니다.

[DQN 전체 코드]

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.001
batch_size = 100
memory_size = 5000
# epsilon에 의해 행동을 선택할 때는 해당 부분 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995
episodes = 5000

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# epsilon-greedy를 바탕으로 행동을 선택하는 과정
    # if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    # 행동 값을 확률로 변경하여, 확률에 따라 행동을 선택하는 과정
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_q_values = target_net(next_state_batch).max(1)[0].detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 500을 10회 진행하면 성공이라고 판단하여, 종료를 하기 위한 장치로 잠깐 쓰인 것
# count = 0

for episode in range(episodes):
    # if count > 10 :
    #     break
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
    #    count += 1
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

# 테스트 진행 - render를 켜줘야 확인이 가능
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

[DDQN 전체 코드]

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.001
batch_size = 100
memory_size = 5000
# epsilon에 의해 행동을 선택할 때는 해당 부분 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995
episodes = 5000

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# epsilon-greedy를 바탕으로 행동을 선택하는 과정
    # if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    # 행동 값을 확률로 변경하여, 확률에 따라 행동을 선택하는 과정
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DDQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 500을 10회 진행하면 성공이라고 판단하여, 종료를 하기 위한 장치로 잠깐 쓰인 것
# count = 0

for episode in range(episodes):
    # if count > 10 :
    #     break
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
    #    count += 1
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DDQN on CartPole')
plt.show()

# 테스트 진행 - render를 켜줘야 확인이 가능
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

결과물

 

코드에 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형

+ Recent posts