# Look at the backup tree above and the vector form Bellman equation to understand this code.
# Have them side by side while you are reading.
# Each of the 11 rows of the "matrix" P has 4 entries - one for each of the allowed actions.
# Each entry P[s][a] is a list of (probability, s') tuples, one for each of the (up to 3) next states
# the agent may end up in despite its intention to move in the chosen direction.
# The states are numbered sequentially from top left to bottom right.
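For reference, the update that the loop below implements is the Bellman optimality backup with a state-only reward (the R list defined further down):

$$
v_{k+1}(s) = R(s) + \gamma \max_{a} \sum_{s'} P(s' \mid s, a)\, v_k(s')
$$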
import numpy as np
from rich import print
P = {
    0: {
        0: [(0.9, 0), (0.1, 1), (0, 4)],
        1: [(0.8, 1), (0.1, 4), (0.1, 0)],
        2: [(0.8, 4), (0.1, 1), (0.1, 0)],
        3: [(0.9, 0), (0.1, 4)],
    },
    1: {
        0: [(0.8, 1), (0.1, 2), (0.1, 0)],
        1: [(0.8, 2), (0.2, 1)],
        2: [(0.8, 1), (0.1, 0), (0.1, 2)],
        3: [(0.8, 0), (0.2, 1)],
    },
    2: {
        0: [(0.8, 2), (0.1, 3), (0.1, 1)],
        1: [(0.8, 3), (0.1, 5), (0.1, 2)],
        2: [(0.8, 5), (0.1, 1), (0.1, 3)],
        3: [(0.8, 1), (0.1, 2), (0.1, 5)],
    },
    3: {
        0: [(0.9, 3), (0.1, 2)],
        1: [(0.9, 3), (0.1, 6)],
        2: [(0.8, 6), (0.1, 2), (0.1, 3)],
        3: [(0.8, 2), (0.1, 3), (0.1, 6)],
    },
    4: {
        0: [(0.8, 0), (0.2, 4)],
        1: [(0.8, 4), (0.1, 7), (0.1, 0)],
        2: [(0.8, 7), (0.2, 4)],
        3: [(0.8, 4), (0.1, 0), (0.1, 7)],
    },
    5: {
        0: [(0.8, 2), (0.1, 6), (0.1, 5)],
        1: [(0.8, 6), (0.1, 9), (0.1, 2)],
        2: [(0.8, 9), (0.1, 5), (0.1, 6)],
        3: [(0.8, 5), (0.1, 2), (0.1, 9)],
    },
    6: {
        0: [(0.8, 3), (0.1, 6), (0.1, 5)],
        1: [(0.8, 6), (0.1, 10), (0.1, 3)],
        2: [(0.8, 10), (0.1, 5), (0.1, 6)],
        3: [(0.8, 5), (0.1, 3), (0.1, 10)],
    },
    7: {
        0: [(0.8, 4), (0.1, 8), (0.1, 7)],
        1: [(0.8, 8), (0.1, 7), (0.1, 4)],
        2: [(0.9, 7), (0.1, 8)],
        3: [(0.9, 7), (0.1, 4)],
    },
    8: {
        0: [(0.8, 8), (0.1, 9), (0.1, 7)],
        1: [(0.8, 9), (0.2, 8)],
        2: [(0.8, 8), (0.1, 7), (0.1, 9)],
        3: [(0.8, 7), (0.2, 8)],
    },
    9: {
        0: [(0.8, 5), (0.1, 10), (0.1, 8)],
        1: [(0.8, 9), (0.1, 9), (0.1, 5)],
        2: [(0.8, 9), (0.1, 8), (0.1, 10)],
        3: [(0.8, 8), (0.1, 5), (0.1, 9)],
    },
    10: {
        0: [(0.8, 6), (0.1, 10), (0.1, 9)],
        1: [(0.9, 10), (0.1, 6)],
        2: [(0.9, 10), (0.1, 9)],
        3: [(0.8, 9), (0.1, 6), (0.1, 10)],
    },
}
R = [0, 0, 0, 1, 0, 0, -1, 0, 0, 0, 0]
gamma = 0.9
States = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Actions = [0, 1, 2, 3]  # [north, east, south, west]
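A quick sanity check, not part of the original listing: every P[s][a] entry should form a proper probability distribution over next states.

# verify that the transition probabilities of each (state, action) pair sum to 1
for s in States:
    for a in Actions:
        total = sum(prob for prob, _ in P[s][a])
        assert abs(total - 1.0) < 1e-9, f"P[{s}][{a}] sums to {total}"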
Value Iteration Gridworld Two
v = [0] * 11

# once v computed, we can calculate the optimal policy
optPolicy = [0] * 11
# value iteration
for i in range(100):
    for s in States:
        # trans[1] = s'
        # trans[0] = P_ss'
        q_0 = sum(trans[0] * v[trans[1]] for trans in P[s][0])
        q_1 = sum(trans[0] * v[trans[1]] for trans in P[s][1])
        q_2 = sum(trans[0] * v[trans[1]] for trans in P[s][2])
        q_3 = sum(trans[0] * v[trans[1]] for trans in P[s][3])

        v[s] = R[s] + gamma * max(q_0, q_1, q_2, q_3)
for s in States:
    optPolicy[s] = int(
        np.argmax([sum(trans[0] * v[trans[1]] for trans in P[s][a]) for a in Actions])
    )
print(v)
print(optPolicy)
[ 5.882322961884418, 6.788994341526544, 7.731922481317687, 8.925605847317208, 5.164964536161228, 6.773577272302599, 6.633030088993117, 4.595878221066634, 5.149735082549222, 5.864985900325012, 5.828164501942005 ]
[1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0]
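To make the printed policy easier to read, the action indices can be decoded into compass directions; the transition table is consistent with the ordering 0=north, 1=east, 2=south, 3=west (for example, P[1][1] moves the agent east to state 2 with probability 0.8). A small decoding step, not in the original listing:

action_names = ["north", "east", "south", "west"]  # ordering inferred from P
print([action_names[a] for a in optPolicy])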
import numpy as np
import pandas as pd
# Gridworld parameters
rows, cols = 3, 4
step_reward = -0.04
gamma = 0.9
# Terminal states and their rewards (row, col) using 0-indexed rows from top
terminal_states = {
    (0, 3): 1.0,   # top-right
    (1, 3): -1.0,  # just below top-right
}
walls = {(1, 1)}  # wall at position (row=1, col=1)
# Initialize value function
V = np.zeros((rows, cols))
for (i, j), r in terminal_states.items():
    V[i, j] = r
# Define actions
= {"up": (-1, 0), "down": (1, 0), "left": (0, -1), "right": (0, 1)}
actions = list(actions.values()) action_list
def next_state_reward(state, action):
    i, j = state
    di, dj = action
    ni, nj = i + di, j + dj
    # If next state is off-grid or a wall, stay in place
    if (ni < 0 or ni >= rows or nj < 0 or nj >= cols) or ((ni, nj) in walls):
        return state, step_reward
    # If next state is terminal, get its terminal reward
    if (ni, nj) in terminal_states:
        return (ni, nj), terminal_states[(ni, nj)]
    # Otherwise, standard step reward
    return (ni, nj), step_reward
def transitions(state, action):
    # Stochastic model: intended 0.8, perpendicular slips 0.1 each
    if action == actions["up"]:
        slips = [actions["left"], actions["right"]]
    elif action == actions["down"]:
        slips = [actions["right"], actions["left"]]
    elif action == actions["left"]:
        slips = [actions["down"], actions["up"]]
    else:  # right
        slips = [actions["up"], actions["down"]]
    # Gather transitions
    results = []
    # intended
    s1, r1 = next_state_reward(state, action)
    results.append((0.8, s1, r1))
    # slips / sideways
    for slip in slips:
        s2, r2 = next_state_reward(state, slip)
        results.append((0.1, s2, r2))
    return results
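As a quick illustration, not in the original: calling transitions from the bottom-left corner with the intended action "up" returns the intended move plus the two perpendicular slips, where the left slip runs off the grid and leaves the agent in place.

# illustrative call; expected: [(0.8, (1, 0), -0.04), (0.1, (2, 0), -0.04), (0.1, (2, 1), -0.04)]
print(transitions((2, 0), actions["up"]))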
# Value Iteration
theta = 1e-4
while True:
    delta = 0
    newV = V.copy()
    for i in range(rows):
        for j in range(cols):
            if (i, j) in terminal_states or (i, j) in walls:
                continue
            q_values = []
            for action in action_list:
                q = 0
                for prob, s_prime, reward in transitions((i, j), action):
                    q += prob * (reward + gamma * V[s_prime])
                q_values.append(q)
            newV[i, j] = max(q_values)
            delta = max(delta, abs(newV[i, j] - V[i, j]))
    V = newV
    if delta < theta:
        break
# Display the resulting value function
df = pd.DataFrame(
    V, index=[f"row{r}" for r in range(rows)], columns=[f"col{c}" for c in range(cols)]
)
print(df)
          col0      col1      col2      col3
row0  1.255424  1.510587  1.775947  1.000000
row1  1.053525  0.000000  1.156793 -1.000000
row2  0.863027  0.742828  0.901585  0.464972
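The listing stops at the value function. As a follow-up sketch, not part of the original, the greedy policy can be read off the converged V using the same one-step lookahead that the loop uses:

# derive the greedy policy from the converged value function (illustrative sketch)
arrow = {(-1, 0): "^", (1, 0): "v", (0, -1): "<", (0, 1): ">"}
policy = np.full((rows, cols), " ", dtype=object)
for i in range(rows):
    for j in range(cols):
        if (i, j) in terminal_states or (i, j) in walls:
            continue
        best = max(
            action_list,
            key=lambda a: sum(p * (r + gamma * V[s]) for p, s, r in transitions((i, j), a)),
        )
        policy[i, j] = arrow[best]
print(pd.DataFrame(policy, index=df.index, columns=df.columns))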