728x90
반응형
728x90

안녕하세요! 오늘은 기존에 작성한 cart pole 문제DDQN(Double Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.

  • DDQN (Double Deep Q-Network) 이란?

강화 학습의 한 방법으로, DQN과 비슷하지만 학습할 때 Neural Network(인공 신경망)를 두 개 사용하는 학습 방법입니다.

DQN에서 Q-Network가 두 개 사용되었다고 보시면 될 것 같아요!

 

DQN에 대해 궁금하신 분들은 cart pole를 DQN으로 진행하는 아래 포스팅을 참고 해주시면 됩니다.

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-DQN

 

[RL] gymnasium cart pole 강화 학습 - DQN

안녕하세요! 오늘은 기존에 작성한 cart pole 문제를 DQN(Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.DQN (Deep Q-Network) 이란?강화 학습의 한 방법으로, Q-learning에서 Q-table 대신 Neural Netw

yhj9855.com

 

cart pole에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!!

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium cart pole 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 cart pole 문제 설명에 대한 포스팅 진행하겠습니다. 저는 cart pole 문제를 DQN과 DDQN 두 가지 방법으로 풀어보았는데요!각각 전체 코드는 포스팅 가장 아래

yhj9855.com


그럼 본격적으로 DDQN으로 cart pole 문제를 풀어보도록 하겠습니다.

Neural Network 구현

우선 먼저 DDQN에서 Neural Network을 먼저 구현합니다.

Neural Network을 구현하는 코드는 아래와 같습니다.

class DDQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DDQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

레이어 층을 몇 개 사용할 것인지, 활성 함수는 어떤 것을 사용할 것인지는 모두 하이퍼 파라미터 입니다!

저는 3개의 층을 사용했고, 모두 Relu 함수를 사용했습니다.

보통 어떤 활성 함수를 사용하면 좋을지 모를 때, Relu 함수를 많이 사용하는데요, 그래도 다른 활성 함수도 사용해보시는 걸 추천 드려요!!

저는 tanh 함수를 사용했지만, 학습이 너무 진행되지 않아서 포기했었습니다ㅜㅜ

Replay Memory 구현

다음은 Replay Memory를 구현합니다.

  • Replay Memory란?

DDQN처럼 강화 학습과 딥러닝이 혼합된 알고리즘을 사용할 때, 더 안정적이고 효율적인 학습을 하기 위한 방법입니다.

환경에서 만들어진 에피소드를 저장해두었다가, 랜덤으로 샘플링해서 학습하는 것이 핵심 아이디어 입니다.

 

매 순간순간 에피소드를 학습하게 되면, 비슷한 경험으로 학습이 되지 않고, 편향적으로 학습될 가능성이 있습니다.

Replay Memory 방법을 활용하면 해당 단점을 완화할 수 있어, 자주 사용되는 방법입니다.

 

Replay Memory를 코드로 구현하는 방법은 아래와 같습니다.

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

행동 선택하기

다음은 Q-learning과 비슷하게 Q값을 바탕으로 행동을 선택하는 것을 구현합니다.

저는 여기서 행동을 선택하는 방법을 두 가지 소개 드리려고 합니다!

 

1.  ϵ-greedy로 행동 선택하기

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    if random.random() < epsilon:
        return random.randint(0, action_dim - 1)
    else:
        return target_net(state).argmax().item()

여기서 행동 값을 target_net을 바탕으로 진행을 하고 있는데요.

학습이 진행 중인 policy_net에서 행동을 선택할 경우, 학습이 불안정할 가능성이 높아 target_net에서 진행하는 것이 더 좋습니다.

 

2. 확률로 행동 선택하기

해당 방법은 Q값을 확률로 변경한 다음, 확률대로 행동을 선택하게 하는 것을 의미합니다.

이 방법을 사용할 경우, 확률적으로 행동을 선택하기 때문에 어느 정도 학습이 진행되어도 탐험을 보장한다는 것과 ϵ을 따로 세팅해주지 않아도 되는 것이 장점이라고 볼 수 있습니다!

 

확률로 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    q_value = target_net(state)
    # Q 값을 확률 값으로 바꾸는 과정
    p = F.softmax(q_value, dim=0).tolist()
    # 부동소수점 오차로 인해 합이 1이 안되는 문제 해결
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

DDQN 공식을 사용해서 업데이트

이제 DDQN 공식을 사용해서 네트워크를 업데이트 하는 것을 구현해보도록 하겠습니다.

코드로 들어가지 전에 먼저 DDQN 공식을 먼저 살펴보겠습니다.

기본적으로는 DQN과 비슷한 공식입니다!

DQN과 Q가 두 번 사용된 것을 보실 수 있습니다.

 

DQN과 마찬가지로 DDQN은 딥러닝을 사용하기 때문에 자체적으로 옵티마이저와 learning rate가 들어가게 됩니다.

 두 개가 DDQN의 α로 작용을 하기 때문에 실제로 사용하는 공식에는 α가 사라져 아래와 같은 공식이 됩니다!

θ와 θ ̄ 는 α 대신 네트워크가 작동하는 부분이라고 생각하시면 됩니다.

 

이제 해당 공식을 바탕으로 DDQN을 진행하는 코드는 아래와 같습니다.

def optimize_model(memory, policy_net, target_net, optimizer):
	# batch_size만큼 데이터가 메모리에 쌓였을 때만 학습 진행
    if len(memory) < batch_size:
        return

    # transitions = (state, action, reward, next_state, done)
    transitions = memory.sample(batch_size)
    # state, action, reward, next_state, done을 각각 묶어서 list의 형태로 만드는 작업
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DDQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    # (1-done_batch)을 통해 에피소드가 끝났는지 아닌지를 판단
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

policy_net에서 가져온 next_action을 바탕으로 target_net에서 next_q_values 값을 구하고 있습니다.

이처럼 policy_net, target_net 두 가지에서 가져온 값으로 계산을 진행하기 때문에 Double Deep Q-Network가 된 것입니다.

모델 학습

이제 본격적으로 학습을 진행해보도록 하겠습니다.

한 가지 중요한 점은 Cart Pole 환경이 500점을 달성해도, 에피소드 완료라고 판단하지 않기 때문에 저희가 직접 판단해줘야 합니다.

 

학습을 진행하는 코드는 아래와 같습니다.

# 초기 세팅
policy_net = DDQN(state_dim, action_dim)
target_net = DDQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 모델 학습
for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

리워드 시각화 및 모델 테스트

마지막으로 저희가 학습한 모델이 잘 학습되었는지 확인하기 위해, 리워드를 시각화하고 모델을 테스트해보겠습니다.

먼저 리워드 시각화 하는 코드는 아래와 같습니다

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

저는 위와 같은 결과 값이 나왔는데요, 한결 같은 값을 가지는 것은 아니지만 전체적으로 점점 리워드가 상승하는 것을 볼 수 있습니다.

이처럼 여러분의 리워드도 전체적으로 상승하는 형상을 보이고 있다면, 학습이 잘 된 것으로 보실 수 있습니다.

다음은 모델 테스트를 진행해보겠습니다.

저는 모든 모델을 테스트 한 것은 아니도, 500점 이상을 달성한 모델만 따로 저장하여 모델 테스트를 진행해보았습니다.

리워드 그래프에서도 보셨듯, 전체적으로 리워드가 상승하는 것이지 모든 모델이 좋은 모델이라고는 볼 수 없기 때문에 최대치의 리워드를 달성한 모델로 테스트를 진행하였어요!

 

모델 테스트를 진행하는 코드는 아래와 같습니다.

# 테스트 시, render 활성화 필요
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

특정 모델은 500점 리워드를 달성하지 못하는 경우도 있었지만, 대부분은 500점 이상을 달성하는 것을 볼 수 있었습니다.

모델마다 다른 방식으로 500점을 달성하는데, 한 번 구경해보시는 것도 좋을 것 같아요!

전체 코드

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.0005
batch_size = 100
memory_size = 5000
episodes = 5000
# ϵ-greedy 사용 시, 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995

class DDQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 32)
        self.fc2 = nn.Linear(32, 32)
        self.fc3 = nn.Linear(32, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# ϵ-greedy
	# if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
	if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DDQN(state_dim, action_dim)
target_net = DDQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0
save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 100 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/100}")
    if episode % 100 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)

        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)

        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
        model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
        torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    
    # ϵ-greedy 사용 시, 필요
  	# if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

# 테스트 진행
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

 

사실 cart pole 문제는 간단한 편이라서, DQN이랑 DDQN이랑 성능이 거의 차이가 나지 않았습니다!

다만 그래도 Q-Network를 두 개를 사용한다는 개념이 헷갈릴 수 있기 때문에 한 번은 구현해보시는 걸 추천드려요:)

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형

 

728x90

안녕하세요! 오늘은 기존에 작성한 cart pole 문제DQN(Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.

  • DQN (Deep Q-Network) 이란?

강화 학습의 한 방법으로, Q-learning에서 Q-table 대신 Neural Network(인공 신경망)을 사용해서 학습하는 방법입니다.

state가 많아질수록 Q-table을 저장하고 업데이트 하는 것이 어렵기 때문에, 신경망이 대신 예측하도록 하는 것입니다.

Q-learning+DNN(Deep Neural Network)라고 보시면 좋을 것 같아요!

 

Q-learning에 대해 궁금하신 분들은 frozen lake를 Q-learning으로 진행하는 아래 포스팅을 참고 해주시면 됩니다.

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-2

 

[RL] gymnasium frozen lake 강화 학습 - 2

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다. Q-learning이란?강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.Q 테이블

yhj9855.com

 

cart pole에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!!

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium cart pole 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 cart pole 문제 설명에 대한 포스팅 진행하겠습니다. 저는 cart pole 문제를 DQN과 DDQN 두 가지 방법으로 풀어보았는데요!각각 전체 코드는 포스팅 가장 아래

yhj9855.com


그럼 본격적으로 DQN으로 cart pole 문제를 풀어보도록 하겠습니다.

Neural Network 구현

우선 먼저 DQN에서 Neural Network을 먼저 구현합니다.

Neural Network을 구현하는 코드는 아래와 같습니다.

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

레이어 층을 몇 개 사용할 것인지, 활성 함수는 어떤 것을 사용할 것인지는 모두 하이퍼 파라미터 입니다!

저는 3개의 층을 사용했고, 모두 Relu 함수를 사용했습니다.

보통 어떤 활성 함수를 사용하면 좋을지 모를 때, Relu 함수를 많이 사용하는데요, 그래도 다른 활성 함수도 사용해보시는 걸 추천 드려요!!

저는 tanh 함수를 사용했지만, 학습이 너무 진행되지 않아서 포기했었습니다ㅜㅜ

Replay Memory 구현

다음은 Replay Memory를 구현합니다.

  • Replay Memory란?

DQN처럼 강화 학습과 딥러닝이 혼합된 알고리즘을 사용할 때, 더 안정적이고 효율적인 학습을 하기 위한 방법입니다.

환경에서 만들어진 에피소드를 저장해두었다가, 랜덤으로 샘플링해서 학습하는 것이 핵심 아이디어 입니다.

 

매 순간순간 에피소드를 학습하게 되면, 비슷한 경험으로 학습이 되지 않고, 편향적으로 학습될 가능성이 있습니다.

Replay Memory 방법을 활용하면 해당 단점을 완화할 수 있어, 자주 사용되는 방법입니다.

 

Replay Memory를 코드로 구현하는 방법은 아래와 같습니다.

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

 

행동 선택하기

다음은 Q-learning과 비슷하게 Q값을 바탕으로 행동을 선택하는 것을 구현합니다.

저는 여기서 행동을 선택하는 방법을 두 가지 소개 드리려고 합니다!

 

1.  ϵ-greedy로 행동 선택하기

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    if random.random() < epsilon:
        return random.randint(0, action_dim - 1)
    else:
        return target_net(state).argmax().item()

여기서 행동 값을 target_net을 바탕으로 진행을 하고 있는데요.

학습이 진행 중인 policy_net에서 행동을 선택할 경우, 학습이 불안정할 가능성이 높아 target_net에서 진행하는 것이 더 좋습니다.

 

2. 확률로 행동 선택하기

해당 방법은 Q값을 확률로 변경한 다음, 확률대로 행동을 선택하게 하는 것을 의미합니다.

이 방법을 사용할 경우, 확률적으로 행동을 선택하기 때문에 어느 정도 학습이 진행되어도 탐험을 보장한다는 것과 ϵ을 따로 세팅해주지 않아도 되는 것이 장점이라고 볼 수 있습니다!

 

확률로 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    q_value = target_net(state)
    # Q 값을 확률 값으로 바꾸는 과정
    p = F.softmax(q_value, dim=0).tolist()
    # 부동소수점 오차로 인해 합이 1이 안되는 문제 해결
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

DQN 공식을 사용해서 업데이트

이제 DQN 공식을 사용해서 네트워크를 업데이트 하는 것을 구현해보도록 하겠습니다.

코드로 들어가지 전에 먼저 DQN 공식을 먼저 살펴보겠습니다.

기본적으로는 Q-learning과 동일한 공식입니다!

하지만 DQN은 딥러닝을 사용하기 때문에 자체적으로 옵티마이저와 learning rate가 들어가게 됩니다.

두 개가 DQN의 α로 작용을 하기 때문에 실제로 사용하는 공식에는 α가 사라져 아래와 같은 공식이 됩니다!

θ ̄ 는 α 대신 네트워크가 작동하는 부분이라고 생각하시면 됩니다.

 

α를 사용하지 않기 때문에 일종의 TD 으로도 보실 수 있는데요.

TD 공식으로 변형해서 사용하셔도 문제 없이 학습하실 수 있습니다:)

 

이제 해당 공식을 바탕으로 DQN을 진행하는 코드는 아래와 같습니다.

def optimize_model(memory, policy_net, target_net, optimizer):
	# batch_size만큼 데이터가 메모리에 쌓였을 때만 학습 진행
    if len(memory) < batch_size:
        return

    # transitions = (state, action, reward, next_state, done)
    transitions = memory.sample(batch_size)
    # state, action, reward, next_state, done을 각각 묶어서 list의 형태로 만드는 작업
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_q_values = target_net(next_state_batch).max(1)[0].detach()
    # (1-done_batch)을 통해 에피소드가 끝났는지 아닌지를 판단
    target_q_values = reward_batch + (gamma * next_q_values * (1-done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

q_values를 policy_net에서 가져오는 이유는 학습 중인 policy_net과 DQN 공식을 적용한 target_net가 비슷해지도록 학습이 되야 하기 때문입니다.

 

모델 학습

이제 본격적으로 학습을 진행해보도록 하겠습니다.

한 가지 중요한 점은 Cart Pole 환경이 500점을 달성해도, 에피소드 완료라고 판단하지 않기 때문에 저희가 직접 판단해줘야 합니다.

 

학습을 진행하는 코드는 아래와 같습니다.

# 초기 세팅
policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 모델 학습
for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

리워드 시각화 및 모델 테스트

마지막으로 저희가 학습한 모델이 잘 학습되었는지 확인하기 위해, 리워드를 시각화하고 모델을 테스트해보겠습니다.

먼저 리워드 시각화 하는 코드는 아래와 같습니다.

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

저는 위와 같은 결과 값이 나왔는데요, 한결 같은 값을 가지는 것은 아니지만 전체적으로 점점 리워드가 상승하는 것을 볼 수 있습니다.

이처럼 여러분의 리워드도 전체적으로 상승하는 형상을 보이고 있다면, 학습이 잘 된 것으로 보실 수 있습니다.

 

 

다음은 모델 테스트를 진행해보겠습니다.

저는 모든 모델을 테스트 한 것은 아니도, 500점 이상을 달성한 모델만 따로 저장하여 모델 테스트를 진행해보았습니다.

리워드 그래프에서도 보셨듯, 전체적으로 리워드가 상승하는 것이지 모든 모델이 좋은 모델이라고는 볼 수 없기 때문에 최대치의 리워드를 달성한 모델로 테스트를 진행하였어요!

 

모델 테스트를 진행하는 코드는 아래와 같습니다.

# 테스트 시, render 활성화 필요
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

특정 모델은 500점 리워드를 달성하지 못하는 경우도 있었지만, 대부분은 500점 이상을 달성하는 것을 볼 수 있었습니다.

모델마다 다른 방식으로 500점을 달성하는데, 한 번 구경해보시는 것도 좋을 것 같아요!

전체 코드

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.0005
batch_size = 100
memory_size = 5000
episodes = 5000
# ϵ-greedy 사용 시, 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 32)
        self.fc2 = nn.Linear(32, 32)
        self.fc3 = nn.Linear(32, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# ϵ-greedy
	# if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_q_values = target_net(next_state_batch).max(1)[0].detach()
    # DQN
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0
save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 100 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/100}")
    if episode % 100 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)

        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)

        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
        model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
        torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    
    # ϵ-greedy 사용 시, 필요
  	# if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

# 테스트 진행
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

 

딥러닝+강화학습으로 진행되기 때문에 모델 학습이나 이런 부분이 진행하면서 많이 어려웠습니다.

저도 이거 풀면서 딥러닝에 대한 지식이 부족하다는 것을 깨닫고 요즘은 딥러닝을 공부하고 있는데, 쉽지 않네요ㅠㅠ

 

DQN이랑 비슷한 DDQN으로 cart pole 문제를 푸는 방법은 아래 링크를 참고해주세요.

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-DDQN

 

[RL] gymnasium cart pole 강화 학습 - DDQN

안녕하세요! 오늘은 기존에 작성한 cart pole 문제를 DDQN(Double Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.DDQN (Double Deep Q-Network) 이란?강화 학습의 한 방법으로, DQN과 비슷하지만 학

yhj9855.com

 

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

 

728x90
반응형
728x90
반응형

728x90

안녕하세요! 오늘은 gymnasium에서 제공하는 cart pole 문제 설명에 대한 포스팅 진행하겠습니다.

 

저는 cart pole 문제를 DQNDDQN 두 가지 방법으로 풀어보았는데요!

각각 전체 코드는 포스팅 가장 아래에 있습니다.


  • cart pole 규칙

우선 gymnasium에서 제공하는 cart pole 문제는 아래 링크에서 자세하게 보실 수 있습니다.

https://gymnasium.farama.org/environments/classic_control/cart_pole/

 

Gymnasium Documentation

A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym)

gymnasium.farama.org

 

위의 사진처럼 생긴 막대를 지속적으로 세우는 것이 cart pole의 가장 큰 목표 입니다.

cart pole의 자세한 규칙은 아래와 같습니다.

  1. 이동 방향좌, 우만 존재
  2. 현재 막대의 상태는 막대 위치, 속도, 각도, 각속도 총 4가지로 표현
  3. 하나의 step이 에피소드 종료되지 않으면 보상 1을 획득
  4. 각도가 좌우 12 º 이상 벗어나거나, 막대가 화면을 벗어나거나, 보상 500(혹은 200) 달성하면 에피소드 종료

저희는 위의 규칙을 잘 생각하여, 막대가 특정 각도를 넘어가거나 화면에 벗어나지 않게 500 에피소드 이상 버티는 것을 학습해주어야 합니다.

  • 필요한 라이브러리 설치

frozen lake는 gymnasium에서 제공하고 있기 때문에 가장 먼저 gymnasium을 설치해주시면 됩니다!

그리고 render 모드 활성화를 진행하기 위해 gymnasium[toy-text] 설치도 같이 진행해주세요.

두 패키지 모두 pip install로 설치해주시면 금방 설치할 수 있습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np
  • cart pole 환경 세팅

필요한 라이브러리를 설치한 후에는 cart pole 환경을 세팅해봅니다.

우선 환경 세팅을 하는 코드는 아래와 같습니다.

env = gym.make("CartPole-v1")

cart pole은 학습을 진행해주지 않으면 바로 에피소드가 종료가 되기 때문에 테스트를 진행해보기가 어렵습니다ㅠㅠ

어떤 식으로 진행되는지 궁금하신 분들은 전체 코드 복사 후 실행해보시면, cart pole이 어떻게 진행되는지 아실 수 있습니다!


여기까지가 cart pole 환경 세팅이었습니다.

cart pole 환경 세팅은 그렇게 어렵지 않아서, 금방 하실 수 있어요!

 

이번 포스팅에서는 DQN과 DDQN에 관한 전체 코드만 업로드하고, 자세한 포스팅은 추후에 진행하도록 하겠습니다.

두 가지 모두 500 에피소드에 달성한 모델이 다수 존재했습니다.

[DQN 전체 코드]

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.001
batch_size = 100
memory_size = 5000
# epsilon에 의해 행동을 선택할 때는 해당 부분 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995
episodes = 5000

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# epsilon-greedy를 바탕으로 행동을 선택하는 과정
    # if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    # 행동 값을 확률로 변경하여, 확률에 따라 행동을 선택하는 과정
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_q_values = target_net(next_state_batch).max(1)[0].detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 500을 10회 진행하면 성공이라고 판단하여, 종료를 하기 위한 장치로 잠깐 쓰인 것
# count = 0

for episode in range(episodes):
    # if count > 10 :
    #     break
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
    #    count += 1
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

# 테스트 진행 - render를 켜줘야 확인이 가능
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

[DDQN 전체 코드]

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.001
batch_size = 100
memory_size = 5000
# epsilon에 의해 행동을 선택할 때는 해당 부분 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995
episodes = 5000

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# epsilon-greedy를 바탕으로 행동을 선택하는 과정
    # if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    # 행동 값을 확률로 변경하여, 확률에 따라 행동을 선택하는 과정
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DDQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 500을 10회 진행하면 성공이라고 판단하여, 종료를 하기 위한 장치로 잠깐 쓰인 것
# count = 0

for episode in range(episodes):
    # if count > 10 :
    #     break
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
    #    count += 1
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DDQN on CartPole')
plt.show()

# 테스트 진행 - render를 켜줘야 확인이 가능
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

결과물

 

코드에 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형
728x90

 

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다.

 

  • Q-learning이란?

강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.

Q 테이블 내 값은 특정 상황에서 어떤 행동을 했을 때의 보상 값의 큰 정도를 나타내는 것으로, 학습이 진행되면서 해당 값들을 업데이트 하여 최적의 행동을 찾는 것입니다.

 

frozen lake에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium frozen lake 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 frozen lake 문제에 대한 설명에 대해 포스팅 하겠습니다. 강화 학습이란?행동을 통해 얻는 보상을 기반으로 학습하는 AI의 한 분야입니다.흔히 Reinforcemen

yhj9855.com


그럼 본격적으로 Q-learing으로 frozen lake 문제를 풀어보도록 하겠습니다.

Q-learning으로 frozen lake를 풀 때, 크게 두 가지를 생각하시면 됩니다.

  1. Q 값을 바탕으로 행동을 선택
  2. 선택한 행동을 Q-learning 공식을 사용해서 Q 값 업데이트

위의 두 가지를 하나씩 자세히 살펴보겠습니다.

Q 값을 바탕으로 행동을 선택

해당 부분은 현재까지 업데이트 된 Q 값을 바탕으로 행동을 선택하는 것입니다.

기본적으로는 Q 값이 가장 높은 행동을 선택하면 됩니다.

 

하지만 여기에는 한 가지 문제점이 있습니다.

Q 값이 제대로 업데이트가 될 때까지 충분한 탐험을 진행하지 못했을 경우, 제대로 된 행동을 추출할 수 없다는 것입니다.

예를 들어, 초반에는 Q 값이 모두 0이기 때문에 (0, 0)에서 왼쪽으로 가는 행동만 선택하기 때문에 게임을 진행할 수가 없습니다.

 

저희는 이 문제를 해결하기 위해, ϵ-greedy 방법을 사용할 수 있습니다.

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

# 초기 값은 보통 1로 설정
epsilon = 1.0
train = True

# ϵ-greedy를 활용한 행동 선택
def select_action(state) :
	# 훈련을 할 경우에는 ϵ-greedy 방법을 사용
   	# 테스트를 진행할 때는 온전히 Q 값만 사용
   	# np.random.rand()를 넣어, 후반에도 종종 탐험을 할 수 있도록 함
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

 

선택한 행동을 Q-learning 공식을 사용해서 Q 값 업데이트

해당 부분은 위에서 선택한 행동을 환경에서 실행해보고, 그 결과 값을 Q-learning 공식에 맞게 Q 값을 업데이트 하는 것입니다.

코드로 들어가기 전에 먼저 Q 테이블을 업데이트하는 공식을 먼저 살펴보겠습니다.

해당 수식은 Q(s, a)를 업데이트 하는데, 특정 학습률 α에 있어 (1- α)만큼 현재의 Q 값α만큼의 (보상값 r + 할인율  γ * 다음 state의 가장 높은 Q값 maxQ(s', a'))를 반영한다는 의미입니다.

  • 학습률 α

값이 높을수록 다음 행동 값 즉, 새로운 정보를 더 많이 반영한다는 것이고, 낮을수록 현재의 Q 값 즉, 기존의 경험을 더 많이 유지한다는 의미입니다.

  • 할인율 γ

미래 보상의 중요도를 나타내는 지표로, 보통은 미래의 보상에 너무 의존하지 않도록 1보다 약간 작은 수로 지정하는 것이 보통입니다.

  • maxQ(s', a')

다음 상태인 s'에서 가능한 모든 행동 중 가장 높은 Q 값을 의미하며, s'은 현재 state에서 위에서 고른 행동을 실행한 결과 값이라고 보시면 됩니다.

 

이제 해당 공식을 바탕으로 Q-learning을 하는 코드는 아래와 같습니다.

# 학습을 진행할 때는 render 모드 비활성화
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
# 환경의 행동 범위 : 여기서는 상, 하, 좌, 우 총 4개
action_size = env.action_space.n

# defaultdict은 키가 없을 때 자동으로 기본값을 생성하기 때문에 강화 학습에서 많이 사용
Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
# 총 학습을 진행할 에피소드 수
max_episode = 10000

def learn() :
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때 reset
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
        	# Q 테이블을 바탕으로 action을 고르는 함수
            action = select_action(state)
            # state, reward, done 외 사용하지 않기 때문에 _ 처리
            new_state, reward, done, _, _ = env.step(action)
            # Q-learning 공식
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*np.max(Q[new_state]))
            all_reward += reward
            state = new_state
        # 50번째 에피소드 마다 ϵ 값을 줄여줌
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

위의 두 가지 과정을 합치면 Q-learing으로 frozen lake를 풀 수 있는 코드가 완성됩니다!

학습 후 테스트를 진행하고 싶으신 경우에는 render를 킨 환경을 다시 세팅해서 해주시면 됩니다.

 

전체 코드

행동 선택, 학습, 테스트 과정을 모두 포함한 전체 코드는 아래와 같습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np

# 미끄러짐 옵션 True/False 선택 가능
is_slippery = True
# 8x8 중에 선택 가능
map_size = '4x4'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        state, _ = env.reset()
        done = False
        all_reward = 0
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*np.max(Q[new_state]))
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

# 학습한 Q를 바탕으로 frozen lake 테스트
def testing_after_learning():
	# render를 켜야 제대로 학습이 되었는지 확인할 수 있음
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

 

 

테스트를 진행하면서 is_slippery 옵션을 껐을 경우에는 1.0 보상을 받으면 성공이고, is_slippery 옵션을 켰을 경우에는 70% 이상 1.0 보상을 받으면 성공이라고 보실 수 있습니다.

 

추가로  is_slippery 옵션을 켰을 경우에는 학습을 많이 진행해야 어느 정도 수렴하시는 걸 보실 수 있습니다!

아무래도 model-free로 진행을 하니까 많이 느리더라구요ㅠㅠ

  • model-based

model-free가 아닌 어느 정도 model-based로 빠르게 학습을 하고 싶으신 경우 아래 상황을 고려할 수 있습니다.

  1. 행동 한 번을 진행할 때마다 reward에 - 진행
    → RL이 최단 경로로 진행하려는 경향을 학습할 수 있음
  2. 구멍에 빠졌을 경우, reward에 크게 - 진행
    → 구멍에 빠지지 않는 쪽으로 빠르게 학습할 수 있음
  3. 도착했을 경우, reward를 크게 추가
    → 도착 지점에 확실히 도착하기 위해 큰 reward를 지급

그 외에도 벽에 부딪히거나 하는 등 맵을 알고 있기 때문에 환경에 맞게 reward를 추가로 주거나 마이너스를 진행하여, model-based 모델을 만들 수도 있습니다.

 

그래도 강화 학습을 제대로 알기 위해서는 model-free로 진행해보는 것을 추천드립니다!

 

Q-learning이랑 비슷한 SARSA로 frozen lake 문제를 푸는 방법은 아래 링크를 참고해주세요.

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-SARSA

 

[RL] gymnasium frozen lake 강화 학습 - SARSA

안녕하세요! 오늘은 기존 작성한 frozen lake 문제를 SARSA로 진행하는 방법에 대해 포스팅 하겠습니다.SARSA 란?강화 학습의 한 방법으로, Q-learning과 비슷하지만 행동 선택이 다른 방식입니다.Q-learning

yhj9855.com

 

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형
728x90

 

안녕하세요! 오늘은 gymnasium에서 제공하는 frozen lake 문제에 대한 설명에 대해 포스팅 하겠습니다.

 

  • 강화 학습이란?

행동을 통해 얻는 보상을 기반으로 학습하는 AI의 한 분야입니다.

흔히 Reinforcement Learning 줄여서, RL이라고 부르는 강화 학습은 컴퓨터가 스스로 경험을 쌓으면서 어떤 행동이 가장 좋은 보상을 얻는지 배우는 과정이라고 볼 수 있습니다.

 

저는 frozen lake 문제를 Q-learningSARSA 두 가지 방법으로 풀어보았는데요!

각각 전체 코드는 포스팅 가장 아래에 있습니다.


  • frozen lake 규칙

우선 gymnasium에서 제공하는 frozen lake 문제는 아래 링크에서 자세하게 보실 수 있습니다.

https://gymnasium.farama.org/environments/toy_text/frozen_lake/

 

Gymnasium Documentation

A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym)

gymnasium.farama.org

 

frozen lake의 기본 맵은 아래 사진처럼 생겼습니다.

  1. frozen lake의 기본 맵은 4x4 혹은 8x8로 존재
  2. 맵 내 구멍이 존재
    → 구멍의 수나 위치는 조절할 수 없음
  3. 선물에 있는 곳에 도착하거나, 구멍에 빠지면 에피소드 종료
    물에 있는 곳에 도착해야만 보상 1을 획득할 수 있음
  4. 움직임은 상, 하, 좌, 우 존재
  5. 얼음 내 미끄러짐을 키고 끌 수 있음
    → 미끄러짐을 킬 경우, 각 얼음마다 특정 확률로 다른 곳으로 움직이는 확률이 존재
    → 각 얼음마다 존재하는 미끄러질 확률을 조절할 수 없음

저희는 위의 규칙을 잘 생각하여, 구멍에 빠지지 않고 선물에 도착하는 경로를 학습해주어야 합니다.

  • 필요한 라이브러리 설치

frozen lake는 gymnasium에서 제공하고 있기 때문에 가장 먼저 gymnasium을 설치해주시면 됩니다!

그리고 render 모드 활성화를 진행하기 위해 gymnasium[toy-text] 설치도 같이 진행해주세요.

두 패키지 모두 pip install로 설치해주시면 금방 설치할 수 있습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np
  • frozen lake 환경 세팅

필요한 라이브러리를 설치한 후에는 frozen lake 환경을 세팅해봅니다.

우선 환경 세팅을 하는 코드는 아래와 같습니다.

# 미끄러지는 얼음을 만들지 결정하는 변수
is_slippery = False
# 맵 사이즈
map_size = '4x4'
# frozen lake 환경 설정
# render 활성화 환경
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
# 환경을 초기화하는 함수
env.reset()

env.reset()의 경우 기본 기능은 환경을 초기화하는 함수지만, 환경을 세팅한 다음에 해당 코드를 부르지 않으면 오류가 발생하니 반드시 환경 사용 전에 env.reset()을 먼저 불러주세요!

  • frozen lake 테스트 진행하기

이제 만든 환경을 바탕으로 gymnasium에서 제공하는 frozen lake 테스트를 진행해봅시다!

테스트를 진행하는 코드는 아래와 같습니다.

while True:
    # 0 : 왼쪽, 1 : 아래, 2 : 오른쪽, 3 : 위
    action = input("이동할 방향: ")
    if action not in ['0','1','2','3']:
        continue
    action = int(action)
    # step() : 환경에 액션을 넣고 실행하는 함수
    state, reward, done, info, prob = env.step(action)
    print("위치: ", state, "행동: ", action, "보상: ", reward)
    print()
    if done:
        print("에피소드 종료", reward)
        break

해당 코드를 실행해보면, 4x4 맵이 있는 창에 입력한 행동대로 움직이는 것을 확인할 수 있습니다.

구멍에 빠지거나 선물에 도착하기 전에는 계속 움직일 수 있으니, 환경을 이해하기 위해 편하게 테스트 진행해보세요!

맵을 키우거나 미끄러지는 얼음을 키고 테스트 해보시는 것도 추천드립니다.

 


여기까지가 frozen lake 환경 세팅이었습니다.

이번 포스팅에서는 Q-learning과 SARSA에 관한 model-free 전체 코드만 업로드 되어 있습니다.

두 가지 모두 is_slippery가 False일 경우에는 1.0 만점을 받았고, is_silppery가 True일 경우에는 0.8 이상의 보상을 획득했습니다.

 

Q-learning에 관한 자세한 내용이 궁금하신 분들은 아래 링크에서 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-2

 

[RL] gymnasium frozen lake 강화 학습 - 2

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다. Q-learning이란?강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.Q 테이블

yhj9855.com

 

SARSA에 관한 자세한 내용이 궁금하진 분들은 아래 링크에서 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-SARSA

 

[Q-learning 전체 코드]

import gymnasium as gym
from collections import defaultdict
import numpy as np

is_slippery = True
map_size = '8x8'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때는 reset을 해줘야 함
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            # Q 러닝
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*np.max(Q[new_state]))
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

def testing_after_learning():
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

결과물

is_slippery = False / True

[SARSA 전체 코드]

import gymnasium as gym
from collections import defaultdict
import numpy as np

is_slippery = True
map_size = '4x4'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때는 reset을 해줘야 함
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            next_action = select_action(new_state)
            # SARSA
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*Q[new_state][next_action])
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

def testing_after_learning():
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

결과물

is_slippery = False / True

 

코드에 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

 

728x90
반응형

+ Recent posts