Deep Q-Network Demo

This is demo code for a Deep Q-Network (DQN) with a replay buffer on the CartPole environment.

# %% [markdown]
# In this lab, we will implement Q-learning with deep neural networks.
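#
# For reference, the `update` step implemented below regresses the network's
# prediction $Q(s,a)$ onto the one-step Q-learning target
#
# $$y = r + \gamma \,(1 - \mathrm{done})\, \max_{a'} Q(s', a'),$$
#
# with a mean-squared-error loss computed on minibatches drawn from a replay
# buffer rather than on consecutive transitions.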

# %%
import numpy as np
import gymnasium as gym
import torch
from torch import nn
import torch.nn.functional as F
import random
from collections import deque
import copy

from tqdm import tqdm


env = gym.make('CartPole-v1')
n_state = int(np.prod(env.observation_space.shape))
n_action = env.action_space.n
print("# of state", n_state)
print("# of action", n_action)

# SEED = 1234
# torch.manual_seed(SEED)
# np.random.seed(SEED)
# random.seed(SEED)
# env.reset(seed=SEED)  # gymnasium seeds the environment through reset()
# %% [markdown]
# Given a policy, how can we estimate the value function for each state?
# The helper below rolls out one episode and records the visited states, the
# actions taken, and the rewards received; a short sketch after `run_episode`
# turns those rewards into discounted returns.

# %%

device = 'cuda' if torch.cuda.is_available() else 'cpu'


def run_episode(env, policy, render=False):
    """ Runs an episode and return the total reward """
    obs = env.reset()[0]
    states = []
    rewards = []
    actions = []
    while True:
        if render:
            # Only produces a frame if the env was created with a render_mode
            env.render()

        states.append(obs)
        action = int(policy(obs))
        actions.append(action)
        obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        rewards.append(reward)
        if done:
            break

    return states, actions, rewards
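
# %% [markdown]
# A small illustrative sketch (not part of the original demo): the rewards
# collected by `run_episode` can be turned into Monte Carlo estimates of the
# value of each visited state with a single backward pass. The discount factor
# defaults to the `gamma = 0.95` used by the `Policy` class below.

# %%
def discounted_returns(rewards, gamma=0.95):
    """Compute the discounted return G_t for every time step of an episode."""
    returns = np.zeros(len(rewards), dtype=np.float32)
    g = 0.0
    for t in reversed(range(len(rewards))):
        g = rewards[t] + gamma * g   # G_t = r_t + gamma * G_{t+1}
        returns[t] = g
    return returns


# Example: discounted returns along one random-policy rollout
_, _, _rs = run_episode(env, lambda s: env.action_space.sample())
print(discounted_returns(_rs)[:5])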


# %%
class Policy:
    def __init__(self, n_state, n_action, eps):
        self.q_net = nn.Sequential(
            nn.Linear(n_state, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, n_action)
        )
        self.eps = eps
        self.gamma = 0.95
        self.n_action = n_action

        self.optimizer = torch.optim.Adam(self.q_net.parameters(), lr=1e-3)
        self.q_net.to(device)
        self.replaybuff = ReplayBuffer(50000)

    def update(self):
        obs, act, reward, next_obs, done = self.replaybuff.sample(32)

        self.optimizer.zero_grad()
        with torch.no_grad():
            # One-step TD target; this demo bootstraps from the online network
            # itself rather than from a separate target network.
            y = reward + self.gamma * (1 - done) * \
                torch.max(self.q_net(next_obs), axis=1)[0]

        loss = F.mse_loss(self.q_net(obs)[range(len(obs)), act], y)
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def __call__(self, state):
        if np.random.rand() < self.eps:
            return np.random.choice(self.n_action)

        if not torch.is_tensor(state):
            state = torch.FloatTensor(state).to(device)
        with torch.no_grad():
            Q = self.q_net(state).cpu().numpy()
            act = np.argmax(Q)

        return act
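
# %% [markdown]
# Note that `update` bootstraps from the same online network it is training.
# Standard DQN keeps a periodically refreshed frozen copy, the target network,
# to stabilize the regression targets. Below is a minimal sketch of that
# variant; it is illustrative only and is not wired into the training loop
# later in this notebook (`TARGET_PERIOD` is an assumed hyperparameter).

# %%
class TargetPolicy(Policy):
    TARGET_PERIOD = 500  # assumed refresh interval, in gradient updates

    def __init__(self, n_state, n_action, eps):
        super().__init__(n_state, n_action, eps)
        self.target_net = copy.deepcopy(self.q_net)  # frozen copy for targets
        self.n_updates = 0

    def update(self):
        obs, act, reward, next_obs, done = self.replaybuff.sample(32)

        self.optimizer.zero_grad()
        with torch.no_grad():
            # Bootstrap from the frozen target network instead of self.q_net
            y = reward + self.gamma * (1 - done) * \
                torch.max(self.target_net(next_obs), axis=1)[0]

        loss = F.mse_loss(self.q_net(obs)[range(len(obs)), act], y)
        loss.backward()
        self.optimizer.step()

        self.n_updates += 1
        if self.n_updates % self.TARGET_PERIOD == 0:
            self.target_net.load_state_dict(self.q_net.state_dict())
        return loss.item()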

# %%


class ReplayBuffer:
    def __init__(self, size):
        self.buff = deque(maxlen=size)

    def add(self, obs, act, reward, next_obs, done):
        self.buff.append([obs, act, reward, next_obs, done])

    def sample(self, sample_size):
        if (len(self.buff) < sample_size):
            sample_size = len(self.buff)

        sample = random.sample(self.buff, sample_size)
        # Stack numpy observations first to avoid the slow list-of-arrays path
        obs = torch.FloatTensor(np.array([exp[0] for exp in sample])).to(device)
        act = torch.LongTensor([exp[1] for exp in sample]).to(device)
        reward = torch.FloatTensor([exp[2] for exp in sample]).to(device)
        next_obs = torch.FloatTensor(np.array([exp[3] for exp in sample])).to(device)
        done = torch.FloatTensor([exp[4] for exp in sample]).to(device)
        return obs, act, reward, next_obs, done

    def __len__(self):
        return len(self.buff)
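
# %% [markdown]
# Quick sanity check of the buffer (illustrative only): push a few transitions
# from a random policy and draw a small batch.

# %%
_buf = ReplayBuffer(10)
_s = env.reset()[0]
for _ in range(5):
    _a = env.action_space.sample()
    _s2, _r, _term, _trunc, _ = env.step(_a)
    _buf.add(_s, _a, _r, _s2, _term or _trunc)
    _s = env.reset()[0] if (_term or _trunc) else _s2
print(len(_buf), _buf.sample(3)[0].shape)  # expect: 5 torch.Size([3, 4])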


# %%
losses_list, reward_list = [], []
policy = Policy(n_state, n_action, 0.5)
update_index = 0
loss = 0
for i in tqdm(range(10000)):
    obs, rew = env.reset()[0], 0
    while True:
        act = policy(obs)
        next_obs, reward, terminated, truncated, _ = env.step(act)
        done = terminated or truncated
        rew += reward

        update_index += 1
        # Train every 5th environment step once the buffer holds >2000 transitions
        if len(policy.replaybuff) > 2e3 and update_index > 4:
            update_index = 0
            loss = policy.update()

        policy.replaybuff.add(obs, act, reward, next_obs, done)
        obs = next_obs
        if done:
            break
    if i > 0 and i % 500 == 0:
        print("itr:({:>5d}) loss:{:>3.4f} reward:{:>3.1f}".format(
            i, np.mean(losses_list[-500:]), np.mean(reward_list[-500:])))
    policy.eps = max(0.005, policy.eps - 1.0/5000)  # linearly anneal exploration

    losses_list.append(loss)
    reward_list.append(rew)

# %%
policy.eps = 0.0
scores = [sum(run_episode(env, policy, False)[2]) for _ in range(100)]
print("Final score:", np.mean(scores))

import pandas as pd
df = pd.DataFrame({'loss': losses_list, 'reward': reward_list})
df.to_csv("./ClassMaterials/Lecture_14_DQN/data/dqn-replay.csv",
          index=False, header=True)
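
# %% [markdown]
# Optional (illustrative): the `render` flag of `run_episode` only opens a
# window if the environment was created with `render_mode='human'`, which
# requires a display. The commented lines below sketch how to watch the
# trained policy.

# %%
# render_env = gym.make('CartPole-v1', render_mode='human')
# run_episode(render_env, policy, render=True)
# render_env.close()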