Value Iteration Gridworld

Two

# Look at the backup tree above and the vector form Bellman equation to understand this code.
# Have them side by side while you are reading.
# Each of the 11 rows of the "matrix" P[s][a] has 4 tuples - one for each of the allowed actions. Each tuple / action is written in the format (probability, s') and is associated with the 3 possible next states that the agent may end up despite its intention to go to the desired state. The states are numbered sequentially from top left to bottom right.
import numpy as np
from rich import print

P = {
    0: {
        0: [(0.9, 0), (0.1, 1), (0, 4)],
        1: [(0.8, 1), (0.1, 4), (0.1, 0)],
        2: [(0.8, 4), (0.1, 1), (0.1, 0)],
        3: [(0.9, 0), (0.1, 4)],
    },
    1: {
        0: [(0.8, 1), (0.1, 2), (0.1, 0)],
        1: [(0.8, 2), (0.2, 1)],
        2: [(0.8, 1), (0.1, 0), (0.1, 2)],
        3: [(0.8, 0), (0.2, 1)],
    },
    2: {
        0: [(0.8, 2), (0.1, 3), (0.1, 1)],
        1: [(0.8, 3), (0.1, 5), (0.1, 2)],
        2: [(0.8, 5), (0.1, 1), (0.1, 3)],
        3: [(0.8, 1), (0.1, 2), (0.1, 5)],
    },
    3: {
        0: [(0.9, 3), (0.1, 2)],
        1: [(0.9, 3), (0.1, 6)],
        2: [(0.8, 6), (0.1, 2), (0.1, 3)],
        3: [(0.8, 2), (0.1, 3), (0.1, 6)],
    },
    4: {
        0: [(0.8, 0), (0.2, 4)],
        1: [(0.8, 4), (0.1, 7), (0.1, 0)],
        2: [(0.8, 7), (0.2, 4)],
        3: [(0.8, 4), (0.1, 0), (0.1, 7)],
    },
    5: {
        0: [(0.8, 2), (0.1, 6), (0.1, 5)],
        1: [(0.8, 6), (0.1, 9), (0.1, 2)],
        2: [(0.8, 9), (0.1, 5), (0.1, 6)],
        3: [(0.8, 5), (0.1, 2), (0.1, 9)],
    },
    6: {
        0: [(0.8, 3), (0.1, 6), (0.1, 5)],
        1: [(0.8, 6), (0.1, 10), (0.1, 3)],
        2: [(0.8, 10), (0.1, 5), (0.1, 6)],
        3: [(0.8, 5), (0.1, 3), (0.1, 10)],
    },
    7: {
        0: [(0.8, 4), (0.1, 8), (0.1, 7)],
        1: [(0.8, 8), (0.1, 7), (0.1, 4)],
        2: [(0.9, 7), (0.1, 8)],
        3: [(0.9, 7), (0.1, 4)],
    },
    8: {
        0: [(0.8, 8), (0.1, 9), (0.1, 7)],
        1: [(0.8, 9), (0.2, 8)],
        2: [(0.8, 8), (0.1, 7), (0.1, 9)],
        3: [(0.8, 7), (0.2, 8)],
    },
    9: {
        0: [(0.8, 5), (0.1, 10), (0.1, 8)],
        1: [(0.8, 9), (0.1, 9), (0.1, 5)],
        2: [(0.8, 9), (0.1, 8), (0.1, 10)],
        3: [(0.8, 8), (0.1, 5), (0.1, 9)],
    },
    10: {
        0: [(0.8, 6), (0.1, 10), (0.1, 9)],
        1: [(0.9, 10), (0.1, 6)],
        2: [(0.9, 10), (0.1, 9)],
        3: [(0.8, 9), (0.1, 6), (0.1, 10)],
    },
}

R = [0, 0, 0, 1, 0, 0, -1, 0, 0, 0, 0]
gamma = 0.9

States = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Actions = [0, 1, 2, 3]  # [east, north, south, west]
v = [0] * 11

# once v computed, we can calculate the optimal policy
optPolicy = [0] * 11

# value iteration
for i in range(100):
    for s in States:
        # trans[1] = s'
        # trans[0] = P_ss'
        q_0 = sum(trans[0] * v[trans[1]] for trans in P[s][0])
        q_1 = sum(trans[0] * v[trans[1]] for trans in P[s][1])
        q_2 = sum(trans[0] * v[trans[1]] for trans in P[s][2])
        q_3 = sum(trans[0] * v[trans[1]] for trans in P[s][3])

        v[s] = R[s] + gamma * max(q_0, q_1, q_2, q_3)


    for s in States:
        optPolicy[s] = int(
            np.argmax(
                a=[sum([trans[0] * v[trans[1]] for trans in P[s][a]]) for a in Actions]
            )
        )

print(v)
print(optPolicy)
    
[
    5.882322961884418,
    6.788994341526544,
    7.731922481317687,
    8.925605847317208,
    5.164964536161228,
    6.773577272302599,
    6.633030088993117,
    4.595878221066634,
    5.149735082549222,
    5.864985900325012,
    5.828164501942005
]
[1, 1, 1, 0, 0, 0, 0, 0, 1, 0, 0]
import numpy as np
import pandas as pd

# Gridworld parameters 
rows, cols = 3, 4
step_reward = -0.04
gamma = 0.9

# Terminal states and their rewards (row, col) using 0-indexed rows from top
terminal_states = {
    (0, 3): 1.0,  # top-right
    (1, 3): -1.0,
}  # just below top-right
walls = {(1, 1)}  # wall at position (row=1, col=1)

# Initialize value function
V = np.zeros((rows, cols))
for (i, j), r in terminal_states.items():
    V[i, j] = r

# Define actions
actions = {"up": (-1, 0), "down": (1, 0), "left": (0, -1), "right": (0, 1)}
action_list = list(actions.values())
def next_state_reward(state, action):
    i, j = state
    di, dj = action
    ni, nj = i + di, j + dj
    # If next state is off-grid or a wall, stay in place
    if (ni < 0 or ni >= rows or nj < 0 or nj >= cols) or ((ni, nj) in walls):
        return state, step_reward
    # If next state is terminal, get its terminal reward
    if (ni, nj) in terminal_states:
        return (ni, nj), terminal_states[(ni, nj)]
    # Otherwise, standard step reward
    return (ni, nj), step_reward


def transitions(state, action):
    # Stochastic model: intended 0.8, perpendicular slips 0.1 each
    if action == actions["up"]:
        slips = [actions["left"], actions["right"]]
    elif action == actions["down"]:
        slips = [actions["right"], actions["left"]]
    elif action == actions["left"]:
        slips = [actions["down"], actions["up"]]
    else:  # right
        slips = [actions["up"], actions["down"]]
    # Gather transitions
    results = []
    # intended
    s1, r1 = next_state_reward(state, action)
    results.append((0.8, s1, r1))
    # slips / sideways
    for slip in slips:
        s2, r2 = next_state_reward(state, slip)
        results.append((0.1, s2, r2))
    return results


# Value Iteration
theta = 1e-4
while True:
    delta = 0
    newV = V.copy()
    for i in range(rows):
        for j in range(cols):
            if (i, j) in terminal_states or (i, j) in walls:
                continue
            q_values = []
            for action in action_list:
                q = 0
                for prob, s_prime, reward in transitions((i, j), action):
                    q += prob * (reward + gamma * V[s_prime])
                q_values.append(q)
            newV[i, j] = max(q_values)
            delta = max(delta, abs(newV[i, j] - V[i, j]))
    V = newV
    if delta < theta:
        break

# Display the resulting value function
df = pd.DataFrame(
    V, index=[f"row{r}" for r in range(rows)], columns=[f"col{c}" for c in range(cols)]
)

print(df)
          col0      col1      col2      col3
row0  1.255424  1.510587  1.775947  1.000000
row1  1.053525  0.000000  1.156793 -1.000000
row2  0.863027  0.742828  0.901585  0.464972
Back to top