REINFORCE Algorithm Demo

This code demonstrates an implementation of the REINFORCE (Monte Carlo policy gradient) algorithm in PyTorch. REINFORCE is a foundational policy gradient method that directly optimizes the policy by following the gradient of the expected return.
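
The update implemented below is the standard Monte Carlo policy-gradient estimator. With the discounted return

  G_t = \sum_{k=0}^{T-t} \gamma^k r_{t+k}

the per-episode gradient estimate is

  \nabla_\theta J(\theta) \approx \sum_t G_t \, \nabla_\theta \log \pi_\theta(a_t \mid s_t)

which the code realizes by minimizing the loss -G_t \log \pi_\theta(a_t \mid s_t), averaged over timesteps.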

Key concepts illustrated:

  • Policy network with softmax output for action selection
  • Monte Carlo return calculation
  • Policy gradient update using log-probability and returns
  • Exploration via stochastic action sampling from the softmax policy (rather than an epsilon-greedy schedule)

REINFORCE Implementation
# Partially Adapted from https://towardsdatascience.com/learning-reinforcement-learning-reinforce-with-pytorch-5e8ad7fc7da0

# %%
import numpy as np
import gymnasium as gym
import torch
from torch import nn

from tqdm import tqdm

device = 'cuda' if torch.cuda.is_available() else 'cpu'
# %%
env = gym.make('CartPole-v1')
n_state = int(np.prod(env.observation_space.shape))
n_action = env.action_space.n
print("# of state", n_state)
print("# of action", n_action)

# %%


def run_episode(env, policy, render=False):
    obs_list = []
    act_list = []
    reward_list = []
    next_obs_list = []
    done_list = []
    obs = env.reset()[0]  # gymnasium's reset() returns (obs, info)
    while True:
        if render:
            # Note: with gymnasium, env.render() only draws if the env was
            # created with a render_mode, e.g. gym.make(..., render_mode='human')
            env.render()

        action = policy(obs)
        next_obs, reward, terminated, truncated, _ = env.step(action)
        done = terminated or truncated
        obs_list.append(obs)
        act_list.append(action)
        reward_list.append(reward)
        next_obs_list.append(next_obs)
        done_list.append(done)
        if done:
            break
        obs = next_obs

    return obs_list, act_list, reward_list, next_obs_list, done_list
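
# Quick sanity check (illustrative only, not part of the REINFORCE demo):
# any callable mapping an observation to an action can serve as `policy`
def random_policy(obs):
    return env.action_space.sample()

print("random episode length:", len(run_episode(env, random_policy)[2]))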
# %%


class Policy:
    def __init__(self, n_state, n_action):
        # Define network: maps a state to a distribution over actions
        self.act_net = nn.Sequential(
            nn.Linear(n_state, 16),
            nn.ReLU(),
            nn.Linear(16, n_action),
            nn.Softmax(dim=-1)
        )
        self.act_net.to(device)
        self.gamma = 0.99
        self.optimizer = torch.optim.Adam(self.act_net.parameters(), lr=1e-3)

    def __call__(self, state):
        # Sample an action from the categorical distribution given by the
        # policy's softmax output; exploration comes from this sampling
        with torch.no_grad():
            state = torch.FloatTensor(state).to(device)
            action_probs = self.act_net(state).cpu().numpy()
        action = np.random.choice(n_action, p=action_probs)
        return action

    def update(self, data):
        obs, act, reward, next_obs, done = data
        # Calculate the cumulative discounted return, backwards from the
        # end of the episode: G_t = r_t + gamma * G_{t+1}
        returns = np.zeros_like(reward)
        s = 0
        for i in reversed(range(len(returns))):
            s = s * self.gamma + reward[i]
            returns[i] = s

        # Stack the list of observations into one ndarray first; building
        # a tensor directly from a list of arrays is much slower
        obs = torch.FloatTensor(np.array(obs)).to(device)
        returns = torch.FloatTensor(returns).to(device)
        # Actions are used as indices, so they must be a LongTensor
        act = torch.LongTensor(act).to(device)
        # Policy gradient loss over minibatches of the episode:
        # loss = -mean_t(G_t * log pi(a_t | s_t))
        batch_size = 32
        indices = list(range(len(obs)))
        for i in range(0, len(indices), batch_size):
            index = indices[i:i + batch_size]
            logprob = torch.log(self.act_net(obs[index, :]))
            # Gather the log-probability of the action actually taken
            gt_logprob = returns[index] * \
                torch.gather(logprob, 1,
                             act[index, None]).squeeze()
            loss = -gt_logprob.mean()

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        return loss.item()
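
# For reference (a sketch, not used in this demo): a numerically more stable
# variant would have the network output raw logits and use
#     dist = torch.distributions.Categorical(logits=act_net(obs))
#     loss = -(returns * dist.log_prob(act)).mean()
# which avoids computing log(softmax(x)) explicitly.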


# %%
losses_list, reward_list = [], []
policy = Policy(n_state, n_action)
loss = 0
for i in tqdm(range(2000)):
    data = run_episode(env, policy)
    # Re-using one trajectory for several gradient steps deviates from
    # strictly on-policy REINFORCE, but speeds up learning on CartPole
    for _ in range(5):
        loss = policy.update(data)
    rew = sum(data[2])
    if i > 0 and i % 50 == 0:
        print("itr:({:>5d}) loss:{:>3.4f} reward:{:>3.1f}".format(
            i, np.mean(losses_list[-50:]), np.mean(reward_list[-50:])))

    losses_list.append(loss)
    reward_list.append(rew)

# %%
scores = [sum(run_episode(env, policy, False)[2]) for _ in range(100)]
print("Final score:", np.mean(scores))

# Save the loss/reward curves for later plotting
import pandas as pd
df = pd.DataFrame({'loss': losses_list, 'reward': reward_list})
df.to_csv("./ClassMaterials/Lecture_19_Actor_Critic/data/REINFORCE.csv",
          index=False, header=True)