728x90
반응형
728x90

안녕하세요! 오늘은 파이썬으로 하는 시각화 활용 공통 사항에 대해 포스팅 하려고 합니다.

 

어떤 그래프를 그려도, x축/y축/범례/제목 등은 공통적으로 지정해야 하는데요.

해당 부분이 간단해보이더라도, 시각화에서 상당히 중요한 역할을 하는 경우가 있습니다!

그래서 제가 시각화 하면서 활용했던 부분들을 정리해보려고 합니다('. • ᵕ •. `)

 

그래프 예시는 line plot으로 진행할 예정입니다.

line plot에 대한 자세한 내용이 궁금하신 분들은 아래 링크를 참고해주세요:)

https://yhj9855.com/entry/%EC%8B%9C%EA%B0%81%ED%99%94-%ED%8C%8C%EC%9D%B4%EC%8D%AC-%EC%8B%9C%EA%B0%81%ED%99%94-%ED%99%9C%EC%9A%A9-Line-plot

 

[시각화] 파이썬 시각화 활용 - Line plot

안녕하세요! 오늘은 파이썬으로 하는 시각화 활용 Line plot에 대해 포스팅 하려고 합니다.Line plot이란?시간이나 연속적인 값을 나타낼 때 사용되는 그래프로, 일반적인 선 그래프 입니다.보통 x축

yhj9855.com

 

X축, Y축, 제목 설정하기

  • 기본 설정하기

우선 가장 기본적으로 x축, y축, 제목을 설정하는 것부터 시작하겠습니다!

import numpy as np
import matplotlib.pyplot as plt

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.sin(x)

plt.xlabel("X-axis")
plt.ylabel("Y-axis")
plt.title("Title")
plt.plot(x, y)

  • 위치 변경하기

x축, y축, 제목은 모두 위치를 변경할 수 있습니다.

위치는 두 가지 방법으로 옮길 수 있습니다.

 

1. pad를 사용하여 간격을 조절

import numpy as np
import matplotlib.pyplot as plt

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.sin(x)

plt.xlabel("X-axis", labelpad=40)
plt.ylabel("Y-axis", labelpad= 30)
plt.title("Title", pad=30)
plt.plot(x, y)

각 축과 제목의 간격이 멀어지신게 보이시나요?

pad 내 숫자가 커질수록 그래프와 축/제목 사이의 간격을 멀게 설정할 수 있습니다.

 

2. 좌표를 설정하여 위치를 조절

제목은 좌표를 설정해서 위치를 조절할 수 있습니다!

축의 경우에도 동일하게 좌표 설정을 할 수 있는데, 좌표대로 잘 움직이지 않아 거의 사용하지 않습니다ㅠㅠ

import numpy as np
import matplotlib.pyplot as plt

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.sin(x)

fig, ax = plt.subplots(figsize=(6, 4))
# 글씨 크기 조절 가능
plt.xlabel("X-axis", fontsize=14)
plt.ylabel("Y-axis", fontsize=14)
ax.set_title("Title", fontsize=14, x=0.8, y=1.05)
plt.plot(x, y)

제목의 위치가 변경되신게 보이시나요?

x는 좌우의 위치를, y는 상하의 위치를 변경할 수 있습니다!

범례 설정하기

  • 범례란?

범례는 지도나 차트 등에서 참고하라는 뜻으로 나타낸 정보입니다.

파이썬 시각화에서는 보통 각 그래프가 어떤 것을 나타내는지 표기할 때 많이 사용합니다!

 

아래 그래프처럼 노란색과 연두색이 각각 어떤 그래프를 나타내는지 아래쪽에 표기된 것이 범례입니다.

  • 범례 생성하기

보통 범례는 자동으로 생성되는 경우가 많은데, 그래프를 각각 그릴 경우에는 범례가 생성되지 않습니다.

이 때 직접 범례를 설정하는 것도 가능합니다.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.cos(x)
y1 = np.sin(x)

line1, = plt.plot(x, y, color='lightskyblue')
line2, = plt.plot(x, y1, color='lightcoral')

# 범례 직접 설정
plt.legend(handles=[line1, line2], labels=["Cos(x)", "Sin(x)"])

plt.xlabel("X-axis")
plt.ylabel("Y-axis")
plt.title("Legend Example")

plt.show()

plt.legend(handles=[line1, line2], labels=["Cos(x)", "Sin(x)"]) 여기서 loc = 옵션을 추가하게 되면 범례의 위치를 어느 정도 조정할 수 있습니다!

예를 들어 upper right 옵션으로 하게 되면, 오른쪽 위에 범례가 생성되는데요, 옵션을 정하지 않으면 가장 적당한 위치에 알아서 생성이 됩니다.

  • 위치 변경하기

위의 사진처럼 범례가 자동으로 생성될 때 그래프를 가리는 경우를 자주 접하실 수 있는데요!

이 때 범례 위치를 변경하는 코드는 알아두시면 유용합니다:)

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.cos(x)
y1 = np.sin(x)


line1, = plt.plot(x, y, color='lightskyblue')
line2, = plt.plot(x, y1, color='lightcoral')

# 범례 직접 설정
plt.legend(handles=[line1, line2], labels=["Cos(x)", "Sin(x)"], loc='lower right', bbox_to_anchor=(0.81, 0.03))

plt.xlabel("X-axis")
plt.ylabel("Y-axis")
plt.title("Legend Example")

plt.show()

범례의 위치가 변경된 게 보이시나요?

지금은 예쁘게 옮긴 건 아니지만, 범례를 자유롭게 움직일 수 있는 것은 굉장히 편리하니 잘 사용해주세요!

먼저 loc 옵션을 조정하여 큰 틀의 위치를 정해주시고, bbox_to_anchor 내 좌표로 세세한 위치를 조정해주시면 됩니다:)

한글 설정하기

마지막으로 각 축, 제목, 범례를 한글로 정하는 방법에 대해서 알려드리겠습니다!

보통 한글로 설정을 하게 되면 아래 사진처럼 한글이 깨져서 나오기 때문에 한글 설정이 먼저 필요합니다.

한글 설정을 위해서는 먼저 한글 폰트를 찾아야 합니다.

C:\Windows\Fonts 해당 경로로 가시면, 컴퓨터에 설치되어 있는 폰트를 보실 수 있습니다!

이제 저희가 사용하고 싶은 폰트를 고르면 되는데, 아쉽게도 모든 폰트를 지원하지는 않습니다ㅠㅠ

 

파이썬은 바탕, 굴림, 궁서체 중 골라서 사용하시는게 안전합니다:) (그래도 이것저것 해보시는 걸 추천 드려요)

위의 글씨체 중 하나를 골라 마우스 오른쪽 클릭→속성→이름 복사를 하시면 되는데, 이름은 .ttc 앞까지만 복사해주세요!

※ 만약에 속성이 나타나지 않는다면, 폰트를 더블 클릭해서 들어가신 다음 진행하시면 됩니다.

간혹 HY시리즈는 이름 그대로를 사용하셔야 되는 경우도 있습니다.

예를 들면 HYPost의 경우 HYPost-Medium, HY고딕의 경우 HYGothic-Medium을 사용합니다.

 

이제 아래 코드를 실행하시게 되면 한글 지원이 가능합니다.

plt.rcParams['font.family'] = 'HYPost-Medium'

이제 한글로 잘 보이는 걸 알 수 있습니다!!

하지만 한글로 변경할 때는 종종 숫자의 마이너스가 깨지는 경우가 있어요ㅠㅠ

해당 경우는 마이너스가 지원되는 한글을 써야하는데, 저는 보통 굴림을 사용합니다.

plt.rcParams['font.family'] = 'gulim'

이제 한글과 마이너스가 모두 잘 보이는 것을 확인할 수 있습니다!

 

여기까지 축, 제목, 범례 활용을 정리해보았습니다!

이것저것 쓰다보니 꽤 길어졌는데요, 시각화는 예쁘면 예쁠수록 도움이 되기 때문에 세세한 부분이라도 잘 활용하시면 좋을 것 같습니다:)

특히 한글 설정 같은 경우, 글씨체가 이쁘면 보기도 좋으니 여러 폰트로 한 번 사용해보시길 추천드려요ദ്ദി・ᴗ・)✧

★읽어주셔서 감사합니다★

 

728x90
반응형
728x90
반응형
728x90

 

안녕하세요! 오늘은 파이썬으로 하는 시각화 활용 Line plot에 대해 포스팅 하려고 합니다.

  • Line plot이란?

시간이나 연속적인 값을 나타낼 때 사용되는 그래프로, 일반적인 선 그래프 입니다.

보통 x축에는 연속적인 변수y축에는 수치형 데이터를 배치해서 사용하는 경우가 일반적입니다.

 

저는 보통 식을 그릴 때는 matplotlib, 데이터 프레임이 있는 경우에는 seaborn, matplotlib 두 개를 함께 사용해서 line plot을 그립니다.

matplotlib로 단일 그래프 그리기

우선 먼저 matplotlib를 사용해서 간단한 그래프를 그려보겠습니다.

아래처럼 숫자를 직접 입력하거나, 특정 식이 존재한다면 matplotlib만 사용해서 그리는 것이 간단합니다!

import matplotlib.pyplot as plt

# 왼쪽이 x 값, 오른쪽이 y 값
plt.plot([1, 2, 3, 4], [2, 3, 5, 10])
plt.show()

import numpy as np
import matplotlib.pyplot as plt

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.sin(x)

plt.plot(x, y)
plt.show()

seaborn을 함께 사용하여 단일 그래프 그리기

seaborn은 데이터 프레임과 호환성이 좋기 때문에 보통 데이터 프레임으로 사용합니다.

import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.cos(x)
df = pd.DataFrame({"X": x, "Y": y})

sns.lineplot(x="X", y="Y", data=df)

그래프 커스텀 하기

사실 그래프를 그리는 것은 정말 간단합니다!

하지만 그래프를 단순히 그리는 것과 이를 커스텀해서 사용하는 것은 정말 큰 차이가 있습니다.

지금부터는 다양하게 그래프를 커스텀하는 방법에 대해 소개해드리겠습니다!

  • 색상 사용하기

그래프에 색상을 입히는 것입니다.

파이썬에서 사용할 수 있는 색상은 아래 포스팅을 참고해주세요!

https://yhj9855.com/entry/%EC%8B%9C%EA%B0%81%ED%99%94-%ED%8C%8C%EC%9D%B4%EC%8D%AC-%EC%8B%9C%EA%B0%81%ED%99%94-%EC%83%89%EC%83%81-%EC%A0%95%EB%A6%AC-Matplotlib-Seaborn

 

[시각화] 파이썬 시각화 색상 정리 (Matplotlib, Seaborn)

안녕하세요! 오늘은 데이터 분석에서 정말 중요한 그래프 색상을 정리하는 포스팅을 진행하도록 하겠습니다. 데이터 분석에서 시각화는 정말 중요한데요.똑같이 데이터 분석을 진행했다고 해

yhj9855.com

 

# matplot
plt.plot(x, y, color='deepskyblue')

# seaborn
sns.lineplot(x="X", y="Y", data=df, color='deeppink')

  • 그래프 동시에 그리기

하나의 영역에 여러 개의 그래프를 그릴 수 있습니다.

그래프가 N개면 N개만큼 그려주면 동일 영역에 그려지는 것을 확인하실 수 있습니다.

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.cos(x)
y1 = np.sin(x)
df = pd.DataFrame({"X": x, "Y": y})
df1 = pd.DataFrame({"X": x, "Y": y1})

# seaborn
sns.lineplot(x="X", y="Y", data=df, color='lightskyblue')
sns.lineplot(x="X", y="Y", data=df1, color='lightcoral')

# matplotlib
plt.plot(x, y, color='deepskyblue')
plt.plot(x, y1, color='deeppink')

 

seaborn에서는 하나의 데이터 프레임에서 특정 열을 기준으로 두 개의 그래프를 나눌 수 있습니다.

아래 코드를 보시면 Y열이 label을 기준으로 cos그래프와 sin그래프가 나눠져있기 때문에 이를 label로 분리하여, 그래프를 그릴 수 있습니다.

데이터 분석을 하실 때 생각보다 많이 사용되기 때문에 잘 활용하시면 좋습니다!

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.cos(x)
y1 = np.sin(x)
df1 = pd.DataFrame({"X": x, "Y": y, "label": 'cos'})
df2 = pd.DataFrame({"X": x, "Y": y1, "label":'sin'})

# df1 밑에 df2를 붙이는 작업
df = pd.concat([df1, df2], ignore_index= True)

# 그래프 영역 크기 정하기 (가로/세로)
plt.figure(figsize=(8, 5))
sns.lineplot(x="X", y="Y", data=df, hue='label', palette=['gold', 'limegreen'])

  • 그래프 축/제목/범례 설정

그래프 제목, x축, y축, 범례 등을 직접 지정할 수 있습니다!

축, 제목, 범례를 설정하는 방법 및 한글 설정하는 자세한 과정은 아래 포스팅을 참고해주세요.

https://yhj9855.com/entry/%EC%8B%9C%EA%B0%81%ED%99%94-%ED%8C%8C%EC%9D%B4%EC%8D%AC-%EC%8B%9C%EA%B0%81%ED%99%94-%ED%99%9C%EC%9A%A9-%EB%B2%94%EB%A1%80-%EC%B6%95-%EC%A0%9C%EB%AA%A9-with-%ED%95%9C%EA%B8%80-%EC%A7%80%EC%A0%95

 

[시각화] 파이썬 시각화 활용 - 범례, 축, 제목 (with 한글 지정)

ㄱ안녕하세요! 오늘은 파이썬으로 하는 시각화 활용 공통 사항에 대해 포스팅 하려고 합니다. 어떤 그래프를 그려도, x축/y축/범례/제목 등은 공통적으로 지정해야 하는데요.해당 부분이 간단해

yhj9855.com

 

  • 그래프 마크 설정하기

그래프 내 x 좌표마다 마크를 설정할 수 있습니다.

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.cos(x)
y1 = np.sin(x)
df = pd.DataFrame({"X": x, "Y": y})
df1 = pd.DataFrame({"X": x, "Y": y1})

plt.figure(figsize=(8, 5))

# seaborn
sns.lineplot(x="X", y="Y", data=df, color='lightskyblue', marker='o')
sns.lineplot(x="X", y="Y", data=df1, color='lightcoral',  marker='*', markersize=9)

# matplotlib
plt.plot(x, y, color='deepskyblue', marker='o', markersize = 4)
plt.plot(x, y1, color='deeppink', marker='^',  markersize = 5)

 

하지만 이렇게 x좌표마다 마크를 설정하는 것이 아닌 특정 좌표마다 마크를 설정하고 싶으실 수도 있습니다.

그럴 때는 그래프 위에 점으로 된 그래프를 하나 더 그리는 방향으로 진행할 수 있습니다!

아래 코드는 5번째마다 그래프 위에 점을 찍는다고 보시면 됩니다.

for문 안에 있는 코드를 변경하시면 원하시는 곳에 마크 표시를 하실 수 있습니다.

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.cos(x)
y1 = np.sin(x)
df = pd.DataFrame({"X": x, "Y": y})
df1 = pd.DataFrame({"X": x, "Y": y1})

plt.figure(figsize=(8, 5))
sns.lineplot(x="X", y="Y", data=df, color='orange')
sns.lineplot(x="X", y="Y", data=df1, color='limegreen')

for i in range(len(x)) :
    if i%5 == 0 :
        plt.scatter(x[i], y[i], color='darkorange', s=15)
        plt.scatter(x[i], y1[i], color='forestgreen', s=15)

  • 그래프 텍스트 표기하기

그래프 내 텍스트를 표기할 수도 있습니다.

텍스트 표기는 위에 마크 설정하는 것처럼 그래프 위에 텍스트를 하나 더 그리는 방향으로 진행할 수 있습니다.

# 데이터 생성
x = np.linspace(0, 10, 100)
y = np.cos(x)
y1 = np.sin(x)
df = pd.DataFrame({"X": x, "Y": y})
df1 = pd.DataFrame({"X": x, "Y": y1})

plt.figure(figsize=(8, 5))
sns.lineplot(x="X", y="Y", data=df, color='orange')
sns.lineplot(x="X", y="Y", data=df1, color='limegreen')

for i in range(len(x)) :
    if i%5 == 0 :
        plt.scatter(x[i], y[i], color='darkorange', s=15)
        plt.text(x[i]+0.02, y[i]+0.01, f'{y[i]:.1f}', color='darkorange', ha='left', va='bottom', fontsize=8, fontweight='bold')
        plt.scatter(x[i], y1[i], color='forestgreen', s=15)
        plt.text(x[i]+0.05, y1[i]+0.01, f'{y1[i]:.1f}', color='forestgreen', ha='left', va='bottom', fontsize=8, fontweight='bold')
        
plt.xlabel('')
plt.ylabel('')

여기까지 line plot 활용을 정리해보았습니다!

단순히 그래프를 그리는 것 이상으로 활용하는 부분이 정말 생각보다 많았네요ㅠㅠ

그래프가 잘 숙련되시면 아래 같은 그래프를 그리실 수 있습니다.

 

코드에 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형
728x90

안녕하세요! 오늘은 기존에 작성한 cart pole 문제DDQN(Double Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.

  • DDQN (Double Deep Q-Network) 이란?

강화 학습의 한 방법으로, DQN과 비슷하지만 학습할 때 Neural Network(인공 신경망)를 두 개 사용하는 학습 방법입니다.

DQN에서 Q-Network가 두 개 사용되었다고 보시면 될 것 같아요!

 

DQN에 대해 궁금하신 분들은 cart pole를 DQN으로 진행하는 아래 포스팅을 참고 해주시면 됩니다.

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-DQN

 

[RL] gymnasium cart pole 강화 학습 - DQN

안녕하세요! 오늘은 기존에 작성한 cart pole 문제를 DQN(Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.DQN (Deep Q-Network) 이란?강화 학습의 한 방법으로, Q-learning에서 Q-table 대신 Neural Netw

yhj9855.com

 

cart pole에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!!

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium cart pole 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 cart pole 문제 설명에 대한 포스팅 진행하겠습니다. 저는 cart pole 문제를 DQN과 DDQN 두 가지 방법으로 풀어보았는데요!각각 전체 코드는 포스팅 가장 아래

yhj9855.com


그럼 본격적으로 DDQN으로 cart pole 문제를 풀어보도록 하겠습니다.

Neural Network 구현

우선 먼저 DDQN에서 Neural Network을 먼저 구현합니다.

Neural Network을 구현하는 코드는 아래와 같습니다.

class DDQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DDQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

레이어 층을 몇 개 사용할 것인지, 활성 함수는 어떤 것을 사용할 것인지는 모두 하이퍼 파라미터 입니다!

저는 3개의 층을 사용했고, 모두 Relu 함수를 사용했습니다.

보통 어떤 활성 함수를 사용하면 좋을지 모를 때, Relu 함수를 많이 사용하는데요, 그래도 다른 활성 함수도 사용해보시는 걸 추천 드려요!!

저는 tanh 함수를 사용했지만, 학습이 너무 진행되지 않아서 포기했었습니다ㅜㅜ

Replay Memory 구현

다음은 Replay Memory를 구현합니다.

  • Replay Memory란?

DDQN처럼 강화 학습과 딥러닝이 혼합된 알고리즘을 사용할 때, 더 안정적이고 효율적인 학습을 하기 위한 방법입니다.

환경에서 만들어진 에피소드를 저장해두었다가, 랜덤으로 샘플링해서 학습하는 것이 핵심 아이디어 입니다.

 

매 순간순간 에피소드를 학습하게 되면, 비슷한 경험으로 학습이 되지 않고, 편향적으로 학습될 가능성이 있습니다.

Replay Memory 방법을 활용하면 해당 단점을 완화할 수 있어, 자주 사용되는 방법입니다.

 

Replay Memory를 코드로 구현하는 방법은 아래와 같습니다.

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

행동 선택하기

다음은 Q-learning과 비슷하게 Q값을 바탕으로 행동을 선택하는 것을 구현합니다.

저는 여기서 행동을 선택하는 방법을 두 가지 소개 드리려고 합니다!

 

1.  ϵ-greedy로 행동 선택하기

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    if random.random() < epsilon:
        return random.randint(0, action_dim - 1)
    else:
        return target_net(state).argmax().item()

여기서 행동 값을 target_net을 바탕으로 진행을 하고 있는데요.

학습이 진행 중인 policy_net에서 행동을 선택할 경우, 학습이 불안정할 가능성이 높아 target_net에서 진행하는 것이 더 좋습니다.

 

2. 확률로 행동 선택하기

해당 방법은 Q값을 확률로 변경한 다음, 확률대로 행동을 선택하게 하는 것을 의미합니다.

이 방법을 사용할 경우, 확률적으로 행동을 선택하기 때문에 어느 정도 학습이 진행되어도 탐험을 보장한다는 것과 ϵ을 따로 세팅해주지 않아도 되는 것이 장점이라고 볼 수 있습니다!

 

확률로 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    q_value = target_net(state)
    # Q 값을 확률 값으로 바꾸는 과정
    p = F.softmax(q_value, dim=0).tolist()
    # 부동소수점 오차로 인해 합이 1이 안되는 문제 해결
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

DDQN 공식을 사용해서 업데이트

이제 DDQN 공식을 사용해서 네트워크를 업데이트 하는 것을 구현해보도록 하겠습니다.

코드로 들어가지 전에 먼저 DDQN 공식을 먼저 살펴보겠습니다.

기본적으로는 DQN과 비슷한 공식입니다!

DQN과 Q가 두 번 사용된 것을 보실 수 있습니다.

 

DQN과 마찬가지로 DDQN은 딥러닝을 사용하기 때문에 자체적으로 옵티마이저와 learning rate가 들어가게 됩니다.

 두 개가 DDQN의 α로 작용을 하기 때문에 실제로 사용하는 공식에는 α가 사라져 아래와 같은 공식이 됩니다!

θ와 θ ̄ 는 α 대신 네트워크가 작동하는 부분이라고 생각하시면 됩니다.

 

이제 해당 공식을 바탕으로 DDQN을 진행하는 코드는 아래와 같습니다.

def optimize_model(memory, policy_net, target_net, optimizer):
	# batch_size만큼 데이터가 메모리에 쌓였을 때만 학습 진행
    if len(memory) < batch_size:
        return

    # transitions = (state, action, reward, next_state, done)
    transitions = memory.sample(batch_size)
    # state, action, reward, next_state, done을 각각 묶어서 list의 형태로 만드는 작업
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DDQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    # (1-done_batch)을 통해 에피소드가 끝났는지 아닌지를 판단
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

policy_net에서 가져온 next_action을 바탕으로 target_net에서 next_q_values 값을 구하고 있습니다.

이처럼 policy_net, target_net 두 가지에서 가져온 값으로 계산을 진행하기 때문에 Double Deep Q-Network가 된 것입니다.

모델 학습

이제 본격적으로 학습을 진행해보도록 하겠습니다.

한 가지 중요한 점은 Cart Pole 환경이 500점을 달성해도, 에피소드 완료라고 판단하지 않기 때문에 저희가 직접 판단해줘야 합니다.

 

학습을 진행하는 코드는 아래와 같습니다.

# 초기 세팅
policy_net = DDQN(state_dim, action_dim)
target_net = DDQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 모델 학습
for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

리워드 시각화 및 모델 테스트

마지막으로 저희가 학습한 모델이 잘 학습되었는지 확인하기 위해, 리워드를 시각화하고 모델을 테스트해보겠습니다.

먼저 리워드 시각화 하는 코드는 아래와 같습니다

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

저는 위와 같은 결과 값이 나왔는데요, 한결 같은 값을 가지는 것은 아니지만 전체적으로 점점 리워드가 상승하는 것을 볼 수 있습니다.

이처럼 여러분의 리워드도 전체적으로 상승하는 형상을 보이고 있다면, 학습이 잘 된 것으로 보실 수 있습니다.

다음은 모델 테스트를 진행해보겠습니다.

저는 모든 모델을 테스트 한 것은 아니도, 500점 이상을 달성한 모델만 따로 저장하여 모델 테스트를 진행해보았습니다.

리워드 그래프에서도 보셨듯, 전체적으로 리워드가 상승하는 것이지 모든 모델이 좋은 모델이라고는 볼 수 없기 때문에 최대치의 리워드를 달성한 모델로 테스트를 진행하였어요!

 

모델 테스트를 진행하는 코드는 아래와 같습니다.

# 테스트 시, render 활성화 필요
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

특정 모델은 500점 리워드를 달성하지 못하는 경우도 있었지만, 대부분은 500점 이상을 달성하는 것을 볼 수 있었습니다.

모델마다 다른 방식으로 500점을 달성하는데, 한 번 구경해보시는 것도 좋을 것 같아요!

전체 코드

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.0005
batch_size = 100
memory_size = 5000
episodes = 5000
# ϵ-greedy 사용 시, 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995

class DDQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 32)
        self.fc2 = nn.Linear(32, 32)
        self.fc3 = nn.Linear(32, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# ϵ-greedy
	# if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
	if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DDQN(state_dim, action_dim)
target_net = DDQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0
save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 100 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/100}")
    if episode % 100 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)

        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)

        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
        model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
        torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    
    # ϵ-greedy 사용 시, 필요
  	# if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

# 테스트 진행
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

 

사실 cart pole 문제는 간단한 편이라서, DQN이랑 DDQN이랑 성능이 거의 차이가 나지 않았습니다!

다만 그래도 Q-Network를 두 개를 사용한다는 개념이 헷갈릴 수 있기 때문에 한 번은 구현해보시는 걸 추천드려요:)

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형

 

728x90

안녕하세요! 오늘은 기존에 작성한 cart pole 문제DQN(Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.

  • DQN (Deep Q-Network) 이란?

강화 학습의 한 방법으로, Q-learning에서 Q-table 대신 Neural Network(인공 신경망)을 사용해서 학습하는 방법입니다.

state가 많아질수록 Q-table을 저장하고 업데이트 하는 것이 어렵기 때문에, 신경망이 대신 예측하도록 하는 것입니다.

Q-learning+DNN(Deep Neural Network)라고 보시면 좋을 것 같아요!

 

Q-learning에 대해 궁금하신 분들은 frozen lake를 Q-learning으로 진행하는 아래 포스팅을 참고 해주시면 됩니다.

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-2

 

[RL] gymnasium frozen lake 강화 학습 - 2

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다. Q-learning이란?강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.Q 테이블

yhj9855.com

 

cart pole에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!!

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium cart pole 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 cart pole 문제 설명에 대한 포스팅 진행하겠습니다. 저는 cart pole 문제를 DQN과 DDQN 두 가지 방법으로 풀어보았는데요!각각 전체 코드는 포스팅 가장 아래

yhj9855.com


그럼 본격적으로 DQN으로 cart pole 문제를 풀어보도록 하겠습니다.

Neural Network 구현

우선 먼저 DQN에서 Neural Network을 먼저 구현합니다.

Neural Network을 구현하는 코드는 아래와 같습니다.

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

레이어 층을 몇 개 사용할 것인지, 활성 함수는 어떤 것을 사용할 것인지는 모두 하이퍼 파라미터 입니다!

저는 3개의 층을 사용했고, 모두 Relu 함수를 사용했습니다.

보통 어떤 활성 함수를 사용하면 좋을지 모를 때, Relu 함수를 많이 사용하는데요, 그래도 다른 활성 함수도 사용해보시는 걸 추천 드려요!!

저는 tanh 함수를 사용했지만, 학습이 너무 진행되지 않아서 포기했었습니다ㅜㅜ

Replay Memory 구현

다음은 Replay Memory를 구현합니다.

  • Replay Memory란?

DQN처럼 강화 학습과 딥러닝이 혼합된 알고리즘을 사용할 때, 더 안정적이고 효율적인 학습을 하기 위한 방법입니다.

환경에서 만들어진 에피소드를 저장해두었다가, 랜덤으로 샘플링해서 학습하는 것이 핵심 아이디어 입니다.

 

매 순간순간 에피소드를 학습하게 되면, 비슷한 경험으로 학습이 되지 않고, 편향적으로 학습될 가능성이 있습니다.

Replay Memory 방법을 활용하면 해당 단점을 완화할 수 있어, 자주 사용되는 방법입니다.

 

Replay Memory를 코드로 구현하는 방법은 아래와 같습니다.

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

 

행동 선택하기

다음은 Q-learning과 비슷하게 Q값을 바탕으로 행동을 선택하는 것을 구현합니다.

저는 여기서 행동을 선택하는 방법을 두 가지 소개 드리려고 합니다!

 

1.  ϵ-greedy로 행동 선택하기

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    if random.random() < epsilon:
        return random.randint(0, action_dim - 1)
    else:
        return target_net(state).argmax().item()

여기서 행동 값을 target_net을 바탕으로 진행을 하고 있는데요.

학습이 진행 중인 policy_net에서 행동을 선택할 경우, 학습이 불안정할 가능성이 높아 target_net에서 진행하는 것이 더 좋습니다.

 

2. 확률로 행동 선택하기

해당 방법은 Q값을 확률로 변경한 다음, 확률대로 행동을 선택하게 하는 것을 의미합니다.

이 방법을 사용할 경우, 확률적으로 행동을 선택하기 때문에 어느 정도 학습이 진행되어도 탐험을 보장한다는 것과 ϵ을 따로 세팅해주지 않아도 되는 것이 장점이라고 볼 수 있습니다!

 

확률로 행동을 선택하는 코드는 아래와 같습니다.

def select_action(state, target_net, action_dim):
    q_value = target_net(state)
    # Q 값을 확률 값으로 바꾸는 과정
    p = F.softmax(q_value, dim=0).tolist()
    # 부동소수점 오차로 인해 합이 1이 안되는 문제 해결
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

DQN 공식을 사용해서 업데이트

이제 DQN 공식을 사용해서 네트워크를 업데이트 하는 것을 구현해보도록 하겠습니다.

코드로 들어가지 전에 먼저 DQN 공식을 먼저 살펴보겠습니다.

기본적으로는 Q-learning과 동일한 공식입니다!

하지만 DQN은 딥러닝을 사용하기 때문에 자체적으로 옵티마이저와 learning rate가 들어가게 됩니다.

두 개가 DQN의 α로 작용을 하기 때문에 실제로 사용하는 공식에는 α가 사라져 아래와 같은 공식이 됩니다!

θ ̄ 는 α 대신 네트워크가 작동하는 부분이라고 생각하시면 됩니다.

 

α를 사용하지 않기 때문에 일종의 TD 으로도 보실 수 있는데요.

TD 공식으로 변형해서 사용하셔도 문제 없이 학습하실 수 있습니다:)

 

이제 해당 공식을 바탕으로 DQN을 진행하는 코드는 아래와 같습니다.

def optimize_model(memory, policy_net, target_net, optimizer):
	# batch_size만큼 데이터가 메모리에 쌓였을 때만 학습 진행
    if len(memory) < batch_size:
        return

    # transitions = (state, action, reward, next_state, done)
    transitions = memory.sample(batch_size)
    # state, action, reward, next_state, done을 각각 묶어서 list의 형태로 만드는 작업
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_q_values = target_net(next_state_batch).max(1)[0].detach()
    # (1-done_batch)을 통해 에피소드가 끝났는지 아닌지를 판단
    target_q_values = reward_batch + (gamma * next_q_values * (1-done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

q_values를 policy_net에서 가져오는 이유는 학습 중인 policy_net과 DQN 공식을 적용한 target_net가 비슷해지도록 학습이 되야 하기 때문입니다.

 

모델 학습

이제 본격적으로 학습을 진행해보도록 하겠습니다.

한 가지 중요한 점은 Cart Pole 환경이 500점을 달성해도, 에피소드 완료라고 판단하지 않기 때문에 저희가 직접 판단해줘야 합니다.

 

학습을 진행하는 코드는 아래와 같습니다.

# 초기 세팅
policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 모델 학습
for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

리워드 시각화 및 모델 테스트

마지막으로 저희가 학습한 모델이 잘 학습되었는지 확인하기 위해, 리워드를 시각화하고 모델을 테스트해보겠습니다.

먼저 리워드 시각화 하는 코드는 아래와 같습니다.

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

저는 위와 같은 결과 값이 나왔는데요, 한결 같은 값을 가지는 것은 아니지만 전체적으로 점점 리워드가 상승하는 것을 볼 수 있습니다.

이처럼 여러분의 리워드도 전체적으로 상승하는 형상을 보이고 있다면, 학습이 잘 된 것으로 보실 수 있습니다.

 

 

다음은 모델 테스트를 진행해보겠습니다.

저는 모든 모델을 테스트 한 것은 아니도, 500점 이상을 달성한 모델만 따로 저장하여 모델 테스트를 진행해보았습니다.

리워드 그래프에서도 보셨듯, 전체적으로 리워드가 상승하는 것이지 모든 모델이 좋은 모델이라고는 볼 수 없기 때문에 최대치의 리워드를 달성한 모델로 테스트를 진행하였어요!

 

모델 테스트를 진행하는 코드는 아래와 같습니다.

# 테스트 시, render 활성화 필요
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

특정 모델은 500점 리워드를 달성하지 못하는 경우도 있었지만, 대부분은 500점 이상을 달성하는 것을 볼 수 있었습니다.

모델마다 다른 방식으로 500점을 달성하는데, 한 번 구경해보시는 것도 좋을 것 같아요!

전체 코드

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.0005
batch_size = 100
memory_size = 5000
episodes = 5000
# ϵ-greedy 사용 시, 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 32)
        self.fc2 = nn.Linear(32, 32)
        self.fc3 = nn.Linear(32, action_dim)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque(maxlen=capacity)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# ϵ-greedy
	# if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_q_values = target_net(next_state_batch).max(1)[0].detach()
    # DQN
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0
save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

for episode in range(episodes):
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 100 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/100}")
    if episode % 100 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)

        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)

        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
        model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
        torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    
    # ϵ-greedy 사용 시, 필요
  	# if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

# 테스트 진행
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

 

딥러닝+강화학습으로 진행되기 때문에 모델 학습이나 이런 부분이 진행하면서 많이 어려웠습니다.

저도 이거 풀면서 딥러닝에 대한 지식이 부족하다는 것을 깨닫고 요즘은 딥러닝을 공부하고 있는데, 쉽지 않네요ㅠㅠ

 

DQN이랑 비슷한 DDQN으로 cart pole 문제를 푸는 방법은 아래 링크를 참고해주세요.

https://yhj9855.com/entry/RL-gymnasium-cart-pole-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-DDQN

 

[RL] gymnasium cart pole 강화 학습 - DDQN

안녕하세요! 오늘은 기존에 작성한 cart pole 문제를 DDQN(Double Deep Q-Network) 으로 진행하는 방법에 대해 포스팅 하겠습니다.DDQN (Double Deep Q-Network) 이란?강화 학습의 한 방법으로, DQN과 비슷하지만 학

yhj9855.com

 

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

 

728x90
반응형
728x90
반응형

728x90

안녕하세요! 오늘은 gymnasium에서 제공하는 cart pole 문제 설명에 대한 포스팅 진행하겠습니다.

 

저는 cart pole 문제를 DQNDDQN 두 가지 방법으로 풀어보았는데요!

각각 전체 코드는 포스팅 가장 아래에 있습니다.


  • cart pole 규칙

우선 gymnasium에서 제공하는 cart pole 문제는 아래 링크에서 자세하게 보실 수 있습니다.

https://gymnasium.farama.org/environments/classic_control/cart_pole/

 

Gymnasium Documentation

A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym)

gymnasium.farama.org

 

위의 사진처럼 생긴 막대를 지속적으로 세우는 것이 cart pole의 가장 큰 목표 입니다.

cart pole의 자세한 규칙은 아래와 같습니다.

  1. 이동 방향좌, 우만 존재
  2. 현재 막대의 상태는 막대 위치, 속도, 각도, 각속도 총 4가지로 표현
  3. 하나의 step이 에피소드 종료되지 않으면 보상 1을 획득
  4. 각도가 좌우 12 º 이상 벗어나거나, 막대가 화면을 벗어나거나, 보상 500(혹은 200) 달성하면 에피소드 종료

저희는 위의 규칙을 잘 생각하여, 막대가 특정 각도를 넘어가거나 화면에 벗어나지 않게 500 에피소드 이상 버티는 것을 학습해주어야 합니다.

  • 필요한 라이브러리 설치

frozen lake는 gymnasium에서 제공하고 있기 때문에 가장 먼저 gymnasium을 설치해주시면 됩니다!

그리고 render 모드 활성화를 진행하기 위해 gymnasium[toy-text] 설치도 같이 진행해주세요.

두 패키지 모두 pip install로 설치해주시면 금방 설치할 수 있습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np
  • cart pole 환경 세팅

필요한 라이브러리를 설치한 후에는 cart pole 환경을 세팅해봅니다.

우선 환경 세팅을 하는 코드는 아래와 같습니다.

env = gym.make("CartPole-v1")

cart pole은 학습을 진행해주지 않으면 바로 에피소드가 종료가 되기 때문에 테스트를 진행해보기가 어렵습니다ㅠㅠ

어떤 식으로 진행되는지 궁금하신 분들은 전체 코드 복사 후 실행해보시면, cart pole이 어떻게 진행되는지 아실 수 있습니다!


여기까지가 cart pole 환경 세팅이었습니다.

cart pole 환경 세팅은 그렇게 어렵지 않아서, 금방 하실 수 있어요!

 

이번 포스팅에서는 DQN과 DDQN에 관한 전체 코드만 업로드하고, 자세한 포스팅은 추후에 진행하도록 하겠습니다.

두 가지 모두 500 에피소드에 달성한 모델이 다수 존재했습니다.

[DQN 전체 코드]

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.001
batch_size = 100
memory_size = 5000
# epsilon에 의해 행동을 선택할 때는 해당 부분 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995
episodes = 5000

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# epsilon-greedy를 바탕으로 행동을 선택하는 과정
    # if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    # 행동 값을 확률로 변경하여, 확률에 따라 행동을 선택하는 과정
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_q_values = target_net(next_state_batch).max(1)[0].detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 500을 10회 진행하면 성공이라고 판단하여, 종료를 하기 위한 장치로 잠깐 쓰인 것
# count = 0

for episode in range(episodes):
    # if count > 10 :
    #     break
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
    #    count += 1
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DQN on CartPole')
plt.show()

# 테스트 진행 - render를 켜줘야 확인이 가능
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

[DDQN 전체 코드]

import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import random
import numpy as np
from collections import deque
import matplotlib.pyplot as plt
import os
import glob
import torch.nn.functional as F

# 하이퍼 파라미터
gamma = 0.99
learning_rate = 0.001
batch_size = 100
memory_size = 5000
# epsilon에 의해 행동을 선택할 때는 해당 부분 필요
# epsilon_start = 1.0
# epsilon_end = 0.001
# epsilon_decay = 0.995
episodes = 5000

class DQN(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(state_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, action_dim)

    def forward(self, x):
    	x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class ReplayMemory:
    def __init__(self, max_len):
        self.memory = deque(maxlen=max_len)

    def push(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

def select_action(state, target_net, action_dim):
	# epsilon-greedy를 바탕으로 행동을 선택하는 과정
    # if random.random() < epsilon:
    #     return random.randint(0, action_dim - 1)
    # else:
    #     return target_net(state).argmax().item()
    
    # 행동 값을 확률로 변경하여, 확률에 따라 행동을 선택하는 과정
    q_value = target_net(state)
    p = F.softmax(q_value, dim=0).tolist()
    p = np.array(p)
    p /= p.sum()
    action = np.random.choice(action_dim, p=p)
    return action

def optimize_model(memory, policy_net, target_net, optimizer):
    if len(memory) < batch_size:
        return

    transitions = memory.sample(batch_size)
    batch = list(zip(*transitions))
    
    state_batch = torch.stack(batch[0])
    action_batch = torch.tensor(batch[1]).unsqueeze(1)
    reward_batch = torch.tensor(batch[2])
    next_state_batch = torch.stack(batch[3])
    done_batch = torch.tensor(batch[4], dtype=torch.float32)
    
    # DDQN
    q_values = policy_net(state_batch).gather(1, action_batch)
    next_action = policy_net(next_state_batch).argmax(1).unsqueeze(1)
    next_q_values = target_net(next_state_batch).gather(1, next_action).squeeze().detach()
    target_q_values = reward_batch + (gamma * next_q_values * (1 - done_batch))
    loss = nn.MSELoss()(q_values.squeeze(), target_q_values)
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()


env = gym.make("CartPole-v1")
state_dim = env.observation_space.shape[0]
action_dim = env.action_space.n

policy_net = DQN(state_dim, action_dim)
target_net = DQN(state_dim, action_dim)

target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

optimizer = optim.Adam(policy_net.parameters(), lr=learning_rate)
memory = ReplayMemory(memory_size)

# epsilon-greedy 방법으로 행동을 선택할 때 필요
# epsilon = epsilon_start

episode_rewards = []
episode_reward = 0

save_dir = "dqn_saved_models"
os.makedirs(save_dir, exist_ok=True)

# 500을 10회 진행하면 성공이라고 판단하여, 종료를 하기 위한 장치로 잠깐 쓰인 것
# count = 0

for episode in range(episodes):
    # if count > 10 :
    #     break
    state = torch.tensor(env.reset()[0], dtype=torch.float32)
    if episode % 1000 == 0: 
        print(f"Episode {episode}, Avg Reward: {episode_reward/1000}")
    if episode % 1000 == 0 :
        episode_reward = 0
    total_reward = 0

    # 500 초과인 경우는 done으로 판단
    while total_reward < 501 :
        action = select_action(state, target_net, action_dim)
        next_state, reward, done, _, _ = env.step(action)
        next_state = torch.tensor(next_state, dtype=torch.float32)
        memory.push((state, action, reward, next_state, done))

        state = next_state
        total_reward += reward

        optimize_model(memory, policy_net, target_net, optimizer)
        
        if done :
            break
    # 500점 달성한 모델 저장
    if total_reward >= 500 :
    #    count += 1
         model_path = os.path.join(save_dir, f"dqn_model_episode_{episode}.pth")
         torch.save(policy_net.state_dict(), model_path)
    
    episode_reward += total_reward
    # epsilon-greedy로 action을 선택할 때는 있어야 함
    # if episode % 10 == 0 :
    #     epsilon = max(epsilon_end, epsilon*epsilon_decay)
    
    if episode % 20 == 0:
        target_net.load_state_dict(policy_net.state_dict())

    episode_rewards.append(total_reward)

plt.plot(episode_rewards)
plt.xlabel('Episode')
plt.ylabel('Total Reward')
plt.title('DDQN on CartPole')
plt.show()

# 테스트 진행 - render를 켜줘야 확인이 가능
env = gym.make("CartPole-v1", render_mode='human')
# 500 달성한 모델 업로드
model_paths = glob.glob(os.path.join(save_dir, "*.pth"))

model_i = 0
for model_path in model_paths :
    policy_net.load_state_dict(torch.load(model_path))
    policy_net.eval()
    
    avg_reward = 0
    
    # 각 모델 별 10번 진행
    for episode in range(10) :    
        state = torch.tensor(env.reset()[0], dtype=torch.float32)
        total_reward = 0

        while total_reward < 501 :
            with torch.no_grad() :
                action = policy_net(state).argmax().item()

            next_state, reward, done, _, _ = env.step(action)
            next_state = torch.tensor(next_state, dtype=torch.float32)

            state = next_state
            total_reward += reward
            
            if done :
                break

        avg_reward += total_reward
    print(f"model {model_i + 1}, Avg Reward: {avg_reward/10}")
    model_i += 1

결과물

 

코드에 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형

728x90

 안녕하세요! 오늘은 기존 작성한 frozen lake 문제SARSA로 진행하는 방법에 대해 포스팅 하겠습니다.

  • SARSA 란?

강화 학습의 한 방법으로, Q-learning과 비슷하지만 행동 선택이 다른 방식입니다.

Q-learning이 미래의 최적 행동을 가정하고 학습한다면, SARSA는 실제로 선택한 행동을 기반으로 학습합니다.

즉 (현재 State, 현재 Action, 현재 Reward, 다음 State, 최적의 행동)이 Q-learning이었다면,

(현재 State, 현재 Action, 현재 Reward, 다음 State, 다음 Action)이 SARSA가 되고, 각각의 앞글자를 따와 SARSA라고 이름을 붙이게 된 것입니다.

 

frozen lake를 Q-learning으로 진행하는 방법은 아래 링크에서 확인해주세요.

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-2

 

[RL] gymnasium frozen lake 강화 학습 - 2

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다. Q-learning이란?강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.Q 테이블

yhj9855.com

 

frozen lake에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium frozen lake 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 frozen lake 문제에 대한 설명에 대해 포스팅 하겠습니다. 강화 학습이란?행동을 통해 얻는 보상을 기반으로 학습하는 AI의 한 분야입니다.흔히 Reinforcemen

yhj9855.com


그럼 본격적으로 SARSA로 frozen lake 문제를 풀어보도록 하겠습니다.

SARSA로 frozen lake를 풀 때, 크게 두 가지를 생각하시면 됩니다.

  1. Q 값을 바탕으로 행동을 선택
  2. 선택한 행동을 SARSA 공식을 사용해서 Q 값 업데이트

위의 두 가지를 하나씩 자세히 살펴보겠습니다.

Q 값을 바탕으로 행동을 선택

해당 부분은 현재까지 업데이트 된 Q 값을 바탕으로 행동을 선택하는 것입니다.

기본적으로는 Q 값이 가장 높은 행동을 선택하면 됩니다.

 

하지만 여기에는 한 가지 문제점이 있습니다.

Q 값이 제대로 업데이트가 될 때까지 충분한 탐험을 진행하지 못했을 경우, 제대로 된 행동을 추출할 수 없다는 것입니다.

예를 들어, 초반에는 Q 값이 모두 0이기 때문에 (0, 0)에서 왼쪽으로 가는 행동만 선택하기 때문에 게임을 진행할 수가 없습니다.

 

저희는 이 문제를 해결하기 위해, ϵ-greedy 방법을 사용할 수 있습니다.

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

# 초기 값은 보통 1로 설정
epsilon = 1.0
train = True

# ϵ-greedy를 활용한 행동 선택
def select_action(state) :
	# 훈련을 할 경우에는 ϵ-greedy 방법을 사용
   	# 테스트를 진행할 때는 온전히 Q 값만 사용
   	# np.random.rand()를 넣어, 후반에도 종종 탐험을 할 수 있도록 함
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

선택한 행동을 SARSA 공식을 사용해서 Q 값 업데이트

해당 부분은 위에서 선택한 행동을 환경에서 실행해보고, 그 결과 값을 SARSA 공식에 맞게 Q 값을 업데이트 하는 것입니다.

코드로 들어가기 전에 먼저 Q 테이블을 업데이트하는 공식을 먼저 살펴보겠습니다.

해당 수식은 Q(s, a)를 업데이트 하는데, 특정 학습률 α에 있어 (1- α)만큼 현재의 Q 값 α만큼의 (보상값 r + 할인율  γ * 다음 state와 action의 Q값 Q(s', a'))를 반영한다는 의미입니다.

Q-learning을 배우신 분들은 아시겠지만, Q-learning에서의 maxQ값특정 행동 a'의 Q 값인 Q(s', a')로 바뀐 것을 알 수 있습니다.

  • 학습률 α

값이 높을수록 다음 행동 값 즉, 새로운 정보를 더 많이 반영한다는 것이고, 낮을수록 현재의 Q 값 즉, 기존의 경험을 더 많이 유지한다는 의미입니다.

  • 할인율 γ

미래 보상의 중요도를 나타내는 지표로, 보통은 미래의 보상에 너무 의존하지 않도록 1보다 약간 작은 수로 지정하는 것이 보통입니다.

  • Q(s', a')

실제로 선택한 다음 행동 a'에 대한 Q 값으로, 위의 행동 선택을 기반하여 다음 state에서 실제 action을 고른 값입니다.

이렇게 실제 행동을 기반으로 Q 값을 업데이트 하기 때문에, 안정적이지만 그만큼 느릴 수 있습니다.

하지만, frozen lake는 공간이 작은 문제라서 Q-learning과 크게 결과 차이는 없으실 거예요:)

 

해당 공식을 바탕으로 SARSA를 진행하는 코드는 아래와 같습니다.

# 학습을 진행할 때는 render 모드 비활성화
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
# 환경의 행동 범위 : 여기서는 상, 하, 좌, 우 총 4개
action_size = env.action_space.n

# defaultdict은 키가 없을 때 자동으로 기본값을 생성하기 때문에 강화 학습에서 많이 사용
Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
# 총 학습을 진행할 에피소드 수
max_episode = 10000

def learn() :
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때 reset
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
        	# Q 테이블을 바탕으로 action을 고르는 함수
            action = select_action(state)
            # state, reward, done 외 사용하지 않기 때문에 _ 처리
            new_state, reward, done, _, _ = env.step(action)
            next_action = select_action(new_state)
            # SARSA
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*Q[new_state][next_action])
            all_reward += reward
            state = new_state
        # 50번째 에피소드 마다 ϵ 값을 줄여줌
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

 

위의 두 가지 과정을 합치면 SARSA로 frozen lake를 풀 수 있는 코드가 완성됩니다!

학습 후 테스트를 진행하고 싶으신 경우에는 render를 킨 환경을 다시 세팅해서 해주시면 됩니다.

전체 코드

행동 선택, 학습, 테스트 과정을 모두 포함한 전체 코드는 아래와 같습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np

# 미끄러짐 옵션 True/False 선택 가능
is_slippery = True
# 8x8 중에 선택 가능
map_size = '4x4'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        state, _ = env.reset()
        done = False
        all_reward = 0
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            next_action = select_action(new_state)
            # SARSA
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*Q[new_state][next_action])
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

# 학습한 Q를 바탕으로 frozen lake 테스트
def testing_after_learning():
	# render를 켜야 제대로 학습이 되었는지 확인할 수 있음
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

 

테스트를 진행하면서 is_slippery 옵션을 껐을 경우에는 1.0 보상을 받으면 성공이고, is_slippery 옵션을 켰을 경우에는 70% 이상 1.0 보상을 받으면 성공이라고 보실 수 있습니다.

 

추가로  is_slippery 옵션을 켰을 경우에는 학습을 많이 진행해야 어느 정도 수렴하시는 걸 보실 수 있습니다!

아무래도 model-free로 진행을 하니까 많이 느리더라구요ㅠㅠ

  • model-based

model-free가 아닌 어느 정도 model-based로 빠르게 학습을 하고 싶으신 경우 아래 상황을 고려할 수 있습니다.

  1. 행동 한 번을 진행할 때마다 reward에 - 진행
    → RL이 최단 경로로 진행하려는 경향을 학습할 수 있음
  2. 구멍에 빠졌을 경우, reward에 크게 - 진행
    → 구멍에 빠지지 않는 쪽으로 빠르게 학습할 수 있음
  3. 도착했을 경우, reward를 크게 추가
    → 도착 지점에 확실히 도착하기 위해 큰 reward를 지급

그 외에도 벽에 부딪히거나 하는 등 맵을 알고 있기 때문에 환경에 맞게 reward를 추가로 주거나 마이너스를 진행하여, model-based 모델을 만들 수도 있습니다.

 

그래도 강화 학습을 제대로 알기 위해서는 model-free로 진행해보는 것을 추천드립니다!

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형
728x90

 

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다.

 

  • Q-learning이란?

강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.

Q 테이블 내 값은 특정 상황에서 어떤 행동을 했을 때의 보상 값의 큰 정도를 나타내는 것으로, 학습이 진행되면서 해당 값들을 업데이트 하여 최적의 행동을 찾는 것입니다.

 

frozen lake에 관한 전체적인 설명은 아래 포스팅에서 진행하고 있으니, 먼저 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-1

 

[RL] gymnasium frozen lake 강화 학습 - 1

안녕하세요! 오늘은 gymnasium에서 제공하는 frozen lake 문제에 대한 설명에 대해 포스팅 하겠습니다. 강화 학습이란?행동을 통해 얻는 보상을 기반으로 학습하는 AI의 한 분야입니다.흔히 Reinforcemen

yhj9855.com


그럼 본격적으로 Q-learing으로 frozen lake 문제를 풀어보도록 하겠습니다.

Q-learning으로 frozen lake를 풀 때, 크게 두 가지를 생각하시면 됩니다.

  1. Q 값을 바탕으로 행동을 선택
  2. 선택한 행동을 Q-learning 공식을 사용해서 Q 값 업데이트

위의 두 가지를 하나씩 자세히 살펴보겠습니다.

Q 값을 바탕으로 행동을 선택

해당 부분은 현재까지 업데이트 된 Q 값을 바탕으로 행동을 선택하는 것입니다.

기본적으로는 Q 값이 가장 높은 행동을 선택하면 됩니다.

 

하지만 여기에는 한 가지 문제점이 있습니다.

Q 값이 제대로 업데이트가 될 때까지 충분한 탐험을 진행하지 못했을 경우, 제대로 된 행동을 추출할 수 없다는 것입니다.

예를 들어, 초반에는 Q 값이 모두 0이기 때문에 (0, 0)에서 왼쪽으로 가는 행동만 선택하기 때문에 게임을 진행할 수가 없습니다.

 

저희는 이 문제를 해결하기 위해, ϵ-greedy 방법을 사용할 수 있습니다.

  • ϵ-greedy란?

탐험과 이용의 균형을 맞추기 위한 행동 선택 방법으로, 아래 공식을 따릅니다.

여기서 ϵ은 0과 1 사이의 값으로 ϵ 확률 만큼은 랜덤하게 행동을 하게 하여 탐험을 진행하도록 하고, (1-ϵ) 확률 만큼 Q 값이 가장 높은 행동을 선택하도록 합니다.

해당 ϵ을 초반에 높게 설정하고 점차 ϵ을 줄임으로써, 초반에는 랜덤 행동을 통한 탐험을 하게 하고 점차 Q 값을 이용하도록 행동을 선택할 수 있습니다.

 

ϵ-greedy를 활용하여 행동을 선택하는 코드는 아래와 같습니다.

# 초기 값은 보통 1로 설정
epsilon = 1.0
train = True

# ϵ-greedy를 활용한 행동 선택
def select_action(state) :
	# 훈련을 할 경우에는 ϵ-greedy 방법을 사용
   	# 테스트를 진행할 때는 온전히 Q 값만 사용
   	# np.random.rand()를 넣어, 후반에도 종종 탐험을 할 수 있도록 함
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

 

선택한 행동을 Q-learning 공식을 사용해서 Q 값 업데이트

해당 부분은 위에서 선택한 행동을 환경에서 실행해보고, 그 결과 값을 Q-learning 공식에 맞게 Q 값을 업데이트 하는 것입니다.

코드로 들어가기 전에 먼저 Q 테이블을 업데이트하는 공식을 먼저 살펴보겠습니다.

해당 수식은 Q(s, a)를 업데이트 하는데, 특정 학습률 α에 있어 (1- α)만큼 현재의 Q 값α만큼의 (보상값 r + 할인율  γ * 다음 state의 가장 높은 Q값 maxQ(s', a'))를 반영한다는 의미입니다.

  • 학습률 α

값이 높을수록 다음 행동 값 즉, 새로운 정보를 더 많이 반영한다는 것이고, 낮을수록 현재의 Q 값 즉, 기존의 경험을 더 많이 유지한다는 의미입니다.

  • 할인율 γ

미래 보상의 중요도를 나타내는 지표로, 보통은 미래의 보상에 너무 의존하지 않도록 1보다 약간 작은 수로 지정하는 것이 보통입니다.

  • maxQ(s', a')

다음 상태인 s'에서 가능한 모든 행동 중 가장 높은 Q 값을 의미하며, s'은 현재 state에서 위에서 고른 행동을 실행한 결과 값이라고 보시면 됩니다.

 

이제 해당 공식을 바탕으로 Q-learning을 하는 코드는 아래와 같습니다.

# 학습을 진행할 때는 render 모드 비활성화
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
# 환경의 행동 범위 : 여기서는 상, 하, 좌, 우 총 4개
action_size = env.action_space.n

# defaultdict은 키가 없을 때 자동으로 기본값을 생성하기 때문에 강화 학습에서 많이 사용
Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
# 총 학습을 진행할 에피소드 수
max_episode = 10000

def learn() :
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때 reset
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
        	# Q 테이블을 바탕으로 action을 고르는 함수
            action = select_action(state)
            # state, reward, done 외 사용하지 않기 때문에 _ 처리
            new_state, reward, done, _, _ = env.step(action)
            # Q-learning 공식
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*np.max(Q[new_state]))
            all_reward += reward
            state = new_state
        # 50번째 에피소드 마다 ϵ 값을 줄여줌
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

위의 두 가지 과정을 합치면 Q-learing으로 frozen lake를 풀 수 있는 코드가 완성됩니다!

학습 후 테스트를 진행하고 싶으신 경우에는 render를 킨 환경을 다시 세팅해서 해주시면 됩니다.

 

전체 코드

행동 선택, 학습, 테스트 과정을 모두 포함한 전체 코드는 아래와 같습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np

# 미끄러짐 옵션 True/False 선택 가능
is_slippery = True
# 8x8 중에 선택 가능
map_size = '4x4'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        state, _ = env.reset()
        done = False
        all_reward = 0
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*np.max(Q[new_state]))
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

# 학습한 Q를 바탕으로 frozen lake 테스트
def testing_after_learning():
	# render를 켜야 제대로 학습이 되었는지 확인할 수 있음
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

 

 

테스트를 진행하면서 is_slippery 옵션을 껐을 경우에는 1.0 보상을 받으면 성공이고, is_slippery 옵션을 켰을 경우에는 70% 이상 1.0 보상을 받으면 성공이라고 보실 수 있습니다.

 

추가로  is_slippery 옵션을 켰을 경우에는 학습을 많이 진행해야 어느 정도 수렴하시는 걸 보실 수 있습니다!

아무래도 model-free로 진행을 하니까 많이 느리더라구요ㅠㅠ

  • model-based

model-free가 아닌 어느 정도 model-based로 빠르게 학습을 하고 싶으신 경우 아래 상황을 고려할 수 있습니다.

  1. 행동 한 번을 진행할 때마다 reward에 - 진행
    → RL이 최단 경로로 진행하려는 경향을 학습할 수 있음
  2. 구멍에 빠졌을 경우, reward에 크게 - 진행
    → 구멍에 빠지지 않는 쪽으로 빠르게 학습할 수 있음
  3. 도착했을 경우, reward를 크게 추가
    → 도착 지점에 확실히 도착하기 위해 큰 reward를 지급

그 외에도 벽에 부딪히거나 하는 등 맵을 알고 있기 때문에 환경에 맞게 reward를 추가로 주거나 마이너스를 진행하여, model-based 모델을 만들 수도 있습니다.

 

그래도 강화 학습을 제대로 알기 위해서는 model-free로 진행해보는 것을 추천드립니다!

 

Q-learning이랑 비슷한 SARSA로 frozen lake 문제를 푸는 방법은 아래 링크를 참고해주세요.

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-SARSA

 

[RL] gymnasium frozen lake 강화 학습 - SARSA

안녕하세요! 오늘은 기존 작성한 frozen lake 문제를 SARSA로 진행하는 방법에 대해 포스팅 하겠습니다.SARSA 란?강화 학습의 한 방법으로, Q-learning과 비슷하지만 행동 선택이 다른 방식입니다.Q-learning

yhj9855.com

 

코드에 대해 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

728x90
반응형
728x90
반응형
728x90

 

안녕하세요! 오늘은 gymnasium에서 제공하는 frozen lake 문제에 대한 설명에 대해 포스팅 하겠습니다.

 

  • 강화 학습이란?

행동을 통해 얻는 보상을 기반으로 학습하는 AI의 한 분야입니다.

흔히 Reinforcement Learning 줄여서, RL이라고 부르는 강화 학습은 컴퓨터가 스스로 경험을 쌓으면서 어떤 행동이 가장 좋은 보상을 얻는지 배우는 과정이라고 볼 수 있습니다.

 

저는 frozen lake 문제를 Q-learningSARSA 두 가지 방법으로 풀어보았는데요!

각각 전체 코드는 포스팅 가장 아래에 있습니다.


  • frozen lake 규칙

우선 gymnasium에서 제공하는 frozen lake 문제는 아래 링크에서 자세하게 보실 수 있습니다.

https://gymnasium.farama.org/environments/toy_text/frozen_lake/

 

Gymnasium Documentation

A standard API for reinforcement learning and a diverse set of reference environments (formerly Gym)

gymnasium.farama.org

 

frozen lake의 기본 맵은 아래 사진처럼 생겼습니다.

  1. frozen lake의 기본 맵은 4x4 혹은 8x8로 존재
  2. 맵 내 구멍이 존재
    → 구멍의 수나 위치는 조절할 수 없음
  3. 선물에 있는 곳에 도착하거나, 구멍에 빠지면 에피소드 종료
    물에 있는 곳에 도착해야만 보상 1을 획득할 수 있음
  4. 움직임은 상, 하, 좌, 우 존재
  5. 얼음 내 미끄러짐을 키고 끌 수 있음
    → 미끄러짐을 킬 경우, 각 얼음마다 특정 확률로 다른 곳으로 움직이는 확률이 존재
    → 각 얼음마다 존재하는 미끄러질 확률을 조절할 수 없음

저희는 위의 규칙을 잘 생각하여, 구멍에 빠지지 않고 선물에 도착하는 경로를 학습해주어야 합니다.

  • 필요한 라이브러리 설치

frozen lake는 gymnasium에서 제공하고 있기 때문에 가장 먼저 gymnasium을 설치해주시면 됩니다!

그리고 render 모드 활성화를 진행하기 위해 gymnasium[toy-text] 설치도 같이 진행해주세요.

두 패키지 모두 pip install로 설치해주시면 금방 설치할 수 있습니다.

import gymnasium as gym
from collections import defaultdict
import numpy as np
  • frozen lake 환경 세팅

필요한 라이브러리를 설치한 후에는 frozen lake 환경을 세팅해봅니다.

우선 환경 세팅을 하는 코드는 아래와 같습니다.

# 미끄러지는 얼음을 만들지 결정하는 변수
is_slippery = False
# 맵 사이즈
map_size = '4x4'
# frozen lake 환경 설정
# render 활성화 환경
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
# 환경을 초기화하는 함수
env.reset()

env.reset()의 경우 기본 기능은 환경을 초기화하는 함수지만, 환경을 세팅한 다음에 해당 코드를 부르지 않으면 오류가 발생하니 반드시 환경 사용 전에 env.reset()을 먼저 불러주세요!

  • frozen lake 테스트 진행하기

이제 만든 환경을 바탕으로 gymnasium에서 제공하는 frozen lake 테스트를 진행해봅시다!

테스트를 진행하는 코드는 아래와 같습니다.

while True:
    # 0 : 왼쪽, 1 : 아래, 2 : 오른쪽, 3 : 위
    action = input("이동할 방향: ")
    if action not in ['0','1','2','3']:
        continue
    action = int(action)
    # step() : 환경에 액션을 넣고 실행하는 함수
    state, reward, done, info, prob = env.step(action)
    print("위치: ", state, "행동: ", action, "보상: ", reward)
    print()
    if done:
        print("에피소드 종료", reward)
        break

해당 코드를 실행해보면, 4x4 맵이 있는 창에 입력한 행동대로 움직이는 것을 확인할 수 있습니다.

구멍에 빠지거나 선물에 도착하기 전에는 계속 움직일 수 있으니, 환경을 이해하기 위해 편하게 테스트 진행해보세요!

맵을 키우거나 미끄러지는 얼음을 키고 테스트 해보시는 것도 추천드립니다.

 


여기까지가 frozen lake 환경 세팅이었습니다.

이번 포스팅에서는 Q-learning과 SARSA에 관한 model-free 전체 코드만 업로드 되어 있습니다.

두 가지 모두 is_slippery가 False일 경우에는 1.0 만점을 받았고, is_silppery가 True일 경우에는 0.8 이상의 보상을 획득했습니다.

 

Q-learning에 관한 자세한 내용이 궁금하신 분들은 아래 링크에서 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-2

 

[RL] gymnasium frozen lake 강화 학습 - 2

안녕하세요! 오늘은 기존에 작성한 frozen lake 문제를 Q-learning으로 진행하는 방법에 대해 포스팅 하겠습니다. Q-learning이란?강화 학습의 한 방법으로, Q라는 테이블을 이용하는 것입니다.Q 테이블

yhj9855.com

 

SARSA에 관한 자세한 내용이 궁금하진 분들은 아래 링크에서 확인해주세요!

https://yhj9855.com/entry/RL-gymnasium-frozen-lake-%EA%B0%95%ED%99%94-%ED%95%99%EC%8A%B5-SARSA

 

[Q-learning 전체 코드]

import gymnasium as gym
from collections import defaultdict
import numpy as np

is_slippery = True
map_size = '8x8'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때는 reset을 해줘야 함
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            # Q 러닝
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*np.max(Q[new_state]))
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

def testing_after_learning():
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

결과물

is_slippery = False / True

[SARSA 전체 코드]

import gymnasium as gym
from collections import defaultdict
import numpy as np

is_slippery = True
map_size = '4x4'
env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery)
env.reset()
action_size = env.action_space.n

Q = defaultdict(lambda: np.zeros(action_size))

alpha = 0.1
gamma = 0.99
epsilon = 1.0
train = True
max_episode = 100000


def select_action(state) :
    if np.random.rand() < epsilon and train :
        action = np.random.choice([0, 1, 2, 3])
    else :
        action = np.argmax(Q[state])
    return action

def learn() :
    global epsilon
    reward_list = []
    for i in range(1, max_episode+1) :
        # 100번째 마다 학습이 진행되고 있음을 출력
        if i % 100 == 0 :
            # 해당 에피소드까지 진행된 모든 보상의 평균을 구함
            avg_reward = sum(reward_list)/100
            print("\rEpisode {}/{} || average reward {}".format(i, max_episode, avg_reward), end="")
            reward_list = []
        # 에피소드를 처음 시작할 때는 reset을 해줘야 함
        state, _ = env.reset()
        done = False
        all_reward = 0
        # 에피소드가 종료될 때까지 반복
        while not done :
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            next_action = select_action(new_state)
            # SARSA
            Q[state][action] = (1-alpha)*Q[state][action] + alpha*(reward + gamma*Q[new_state][next_action])
            all_reward += reward
            state = new_state
        if i % 50 == 0 :
            epsilon *= 0.99
        reward_list.append(all_reward)

def testing_after_learning():
    env = gym.make('FrozenLake-v1', desc=None, map_name=map_size, is_slippery=is_slippery, render_mode='human')
    total_test_episode = 10
    rewards = []
    for episode in range(total_test_episode):
        state, _ = env.reset()
        episode_reward = 0
        while True: 
            action = select_action(state)
            new_state, reward, done, _, _ = env.step(action)
            episode_reward += reward
            if done:
                rewards.append(episode_reward)
                break
            state = new_state
    print("")
    print("avg: " + str(sum(rewards) / total_test_episode))

if __name__ == "__main__" :
    learn()
    testing_after_learning()

결과물

is_slippery = False / True

 

코드에 궁금한 부분이 있으신 분들은 댓글로 남겨주시면, 답변 드리도록 하겠습니다.

★읽어주셔서 감사합니다★

 

728x90
반응형
728x90
반응형

 

728x90

안녕하세요! 오늘은 데이터 분석에서 정말 중요한 그래프 색상을 정리하는 포스팅을 진행하도록 하겠습니다.

 

데이터 분석에서 시각화는 정말 중요한데요.

똑같이 데이터 분석을 진행했다고 해도, 얼만큼 잘 표현할 수 있는지에 따라 굉장히 다른 평가를 받을 수 있습니다.

저도 실제로 회사에서 그래프 색상 사용 덕분에 보고서의 평가가 훨씬 좋았던 경험이 많았기 때문에 파이썬에서 사용할 수 있는 색상을 정리해보고자 합니다.

 

  • 단일 색상

단일 색상을 이미 지정된 색상을 단일 알파벳으로 지정하는 것, 색상의 이름을 직접 지정하는 것으로 구분할 수 있습니다.

  • 단일 색상 사용 예시

단일 색상인 lightskybluelightcoral을 사용한 예시 입니다.

이 두 색은 제가 자주 사용하는 색 조합인데요!

두 개의 비교군을 보여줄 때 사용하는데, 눈이 아프지 않아 좋습니다.

plt.figure(figsize=(12, 8))  # 그래프 크기 설정
lineplot = sns.lineplot(x='a', y='b', hue='c', data=mini_data, palette=['lightskyblue', 'lightcoral'])
line_colors = {line.get_label(): line.get_color() for line in lineplot.lines}
line_color = {'A': 'deepskyblue', 'B': 'red'}

for name, group in mini_data.groupby('c') :
    for x, y in zip(group['a'], group['b']):
        if x%5 == 0 :
            plt.text(x, y+1.1, f'{y:.1f}%', color=line_color[name], ha='right', va='bottom', fontsize=12, fontweight='bold')
            plt.scatter(x, y, color=line_colors[name], s=70)
        else :
            plt.text(x, y+1.1, f'{y:.1f}%', color=line_color[name], ha='right', va='bottom', fontsize=8, fontweight='bold')
            plt.scatter(x, y, color=line_colors[name], s=30)

  • 컬러맵 (color map)

컬러맵은 색상의 분포를 의미합니다.

산점도나 하트맵 같이 단일 색상으로 정하기 어려운 그래프의 경우 컬러맵을 사용하는 경우가 많습니다.

 

  • 컬러맵 사용 예시

컬러 맵인 Reds coolwarm을 사용한 예시 입니다.

개인적으로 히트맵은 Reds와 같이 비슷한 색상의 컬러 맵을 많이 사용하고, 워드클라우드 같이 데이터가 뚜렷하게 구분되어야 할 경우에는 다른 색상이 많은 컬러 맵을 사용하는 것 같아요!

plt.figure(figsize=(10, 10))
sns.heatmap(df, annot=True, cmap='Reds', fmt='.2f', linewidths = 0.1,annot_kws={'size': 12}, cbar=False)

Naver = np.array(Image.open("./A.png"))
plt.figure(figsize=(30,30))
wc = WordCloud(        relative_scaling=0.2,mask = Naver,
                       font_path="/Windows/Fonts/Cafe24Dangdanghae-v2.0.otf",
                       background_color="white",
                       min_font_size=1,
                       max_font_size=50,
                       max_words=100,
                       colormap = 'coolwarm'
                     ).generate_from_frequencies(wordcloud_data)
plt.imshow(wc)
plt.axis('off')
plt.show()

그 외에도 컬러 맵을 리스트로 지정한 후 하나씩 단일 색상으로 사용할 수도 있습니다!

저는 개인적으로 Qualitative colormaps 같은 경우, 제가 눈으로 색상을 하나씩 확인할 수 있어서 해당 컬러맵은 단일 색상처럼 사용하는 경우도 자주 있는 것 같습니다.

# Pastel1 컬러맵 불러오기
cmap = plt.get_cmap('Pastel1')
# 여섯 번째 색상 선택
color = cmap(5)

plt.figure(figsize=(8, 5))
plt.bar(categories, values, color=color, edgecolor='black')
plt.tight_layout()
plt.show()

 

여기까지 파이썬에서 사용할 수 있는 색상을 정리해보았습니다!

해당 색상을 잘 활용하여 보고서의 퀄리티를 높이는 시각화를 하시면 좋겠습니다:)

★읽어주셔서 감사합니다★

 

728x90
반응형
728x90
반응형
728x90

 

안녕하세요. 오늘은 기존에 작성한 네이버 뉴스 클로링 코드에서 첫 번째 코드의 자세한 크롤링 과정을 포스팅 하겠습니다.

새롭게 변경된 첫 번째 코드의 자세한 크롤링 과정입니다!

 

네이버 뉴스 크롤링 전체 코드를 확인하고 싶으신 분들은 아래 링크를 확인해주세요!

https://yhj9855.com/entry/Crawling-%EB%84%A4%EC%9D%B4%EB%B2%84-%EB%89%B4%EC%8A%A4-%ED%81%AC%EB%A1%A4%EB%A7%81-%EC%BD%94%EB%93%9C-%EB%B3%80%EA%B2%BD

 

[Crawling] 네이버 뉴스 크롤링 코드 변경

안녕하세요! 네이버 뉴스가 24년 1월 25일부터 페이지가 보여주는 방식이 변경되면서, 이전 포스팅에서 진행했던 첫 번째 코드를 사용할 수 없게 되었습니다. 그래서 변경된 페이지에서 적용되는

yhj9855.com

 

이전에 사용되었던 BeautifulSoup으로 크롤링 했던 코드의 자세한 과정이 궁금하신 분들은 아래 링크를 확인해주세요!

https://yhj9855.com/entry/Crawling-%EB%84%A4%EC%9D%B4%EB%B2%84-%EB%89%B4%EC%8A%A4-%ED%81%AC%EB%A1%A4%EB%A7%81-2

 

[Crawling] 네이버 뉴스 크롤링 - 2

안녕하세요. 오늘은 기존에 작성한 네이버 뉴스 크롤링 코드에서 첫 번째 코드의 자세한 크롤링 과정을 포스팅 하겠습니다. 네이버 뉴스 크롤링 전체 코드를 확인하고 싶으신 분들은 아래 링크

yhj9855.com


첫 번째 코드는 Python으로 특정 날짜의 네이버 게임/리뷰 카테고리의 기사들을 크롤링하는데, 제목과 링크만 가져오는 코드였습니다.

 

  • 기본 링크 설정하기

크롤링을 시작할 때 크롤링하고자 하는 페이지의 링크를 설정하는 것이 가장 중요합니다.

위의 그림처럼 2025년 01월 07일의 게임/리뷰 카테고리의 뉴스를 크롤링하고자 한다면, 원하는 정보가 담긴 링크는 아래와 같습니다.

 

https://news.naver.com/breakingnews/section/105/229?date=20250107

 

해당 링크에서 가장 중요한 것은 date=20250107 부분인데요!

네이버 뉴스의 경우, 날짜를 링크 자체에서 구분할 수 있게 되어 있습니다.

그렇기 때문에 특정 날짜의 기사들을 크롤링하고 싶다면 date 뒤에 연도+월+일을 붙이면 된다는 사실을 알 수 있습니다.

해당 부분을 코드로 변경하면 아래와 같습니다.

# 수정하고자 하는 메인 링크
link = 'https://news.naver.com/breakingnews/section/105/229?date='
# 스크랩 하고 싶은 날짜를 년도월일 나열해준다.
# 날짜를 쉽게 바꾸기 위해 date를 따로 선언해준다.
date = '20250107'
# 메인 링크는 링크에 날짜가 붙은 구조이기 때문에 이렇게 작성해준다.
main_link = link + date
# 기사의 수, 제목, 링크를 받아올 예정이기 때문에 정보를 담아줄 데이터 프레임을 생성한다.
Main_link = pd.DataFrame({'number' : [], 'title' : [], 'link' : []})

 

[크롤링 하고자 하는 URL 찾기 Tip]

더보기
더보기

링크를 확인할 때 오늘 날짜, 첫 페이지와 같이 기본적으로 접속하면 바로 볼 수 있는 URL로 확인하는 건 피하시는게 좋습니다.

기본으로 설정된 URL은 해당 URL만 특별하게 설정되어 있거나, 보이는 링크와 실제 링크가 다른 경우도 종종 있기 때문에 오류가 발생하기 쉽습니다.

아래 그림처럼 '오늘' 날짜의 뉴스 기사에는 date 표시가 되어있지 않기 때문에 초기 링크를 설정할 때 어려움이 있을 수 있습니다.

  • 기사 더보기 클릭 및 URL 확인하기

 

원하는 날짜의 모든 기사를 크롤링하기 위해서는 아래 그림처럼 기사 더보기 버튼을 클릭해서 기사를 끝까지 업로드 해야 합니다.

 

이 때, 중요한 것은 기사를 업로드 해도 URL이 변경되지 않는다는 것입니다!

기사 더보기를 웹 페이지 안의 내용이 달라졌음에도 URL이 달라지지 않기 때문에 이전에 진행했던 정적 크롤링으로는 진행할 수가 없는 것입니다.

 

이렇게 URL이 변경되지 않고, 데이터가 변경되는 것동적 페이지라고 부릅니다.

동적 페이지는 오직 1가지 방법으로만 크롤링이 가능한데요!

바로 Selenium을 활용한 동적 크롤링 입니다.

 

Selenium을 활용한 동적 크롤링을 진행하기 위해서는 크롬 드라이버 설치가 필수적입니다.

크롬 드라이버 설치가 되어 있지 않거나, 자신의 크롬 버전과 크롬 드라이버 버전이 맞지 않으시는 분들은 아래 링크에 설명된 방법을 통해서 크롬 드라이버를 먼저 설치해주셔야 합니다!

https://yhj9855.com/entry/%ED%81%AC%EB%A1%AC%EB%93%9C%EB%9D%BC%EC%9D%B4%EB%B2%84-%EC%84%A4%EC%B9%98%ED%95%98%EA%B8%B0-%EC%B5%9C%EC%8B%A0%EB%B2%84%EC%A0%84%EA%B3%BC-%EB%8B%A4%EB%A5%B8-%ED%81%AC%EB%A1%AC%EB%93%9C%EB%9D%BC%EC%9D%B4%EB%B2%84-%EC%84%A4%EC%B9%98

 

크롬드라이버 설치하기 (최신버전과 다른 크롬드라이버 설치)

안녕하세요!최근에 크롬 드라이버를 설치하는 페이지가 변경되어 새롭게 포스팅을 하겠습니다! 이전 포스팅이 궁금하신 분들은 아래 포스팅을 참고해주시면 됩니다!https://yhj9855.com/entry/%ED%81%AC

yhj9855.com

 

  • 코드로 기사 더보기 버튼 클릭하기

이제 어떤 방향으로 크롤링을 해야하는지 알았기 때문에 기사를 모두 업로드해서 크롤링하는 방법만 남았습니다.

다음으로 고려해야 할 문제점은 저희가 코드로 기사 더보기 버튼을 클릭해야 하는 것과 기사 더보기 버튼의 끝을 알아야 한다는 것입니다.

아래 그림처럼 기사를 기사 더보기 버튼을 지속적으로 클릭하다보면, 더 이상 해당 버튼이 등장하지 않는 것을 보실 수 있습니다.

해당 부분을 보고 저희가 알 수 있는 것은 기사 더보기 버튼을 계속 클릭하다보면, 더 이상 버튼이 없어서 오류가 발생하는 지점이 생긴다는 것입니다!

 

이제 개발자 도구 창을 열어, 기사 더보기 버튼이 어떤 HTML 구조를 가지고 있는지 확인하는 작업이 필요합니다.

개발자 도구에 대한 자세한 설명은 아래 링크를 확인해주세요!

https://yhj9855.com/entry/%ED%81%AC%EB%A1%AC-%EA%B0%9C%EB%B0%9C%EC%9E%90-%EB%8F%84%EA%B5%AC-%EC%82%AC%EC%9A%A9%ED%95%98%EA%B8%B0

 

크롬 개발자 도구 사용하기

안녕하세요. 오늘은 크롤링을 진행하기 위해 반드시 필요한 개발자 도구를 사용하는 법에 대해서 포스팅 하겠습니다. 크롬 개발자 도구란? 크롬 브라우저에 직접 내장된 웹 개발 도구로, 웹 페

yhj9855.com

 

저희는 기사 더보기 버튼의 HTML 구조를 알고 싶은 것이기 때문에 아래 그림처럼 따라합니다.

①번의 아이콘을 클릭한 후, ②번(=원하는 정보)을 클릭하면 원하는 정보의 HTML 구조로 바로 이동이 됩니다.

원하는 정보의 HTML 구조를 확인해보니, 아래와 같이 나왔습니다.

 

<a href="#" class="section_more_inner _CONTENT_LIST_LOAD_MORE_BUTTON" data-persistable="false">기사 더보기</a>

 

[HTML 구조의 뜻]

더보기
더보기
  1. 'a' 태그 : 링크가 담겨져 있는 공간이면 해당 태그를 사용합니다. 현재 버튼을 클릭하면 새로운 기사가 등장하기 때문에 해당 태그를 사용했습니다.
  2. href: 링크의 주소를 가지고 있는 부분입니다. 여기서는 #을 사용하여, 페이지를 변동하지 않겠다는 옵션을 지정해주었습니다.
  3. class: 태그의 속성을 나타내는 부분입니다. 크롤링을 진행할 때 많이 사용되는 부분으로 ID가 없을 경우에는 이름처럼 사용되기도 합니다.
  4. data-* : HTML5에서 지원하는 사용자 정의 데이터 속성입니다. 여기서의 옵션은 특정 동작이나 상태를 저장하지 않도록 지시하는 것을 의미합니다.

 

기사 더보기 버튼의 HTMl 구조를 알았으니, 오류가 발생할 때까지 버튼을 클릭하는 코드를 작성합니다.

service = Service('chromedriver.exe')
driver = webdriver.Chrome(service=service)
driver.get(main_link)
# 웹 페이지 로딩을 기다리는 코드로, 초는 더 짧아도 된다.
time.sleep(3)

# 기사 더보기 버튼
more_button = driver.find_element(By.CLASS_NAME, 'section_more_inner._CONTENT_LIST_LOAD_MORE_BUTTON')

# 기사 더보기가 몇 개가 있을지 모르기 때문에 오류가 날 때까지 누르는 것으로 한다.
# 여기서 발생하는 오류란 버튼을 찾을 수 없다 즉, 버튼이 없을 때 발생하는 오류이다.
while True :
    try :
        more_button.click()
        time.sleep(3)
    except :
        break

위의 코드에서 보면, time.sleep 코드를 사용하는 것을 보실 수 있습니다.

동적 크롤링을 작성하실 때 생각보다 많이 놓치실 수 있는 부분인데요!

동적 크롤링은 코드로 크롬을 띄우고, 직접 웹 페이지에 들어가서 해당 페이지에 있는 내용을 크롤링하는 작업입니다.

그렇기 때문에 웹 페이지 로딩이 되지 않은 상태에서 접근을 시도하면 오류가 발생하는데, 이 때 발생하는 오류가 특정 HTML을 찾을 수 없다는 오류가 발생합니다.

 

이로 인해 초보자분들께서는 웹 페이지가 로딩이 되지 못했다고 생각을 못하고, 코드 내 오류가 있다고만 생각을 하여 한참 시간을 소모하는 경우를 많이 봤습니다!

이를 방지하기 위해서 time.sleep 명령어를 웹 페이지가 변경될 때마다 꼭 넣어주셔야 합니다.

몇 초를 기다리는지는 웹 페이지 안의 영상, 이미지 여부나 인터넷 속도에 따라서 다르지만 보통 1~5초 사이 라고 보시면 됩니다.

 

  • 기사의 제목과 링크 가져오기

마지막으로 크롤링 목적인 기사의 제목과 링크를 가져오기만 하면 첫 번째 코드의 크롤링이 완료됩니다!

버튼을 확인했던 것처럼 개발자 도구에서 기사의 제목과 링크가 어떤 HTML 구조를 가지고 있는지 확인해보겠습니다.

HTML 구조를 확인해보니 아래와 같이 나왔습니다.

<a href="https://n.news.naver.com/mnews/article/119/0002911455" class="sa_text_title _NLOG_IMPRESSION" data-clk="alist" data-rank="34" data-gdid="8812A0A8_000000000000000002911455" data-imp-url="https://n.news.naver.com/mnews/article/119/0002911455">
<strong class="sa_text_strong">LCK컵, 15일 개최…우승 팀은 국제 대회 출전권 획득</strong>
</a>

 

[HTML 구조의 뜻]

더보기
더보기
  1. 'strong' 태그 : HTML에서 텍스트를 굵게 보이기 위한 태그입니다.

 

이전 기사 더보기 버튼의 HTML 구조와 매우 유사한 것을 알 수 있습니다.

저희가 수집해야 할 정보는 아래와 같습니다.

  1. 기사의 제목 : a태그 속 strong태그 텍스트
  2. 기사의 링크 : a태그 속 HTML 구조의 href

해당 부분을 코드로 변경하면 아래와 같습니다.

# 기사의 제목과 링크가 모두 담긴 a태그를 모두 찾는다.
articles = driver.find_elements(By.CLASS_NAME, 'sa_text_title._NLOG_IMPRESSION')
# a태그 내 기사의 제목과 링크를 따로 저장한다.
for i in range(len(articles)) :
	# 기사의 제목
    # strip을 사용하여 눈으로 확인할 수 없는 양 끝의 공백을 제거한다.
    title = articles[i].text.strip()
    # href 부분을 가져온느 방법
    # a태그 내 href를 가져온다.
    link = articles[i].get_attribute('href')
    # 번호는 0부터 시작하기 때문에 1을 더해준다.
    li = [i+1, title, link]
    Main_link.loc[i] = li
  • 액셀 파일로 저장

이제 크롤링 작업은 완료되었고, 크롤링한 데이터프레임을 엑셀 파일로 저장하도록 하겠습니다.

# 엑셀을 잘 관리하기 위해서 크롤링 날짜를 파일 이름에 포함한다.
excel_name = 'news_' + date + '.xlsx'
with pd.ExcelWriter(excel_name) as writer :
    Main_link.to_excel(writer, sheet_name='링크', index=False)

이런 과정을 통해서 특정 날짜의 모든 기사의 제목과 링크를 크롤링하는 코드가 완성되었습니다!!

 

[전체 코드]

import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
import time
from selenium.webdriver.common.by import By
from openpyxl import *

link = 'https://news.naver.com/breakingnews/section/105/229?date='
date = '20250107'

main_link = link + date 
Main_link = pd.DataFrame({'number' : [], 'title' : [], 'link' : []})

service = Service('chromedriver.exe')
driver = webdriver.Chrome(service=service)
driver.get(main_link)
time.sleep(3)

more_button = driver.find_element(By.CLASS_NAME, 'section_more_inner._CONTENT_LIST_LOAD_MORE_BUTTON')

while True :
    try :
        more_button.click()
        time.sleep(3)
    except :
        break

articles = driver.find_elements(By.CLASS_NAME, 'sa_text_title._NLOG_IMPRESSION')
for i in range(len(articles)) :
    title = articles[i].text.strip()
    link = articles[i].get_attribute('href')
    li = [i+1, title, link]
    Main_link.loc[i] = li


excel_name = 'news_' + date + '.xlsx'
with pd.ExcelWriter(excel_name) as writer :
    Main_link.to_excel(writer, sheet_name='링크', index=False)

 

이전 코드와 마찬가지로 Xpath, CSS_Selector를 사용하지 않았는데, 해당 부분을 사용하면 쉽게 크롤링을 할 수 있지만 HTML 코드가 복잡하거나 크롤링을 배우고 싶은 분들에게는 좋은 방법이 아니라고 생각합니다.

 

기사의 본문을 크롤링하는 두 번째 코드의 자세한 크롤링 코딩 과정을 확인하고 싶으신 분들은 아래 링크를 확인해주세요!!
https://yhj9855.com/entry/Crawling-%EB%84%A4%EC%9D%B4%EB%B2%84-%EB%89%B4%EC%8A%A4-%ED%81%AC%EB%A1%A4%EB%A7%81-3

 

[Crawling] 네이버 뉴스 크롤링 - 3

안녕하세요. 오늘은 기존에 작성한 네이버 뉴스 크롤링 코드에서 두 번째 코드의 자세한 크롤링 과정을 포스팅 하겠습니다. 네이버 뉴스 크롤링 전체 코드를 확인하고 싶으신 분들은 아래 링크

yhj9855.com

 

코드에 대해 궁금한 점이 있으신 분들은 댓글로 남겨주시면, 답변 드리겠습니다.

★ 읽어주셔서 감사합니다★

 

728x90
반응형

+ Recent posts