Lecture 09: Monte Carlo Control
Monte Carlo Control Demo
This is demo code for Monte Carlo control with an epsilon-greedy policy on the FrozenLake environment.
# To add a new cell, type '# %%'
# To add a new markdown cell, type '# %% [markdown]'
# %% [markdown]
# In this lab, we will implement MC policy iteration. Essentially, we need to run policy iteration without access to the transition model `env.unwrapped.P`, estimating values from sampled episodes instead.
# %%
import numpy as np
from tqdm import tqdm
import gymnasium as gym
env = gym.make('FrozenLake-v1', render_mode="rgb_array", map_name="4x4", is_slippery=True) # or you can try '8x8'
env.reset()
n_state = env.unwrapped.observation_space.n
n_action = env.unwrapped.action_space.n
print("# of actions", n_action)
print("# of states", n_state)
# %% [markdown]
# Given a policy, how can we compute its value function for each state? We first need a helper that runs one episode and records the visited states, the actions taken, and the rewards received; a Monte Carlo prediction sketch follows the helper below.
# %%
def run_episode(env, policy, render=False):
    """Run one episode and return the visited states, actions taken, and rewards received."""
    obs, _ = env.reset()
    states = []
    rewards = []
    actions = []
    while True:
        if render:
            env.render()
        states.append(obs)
        action = int(policy(obs))
        actions.append(action)
        obs, reward, terminated, truncated, _ = env.step(action)
        rewards.append(reward)
        if terminated or truncated:
            break
    return states, actions, rewards
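# %% [markdown]
# As a warm-up, here is a minimal sketch of Monte Carlo prediction under a fixed policy: estimate V(s) by averaging the discounted returns observed after each visit to s. The function name `mc_evaluate`, the episode count, and the discount factor are illustrative choices, not part of the original lab.
# %%
def mc_evaluate(env, policy, n_episodes=1000, gamma=0.98):
    """Estimate V(s) for a fixed policy by averaging sampled discounted returns (every-visit MC)."""
    V = np.zeros(n_state)
    counts = np.zeros(n_state)
    for _ in range(n_episodes):
        states, _, rewards = run_episode(env, policy)
        g = 0.0
        # Walk the episode backwards: G_t = r_t + gamma * G_{t+1}
        for state, reward in zip(reversed(states), reversed(rewards)):
            g = g * gamma + reward
            counts[state] += 1
            # Incremental mean of the returns observed from this state
            V[state] += (g - V[state]) / counts[state]
    return V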
# %%
class Policy:
    def __init__(self, Q, N, eps):
        self.Q = Q          # action-value estimates, shape (n_state, n_action)
        self.N = N          # visit counts for each (state, action) pair
        self.eps = eps      # exploration rate for epsilon-greedy action selection
        self.gamma = 0.98   # discount factor

    def update(self, states, actions, rewards):
        # Compute discounted returns backwards: G_t = r_t + gamma * G_{t+1}
        returns = np.zeros(len(rewards))
        g = 0.0
        for i in reversed(range(len(returns))):
            g = g * self.gamma + rewards[i]
            returns[i] = g
        # Every-visit MC: update Q(s, a) with the incremental mean of observed returns
        for state, action, g in zip(states, actions, returns):
            self.N[state, action] += 1
            self.Q[state, action] = (
                self.Q[state, action] * (self.N[state, action] - 1) + g) / self.N[state, action]

    def __call__(self, state):
        # Epsilon-greedy: explore with probability eps, otherwise act greedily w.r.t. Q
        if np.random.rand() < self.eps:
            return np.random.choice(n_action)
        return np.argmax(self.Q[state, :])
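# %% [markdown]
# As a quick sanity check of the backward recursion in `update` (G_t = r_t + gamma * G_{t+1}), the toy reward sequence below is made up purely for illustration.
# %%
toy_rewards = [0.0, 0.0, 1.0]  # hypothetical rewards, not from the environment
gamma = 0.98
g = 0.0
toy_returns = []
for r in reversed(toy_rewards):
    g = g * gamma + r
    toy_returns.append(g)
print(toy_returns[::-1])  # roughly [0.9604, 0.98, 1.0]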
# %% [markdown]
# Let's train the Q-table with epsilon-greedy Monte Carlo control, decaying epsilon over training.
# %%
Q = np.zeros((n_state, n_action))
N = np.zeros_like(Q)
policy = Policy(Q, N, 1.0)
for i in tqdm(range(20000)):
    states, actions, rewards = run_episode(env, policy)
    policy.update(states, actions, rewards)
    # Linearly decay epsilon towards 0.01 over training
    policy.eps = max(0.01, policy.eps - 1.0 / 20000)

# Evaluate the learned greedy policy (no exploration)
policy.eps = 0.0
scores = [sum(run_episode(env, policy, False)[2]) for _ in range(100)]
print("Final score: {:.2f}".format(np.mean(scores)))