Implementing Reinforcement Learning Algorithms: From Q-Learning to Deep Reinforcement Learning
Introduction
Reinforcement learning is a major branch of machine learning in which an agent learns an optimal policy by interacting with its environment. In recent years, deep reinforcement learning has achieved breakthrough results in games, robot control, autonomous driving, and other domains. At its core, reinforcement learning is about balancing exploration and exploitation, improving behavior through trial and error. This article works through the implementation of reinforcement learning algorithms, from classical Q-learning to modern deep reinforcement learning.
Reinforcement Learning Fundamentals
Understanding the basic concepts of reinforcement learning is the key to implementing the algorithms correctly.
Markov Decision Processes
The Markov decision process (MDP) is the mathematical foundation of reinforcement learning; it defines the core notions of states, actions, rewards, and transition probabilities.
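Formally, an MDP can be written as the tuple

$$\mathcal{M} = (\mathcal{S}, \mathcal{A}, P, R, \gamma)$$

where S is the set of states, A the set of actions, P(s' | s, a) the transition probability, R(s, a, s') the reward for a transition, and the discount factor gamma in [0, 1) controls how strongly future rewards are weighted. The MDP class below stores these components directly as Python dictionaries keyed by (state, action, next_state) tuples.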
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
class MDP:
def __init__(self, states, actions, rewards, transitions, gamma=0.9):
self.states = states
self.actions = actions
self.rewards = rewards
self.transitions = transitions
self.gamma = gamma
def get_reward(self, state, action, next_state):
"""获取奖励"""
return self.rewards.get((state, action, next_state), 0)
def get_transition_prob(self, state, action, next_state):
"""获取转移概率"""
return self.transitions.get((state, action, next_state), 0)
def get_next_states(self, state, action):
"""获取可能的下一状态"""
next_states = []
for (s, a, s_next) in self.transitions.keys():
if s == state and a == action:
next_states.append(s_next)
return next_states
class GridWorld(MDP):
def __init__(self, width, height, obstacles, goal, gamma=0.9):
self.width = width
self.height = height
self.obstacles = obstacles
self.goal = goal
self.gamma = gamma
        # Build the state and action spaces
self.states = [(i, j) for i in range(height) for j in range(width)
if (i, j) not in obstacles]
self.actions = ['up', 'down', 'left', 'right']
        # Build the (deterministic) transition probabilities and rewards
self.transitions = {}
self.rewards = {}
for state in self.states:
for action in self.actions:
next_state = self._get_next_state(state, action)
if next_state in self.states:
self.transitions[(state, action, next_state)] = 1.0
if next_state == goal:
self.rewards[(state, action, next_state)] = 100
else:
self.rewards[(state, action, next_state)] = -1
def _get_next_state(self, state, action):
"""获取下一状态"""
i, j = state
if action == 'up':
return (max(0, i - 1), j)
elif action == 'down':
return (min(self.height - 1, i + 1), j)
elif action == 'left':
return (i, max(0, j - 1))
elif action == 'right':
return (i, min(self.width - 1, j + 1))
return state
def visualize(self, values=None, policy=None):
"""可视化网格世界"""
fig, ax = plt.subplots(figsize=(8, 6))
        # Draw the grid cells
for i in range(self.height):
for j in range(self.width):
if (i, j) in self.obstacles:
ax.add_patch(plt.Rectangle((j, i), 1, 1, facecolor='black'))
elif (i, j) == self.goal:
ax.add_patch(plt.Rectangle((j, i), 1, 1, facecolor='green'))
else:
ax.add_patch(plt.Rectangle((j, i), 1, 1, facecolor='white'))
                # Show the value function
if values and (i, j) in values:
ax.text(j + 0.5, i + 0.5, f'{values[(i, j)]:.1f}',
ha='center', va='center')
                # Show the policy as arrows (the y-axis is inverted below so that
                # row 0 is drawn at the top, matching the (row, column) state convention)
                if policy and (i, j) in policy:
                    action = policy[(i, j)]
                    if action == 'up':
                        ax.arrow(j + 0.5, i + 0.7, 0, -0.3, head_width=0.1)
                    elif action == 'down':
                        ax.arrow(j + 0.5, i + 0.3, 0, 0.3, head_width=0.1)
                    elif action == 'left':
                        ax.arrow(j + 0.7, i + 0.5, -0.3, 0, head_width=0.1)
                    elif action == 'right':
                        ax.arrow(j + 0.3, i + 0.5, 0.3, 0, head_width=0.1)
        ax.set_xlim(0, self.width)
        ax.set_ylim(0, self.height)
        ax.invert_yaxis()
        ax.set_aspect('equal')
        ax.set_xticks(range(self.width + 1))
        ax.set_yticks(range(self.height + 1))
        ax.grid(True)
plt.show()
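A quick usage sketch of the class above (the grid size, obstacles, and goal here are arbitrary example values):

obstacles = [(1, 1), (2, 2)]
grid = GridWorld(width=5, height=4, obstacles=obstacles, goal=(3, 4))
print(len(grid.states))   # 18 free cells: 5 * 4 minus the 2 obstacles
grid.visualize()          # shows the empty grid, obstacles in black, goal in green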
Value Functions and Policies
A value function estimates how good a state or a state-action pair is, while a policy defines a probability distribution over actions for each state.
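Under a fixed policy pi, these two quantities satisfy the Bellman expectation equations:

$$V^{\pi}(s) = \sum_{a} \pi(a \mid s) \sum_{s'} P(s' \mid s, a)\,\big[ R(s, a, s') + \gamma V^{\pi}(s') \big]$$

$$Q^{\pi}(s, a) = \sum_{s'} P(s' \mid s, a)\,\big[ R(s, a, s') + \gamma \sum_{a'} \pi(a' \mid s')\, Q^{\pi}(s', a') \big]$$

The ValueFunction, QFunction, and Policy classes below are simple tabular containers for V, Q, and pi.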
class ValueFunction:
def __init__(self, states, gamma=0.9):
self.states = states
self.gamma = gamma
self.values = {state: 0.0 for state in states}
def update(self, state, new_value):
"""更新价值函数"""
self.values[state] = new_value
def get_value(self, state):
"""获取状态价值"""
return self.values.get(state, 0.0)
def get_values(self):
"""获取所有状态价值"""
return self.values.copy()
class QFunction:
def __init__(self, states, actions, gamma=0.9):
self.states = states
self.actions = actions
self.gamma = gamma
self.q_values = defaultdict(lambda: defaultdict(float))
def update(self, state, action, new_value):
"""更新Q值"""
self.q_values[state][action] = new_value
def get_q_value(self, state, action):
"""获取Q值"""
return self.q_values[state][action]
def get_max_q_value(self, state):
"""获取最大Q值"""
if state not in self.q_values:
return 0.0
return max(self.q_values[state].values())
def get_best_action(self, state):
"""获取最佳动作"""
if state not in self.q_values:
return np.random.choice(self.actions)
best_actions = []
max_q = max(self.q_values[state].values())
for action, q_value in self.q_values[state].items():
if q_value == max_q:
best_actions.append(action)
return np.random.choice(best_actions)
class Policy:
def __init__(self, states, actions):
self.states = states
self.actions = actions
self.policy = defaultdict(lambda: defaultdict(float))
        # Initialize a uniform random policy
for state in states:
for action in actions:
self.policy[state][action] = 1.0 / len(actions)
def get_action_prob(self, state, action):
"""获取动作概率"""
return self.policy[state][action]
def get_action(self, state):
"""根据策略选择动作"""
probs = [self.policy[state][action] for action in self.actions]
return np.random.choice(self.actions, p=probs)
def update_policy(self, state, action, new_prob):
"""更新策略"""
self.policy[state][action] = new_prob
def make_greedy(self, q_function):
"""将策略变为贪婪策略"""
for state in self.states:
best_action = q_function.get_best_action(state)
for action in self.actions:
if action == best_action:
self.policy[state][action] = 1.0
else:
self.policy[state][action] = 0.0
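A small usage sketch (assuming the QFunction and Policy classes above; the toy states, actions, and Q-values are made up purely for illustration):

states = ['s0', 's1']
actions = ['left', 'right']
q = QFunction(states, actions)
q.update('s0', 'right', 1.0)    # make 'right' the best action in s0
q.update('s1', 'left', 2.0)     # make 'left' the best action in s1
policy = Policy(states, actions)
policy.make_greedy(q)
print(policy.get_action('s0'))  # prints 'right'
print(policy.get_action('s1'))  # prints 'left'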

Q-Learning
Q-learning is a classic reinforcement learning algorithm: it learns an optimal policy by repeatedly updating estimated action values (Q-values) from experience.
Basic Q-Learning
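For every observed transition (s, a, r, s'), Q-learning applies the temporal-difference update

$$Q(s, a) \leftarrow Q(s, a) + \alpha \big[ r + \gamma \max_{a'} Q(s', a') - Q(s, a) \big]$$

where alpha is the learning rate and gamma the discount factor. The update method below implements exactly this rule, and choose_action implements epsilon-greedy exploration.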
class QLearning:
def __init__(self, states, actions, alpha=0.1, gamma=0.9, epsilon=0.1):
self.states = states
self.actions = actions
self.alpha = alpha
self.gamma = gamma
self.epsilon = epsilon
self.q_function = QFunction(states, actions, gamma)
self.episode_rewards = []
def choose_action(self, state, training=True):
"""选择动作"""
if training and np.random.random() < self.epsilon:
return np.random.choice(self.actions)
else:
return self.q_function.get_best_action(state)
def update(self, state, action, reward, next_state):
"""更新Q值"""
current_q = self.q_function.get_q_value(state, action)
max_next_q = self.q_function.get_max_q_value(next_state)
new_q = current_q + self.alpha * (reward + self.gamma * max_next_q - current_q)
self.q_function.update(state, action, new_q)
def train(self, env, episodes=1000):
"""训练Q学习智能体"""
for episode in range(episodes):
state = env.reset()
total_reward = 0
while True:
action = self.choose_action(state)
next_state, reward, done = env.step(action)
self.update(state, action, reward, next_state)
state = next_state
total_reward += reward
if done:
break
self.episode_rewards.append(total_reward)
            # Every 100 episodes, decay the exploration rate and log progress
            if episode % 100 == 0:
                self.epsilon = max(0.01, self.epsilon * 0.99)
                avg_reward = np.mean(self.episode_rewards[-100:])
                print(f"Episode {episode}, Average Reward: {avg_reward:.2f}")
def get_policy(self):
"""获取策略"""
policy = Policy(self.states, self.actions)
policy.make_greedy(self.q_function)
return policy
class GridWorldEnv:
def __init__(self, grid_world):
self.grid_world = grid_world
self.current_state = None
def reset(self):
"""重置环境"""
# 随机选择起始状态
available_states = [s for s in self.grid_world.states if s != self.grid_world.goal]
self.current_state = np.random.choice(available_states)
return self.current_state
def step(self, action):
"""执行动作"""
next_state = self.grid_world._get_next_state(self.current_state, action)
if next_state not in self.grid_world.states:
next_state = self.current_state
reward = self.grid_world.get_reward(self.current_state, action, next_state)
done = (next_state == self.grid_world.goal)
self.current_state = next_state
return next_state, reward, done
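Putting the pieces together, a minimal end-to-end sketch (assuming the GridWorld, GridWorldEnv, and QLearning classes above; the grid layout and hyperparameters are example values):

grid = GridWorld(width=5, height=4, obstacles=[(1, 1)], goal=(3, 4))
env = GridWorldEnv(grid)
agent = QLearning(grid.states, grid.actions, alpha=0.1, gamma=0.9, epsilon=0.2)
agent.train(env, episodes=500)
learned_policy = agent.get_policy()
grid.visualize(policy={s: learned_policy.get_action(s) for s in grid.states})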
Double Q-Learning
Double Q-learning reduces the overestimation bias of standard Q-learning by maintaining two Q-functions: one selects the greedy action and the other evaluates it.
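With probability 1/2 the algorithm updates Q1, letting Q1 pick the greedy next action and Q2 evaluate it (and symmetrically for Q2):

$$Q_1(s, a) \leftarrow Q_1(s, a) + \alpha \Big[ r + \gamma\, Q_2\big(s', \arg\max_{a'} Q_1(s', a')\big) - Q_1(s, a) \Big]$$

Because the action that looks best under one estimator is evaluated by the other, random overestimates are far less likely to be selected and propagated.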
class DoubleQLearning:
def __init__(self, states, actions, alpha=0.1, gamma=0.9, epsilon=0.1):
self.states = states
self.actions = actions
self.alpha = alpha
self.gamma = gamma
self.epsilon = epsilon
self.q1 = QFunction(states, actions, gamma)
self.q2 = QFunction(states, actions, gamma)
self.episode_rewards = []
def choose_action(self, state, training=True):
"""选择动作"""
if training and np.random.random() < self.epsilon:
return np.random.choice(self.actions)
else:
            # Act greedily with respect to the average of the two Q-functions
q_values = {}
for action in self.actions:
q1_val = self.q1.get_q_value(state, action)
q2_val = self.q2.get_q_value(state, action)
q_values[action] = (q1_val + q2_val) / 2
best_actions = [a for a, q in q_values.items() if q == max(q_values.values())]
return np.random.choice(best_actions)
    def update(self, state, action, reward, next_state):
        """Double Q-learning update: one estimator selects the action, the other evaluates it."""
        if np.random.random() < 0.5:
            # Update Q1: Q1 picks the greedy next action, Q2 evaluates it
            current_q = self.q1.get_q_value(state, action)
            best_next_action = self.q1.get_best_action(next_state)
            next_q = self.q2.get_q_value(next_state, best_next_action)
            new_q = current_q + self.alpha * (reward + self.gamma * next_q - current_q)
            self.q1.update(state, action, new_q)
        else:
            # Update Q2: Q2 picks the greedy next action, Q1 evaluates it
            current_q = self.q2.get_q_value(state, action)
            best_next_action = self.q2.get_best_action(next_state)
            next_q = self.q1.get_q_value(next_state, best_next_action)
            new_q = current_q + self.alpha * (reward + self.gamma * next_q - current_q)
            self.q2.update(state, action, new_q)
def train(self, env, episodes=1000):
"""训练双Q学习智能体"""
for episode in range(episodes):
state = env.reset()
total_reward = 0
while True:
action = self.choose_action(state)
next_state, reward, done = env.step(action)
self.update(state, action, reward, next_state)
state = next_state
total_reward += reward
if done:
break
self.episode_rewards.append(total_reward)
if episode % 100 == 0:
self.epsilon = max(0.01, self.epsilon * 0.99)
avg_reward = np.mean(self.episode_rewards[-100:])
print(f"Episode {episode}, Average Reward: {avg_reward:.2f}")

Policy Gradient Methods
Policy gradient methods optimize the policy parameters directly, which makes them well suited to continuous action spaces and high-dimensional state spaces.
The REINFORCE Algorithm
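REINFORCE performs gradient ascent on the expected return using the score-function estimator of the policy gradient,

$$\nabla_{\theta} J(\theta) = \mathbb{E}_{\pi_\theta}\Big[ \sum_{t} \nabla_{\theta} \log \pi_{\theta}(a_t \mid s_t)\, G_t \Big], \qquad G_t = \sum_{k \ge t} \gamma^{\,k-t} r_k$$

In the implementation below, the returns G_t are additionally standardized within each episode, a common variance-reduction trick.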
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
class PolicyNetwork(nn.Module):
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(PolicyNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
action_probs = F.softmax(self.fc3(x), dim=-1)
return action_probs
class REINFORCE:
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.policy_net = PolicyNetwork(state_dim, action_dim)
self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
self.episode_states = []
self.episode_actions = []
self.episode_rewards = []
def select_action(self, state):
"""选择动作"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs = self.policy_net(state_tensor)
action_dist = torch.distributions.Categorical(action_probs)
action = action_dist.sample()
return action.item(), action_dist.log_prob(action)
def store_transition(self, state, action, reward):
"""存储转移"""
self.episode_states.append(state)
self.episode_actions.append(action)
self.episode_rewards.append(reward)
def compute_returns(self):
"""计算回报"""
returns = []
discounted_reward = 0
for reward in reversed(self.episode_rewards):
discounted_reward = reward + self.gamma * discounted_reward
returns.insert(0, discounted_reward)
        # Standardize returns to reduce gradient variance
returns = torch.FloatTensor(returns)
returns = (returns - returns.mean()) / (returns.std() + 1e-8)
return returns
def update_policy(self):
"""更新策略"""
if len(self.episode_states) == 0:
return
returns = self.compute_returns()
policy_loss = 0
for i, (state, action, return_val) in enumerate(
zip(self.episode_states, self.episode_actions, returns)
):
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs = self.policy_net(state_tensor)
action_dist = torch.distributions.Categorical(action_probs)
log_prob = action_dist.log_prob(torch.tensor(action))
policy_loss += -log_prob * return_val
self.optimizer.zero_grad()
policy_loss.backward()
self.optimizer.step()
        # Clear the episode buffers
self.episode_states = []
self.episode_actions = []
self.episode_rewards = []
def train(self, env, episodes=1000):
"""训练REINFORCE智能体"""
episode_rewards = []
for episode in range(episodes):
state = env.reset()
total_reward = 0
while True:
action, log_prob = self.select_action(state)
next_state, reward, done = env.step(action)
self.store_transition(state, action, reward)
state = next_state
total_reward += reward
if done:
break
self.update_policy()
episode_rewards.append(total_reward)
if episode % 100 == 0:
avg_reward = np.mean(episode_rewards[-100:])
print(f"Episode {episode}, Average Reward: {avg_reward:.2f}")
The Actor-Critic Algorithm
Actor-Critic methods combine policy gradients with a learned value function: the actor proposes actions while the critic estimates state values and supplies the learning signal.
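At each step the critic's one-step TD error

$$\delta_t = r_t + \gamma V_{\phi}(s_{t+1}) - V_{\phi}(s_t)$$

plays two roles: squared, it is the critic's regression loss, and treated as a constant it weights the actor's loss, -log pi_theta(a_t | s_t) * delta_t. The implementation below follows this per-step (online) update scheme.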
class ValueNetwork(nn.Module):
def __init__(self, state_dim, hidden_dim=128):
super(ValueNetwork, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, 1)
def forward(self, state):
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
value = self.fc3(x)
return value
class ActorCritic:
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.policy_net = PolicyNetwork(state_dim, action_dim)
self.value_net = ValueNetwork(state_dim)
self.policy_optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
self.value_optimizer = optim.Adam(self.value_net.parameters(), lr=lr)
def select_action(self, state):
"""选择动作"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
action_probs = self.policy_net(state_tensor)
action_dist = torch.distributions.Categorical(action_probs)
action = action_dist.sample()
return action.item(), action_dist.log_prob(action)
def get_value(self, state):
"""获取状态价值"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
return self.value_net(state_tensor)
def update(self, state, action, reward, next_state, done):
"""更新网络"""
state_tensor = torch.FloatTensor(state).unsqueeze(0)
next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
        # Estimate the current and next state values with the critic
        value = self.value_net(state_tensor)
        next_value = self.value_net(next_state_tensor) if not done else torch.tensor(0.0)
        # One-step TD target and TD error
td_target = reward + self.gamma * next_value
td_error = td_target - value
        # Update the value (critic) network
        value_loss = F.mse_loss(value, td_target.detach())
        self.value_optimizer.zero_grad()
        value_loss.backward()
        self.value_optimizer.step()
        # Update the policy (actor) network
action_probs = self.policy_net(state_tensor)
action_dist = torch.distributions.Categorical(action_probs)
log_prob = action_dist.log_prob(torch.tensor(action))
policy_loss = -log_prob * td_error.detach()
self.policy_optimizer.zero_grad()
policy_loss.backward()
self.policy_optimizer.step()
def train(self, env, episodes=1000):
"""训练Actor-Critic智能体"""
episode_rewards = []
for episode in range(episodes):
state = env.reset()
total_reward = 0
while True:
action, log_prob = self.select_action(state)
next_state, reward, done = env.step(action)
self.update(state, action, reward, next_state, done)
state = next_state
total_reward += reward
if done:
break
episode_rewards.append(total_reward)
if episode % 100 == 0:
avg_reward = np.mean(episode_rewards[-100:])
print(f"Episode {episode}, Average Reward: {avg_reward:.2f}")

Deep Reinforcement Learning
Deep reinforcement learning combines deep neural networks with reinforcement learning, which makes it possible to handle high-dimensional state spaces.
Deep Q-Network (DQN)
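DQN regresses the online network Q_theta toward a bootstrapped target computed by a separate, periodically synchronized target network Q_theta-:

$$y = r + \gamma\, (1 - d)\, \max_{a'} Q_{\theta^{-}}(s', a'), \qquad L(\theta) = \big( Q_{\theta}(s, a) - y \big)^2$$

where d indicates episode termination. Sampling minibatches from a replay buffer breaks the correlation between consecutive transitions; both ingredients appear in the implementation below.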
class DQN(nn.Module):
def __init__(self, state_dim, action_dim, hidden_dim=128):
super(DQN, self).__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, action_dim)
def forward(self, state):
x = F.relu(self.fc1(state))
x = F.relu(self.fc2(x))
q_values = self.fc3(x)
return q_values
import random

class ReplayBuffer:
def __init__(self, capacity):
self.buffer = []
self.capacity = capacity
self.position = 0
def push(self, state, action, reward, next_state, done):
if len(self.buffer) < self.capacity:
self.buffer.append(None)
self.buffer[self.position] = (state, action, reward, next_state, done)
self.position = (self.position + 1) % self.capacity
def sample(self, batch_size):
return random.sample(self.buffer, batch_size)
def __len__(self):
return len(self.buffer)
class DQNAgent:
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99,
epsilon=0.1, epsilon_decay=0.995, epsilon_min=0.01):
self.state_dim = state_dim
self.action_dim = action_dim
self.gamma = gamma
self.epsilon = epsilon
self.epsilon_decay = epsilon_decay
self.epsilon_min = epsilon_min
self.q_net = DQN(state_dim, action_dim)
self.target_net = DQN(state_dim, action_dim)
self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
self.replay_buffer = ReplayBuffer(10000)
self.batch_size = 32
self.update_target_freq = 100
self.step_count = 0
def select_action(self, state, training=True):
"""选择动作"""
if training and np.random.random() < self.epsilon:
return np.random.choice(self.action_dim)
else:
state_tensor = torch.FloatTensor(state).unsqueeze(0)
q_values = self.q_net(state_tensor)
return q_values.argmax().item()
def store_transition(self, state, action, reward, next_state, done):
"""存储转移"""
self.replay_buffer.push(state, action, reward, next_state, done)
def update_target_network(self):
"""更新目标网络"""
self.target_net.load_state_dict(self.q_net.state_dict())
def train(self):
"""训练DQN"""
if len(self.replay_buffer) < self.batch_size:
return
batch = self.replay_buffer.sample(self.batch_size)
states, actions, rewards, next_states, dones = zip(*batch)
states = torch.FloatTensor(states)
actions = torch.LongTensor(actions)
rewards = torch.FloatTensor(rewards)
next_states = torch.FloatTensor(next_states)
dones = torch.BoolTensor(dones)
current_q_values = self.q_net(states).gather(1, actions.unsqueeze(1))
next_q_values = self.target_net(next_states).max(1)[0].detach()
target_q_values = rewards + (self.gamma * next_q_values * ~dones)
loss = F.mse_loss(current_q_values.squeeze(), target_q_values)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
        # Decay the exploration rate
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        # Periodically sync the target network with the online network
self.step_count += 1
if self.step_count % self.update_target_freq == 0:
self.update_target_network()
def train_episodes(self, env, episodes=1000):
"""训练多个回合"""
episode_rewards = []
for episode in range(episodes):
state = env.reset()
total_reward = 0
while True:
action = self.select_action(state)
next_state, reward, done = env.step(action)
self.store_transition(state, action, reward, next_state, done)
self.train()
state = next_state
total_reward += reward
if done:
break
episode_rewards.append(total_reward)
if episode % 100 == 0:
avg_reward = np.mean(episode_rewards[-100:])
print(f"Episode {episode}, Average Reward: {avg_reward:.2f}, Epsilon: {self.epsilon:.3f}")
Practical Applications
Concrete applications make it easier to see how these algorithms are used in practice.
Game AI
A game-AI project used the DQN algorithm to train its agent; the simplified environment below illustrates the setup.
class GameEnvironment:
def __init__(self, game_type='cartpole'):
self.game_type = game_type
if game_type == 'cartpole':
self.state_dim = 4
self.action_dim = 2
elif game_type == 'mountain_car':
self.state_dim = 2
self.action_dim = 3
def reset(self):
"""重置环境"""
if self.game_type == 'cartpole':
# 简化的CartPole环境
self.state = np.random.uniform(-0.1, 0.1, 4)
self.steps = 0
return self.state
def step(self, action):
"""执行动作"""
if self.game_type == 'cartpole':
# 简化的CartPole动力学
x, x_dot, theta, theta_dot = self.state
if action == 0:
force = -1
else:
force = 1
            # Very rough physics integration
x_dot += force * 0.1
x += x_dot * 0.02
theta_dot += np.sin(theta) * 0.1
theta += theta_dot * 0.02
self.state = np.array([x, x_dot, theta, theta_dot])
self.steps += 1
            # Check the termination conditions
done = (abs(x) > 2.4 or abs(theta) > 0.2 or self.steps > 200)
reward = 1 if not done else 0
return self.state, reward, done
def train_game_ai():
"""训练游戏AI"""
env = GameEnvironment('cartpole')
agent = DQNAgent(env.state_dim, env.action_dim)
    # Train the agent
    agent.train_episodes(env, episodes=1000)
    # Evaluate the greedy policy
test_rewards = []
for _ in range(100):
state = env.reset()
total_reward = 0
while True:
action = agent.select_action(state, training=False)
state, reward, done = env.step(action)
total_reward += reward
if done:
break
test_rewards.append(total_reward)
print(f"Test Average Reward: {np.mean(test_rewards):.2f}")
return agent
Robot Control
A robot-control project used the Actor-Critic algorithm; the simplified environment below sketches the setup.
class RobotEnvironment:
def __init__(self):
        self.state_dim = 6    # joint positions (3) and velocities (3)
        self.action_dim = 3   # one discrete action per joint
        self.state = np.zeros(6)
        self.target = np.array([1.0, 1.0, 1.0])  # target position in joint space
def reset(self):
"""重置环境"""
self.state = np.random.uniform(-0.1, 0.1, 6)
return self.state
def step(self, action):
"""执行动作"""
        # Simplified dynamics: the discrete action index selects which joint gets a push
        position = self.state[:3]
        velocity = self.state[3:]
        torque = np.zeros(self.action_dim)
        torque[action] = 0.1
        velocity = velocity + torque
        position = position + velocity * 0.02
        self.state = np.concatenate([position, velocity])
        # Reward: negative distance to the target
        distance_to_target = np.linalg.norm(position - self.target)
        reward = -distance_to_target
        # Episode ends once the target has been reached
        done = distance_to_target < 0.1
return self.state, reward, done
def train_robot_control():
"""训练机器人控制"""
env = RobotEnvironment()
agent = ActorCritic(env.state_dim, env.action_dim)
# 训练
agent.train(env, episodes=1000)
return agent
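Note that the ActorCritic agent above samples one discrete action per step from a Categorical distribution, so the three joint commands are effectively discretized. For genuinely continuous joint control, the policy head is usually replaced by a Gaussian over the action vector. The following is only a hypothetical sketch of such a head (GaussianPolicy is not used anywhere else in this article), reusing the PyTorch modules already imported above:

class GaussianPolicy(nn.Module):
    def __init__(self, state_dim, action_dim, hidden_dim=128):
        super(GaussianPolicy, self).__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.mu = nn.Linear(hidden_dim, action_dim)            # mean of each action dimension
        self.log_std = nn.Parameter(torch.zeros(action_dim))   # learnable, state-independent log std
    def forward(self, state):
        x = F.relu(self.fc1(state))
        return self.mu(x), self.log_std.exp()
    def sample(self, state):
        mu, std = self.forward(state)
        dist = torch.distributions.Normal(mu, std)
        action = dist.sample()
        # Sum log-probabilities over action dimensions to get one log-prob per state
        return action, dist.log_prob(action).sum(dim=-1)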
Conclusion
Reinforcement learning is an important branch of AI in which an agent learns an optimal policy through interaction with its environment. From classical Q-learning to modern deep reinforcement learning, the algorithms keep evolving and their range of applications keeps widening.
In practice, the algorithm should match the problem: tabular Q-learning suits small, discrete state and action spaces; policy gradient methods handle continuous action spaces; and deep reinforcement learning scales to high-dimensional state spaces. With careful implementation and tuning, these algorithms can deliver strong results in games, robot control, autonomous driving, and beyond.
As the field advances, reinforcement learning algorithms will continue to become more efficient and more capable. Multi-agent reinforcement learning, meta-learning, and imitation learning are opening up new directions, and continued research and practical experience will keep reinforcement learning a strong driver of progress in AI.