Reinforcement Learning: DQN

Deep Reinforcement Learning

Deep learning has produced breakthroughs in many fields. Most deep learning tasks fall under supervised learning: given an input together with its label, the model computes a loss in the forward pass, computes gradients in the backward pass, and updates its parameters. This pipeline solves many problems in computer vision and natural language processing.

Reinforcement learning is different: it focuses on the reward obtained after taking an action. For example, Xiao Ming takes a math exam and scores 100, so his mother rewards him with 100 yuan; we can set the reward to 100. On the next exam he only scores 59, his mother scolds him, and we can set the reward to -100. By learning continually, Xiao Ming tries to maximize his reward. That is the idea behind reinforcement learning.

The core of reinforcement learning is the Markov Decision Process (MDP), which is concerned with the tuple <S, A, R, S', Terminate>, i.e. <current state, action, reward, next state, whether the episode has ended>. (The basics of reinforcement learning are not covered further here.)

Deep reinforcement learning combines deep learning and reinforcement learning, and the rise of deep learning has in turn driven progress in reinforcement learning. In 2013, DeepMind's "Playing Atari with Deep Reinforcement Learning" first combined deep neural networks with the Q-learning algorithm to tackle Atari games.

Deep reinforcement learning comes in two flavors:

  1. Model-free: no explicit model of the environment (a model maps M(S, A) -> R, S')

    • Policy-based: learn a policy pi(a|s)
    • Value-based: learn a value function V(s) or Q(s,a)
    • Actor-Critic: combine a policy with a value function
  2. Model-based: build a model of the environment, usually combined with planning methods.

Deep Q Learning and Deep Q Networks

DQN is a value-based reinforcement learning model. The state value function V(s) evaluates how good the current state is, while the action-value function (state-action value function) Q(s,a) evaluates how good it is to take action a in the current state.
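
In standard notation, with policy $\pi$, discount factor $\gamma$, and reward $r_t$ at step $t$, these two functions are:

$$
V^{\pi}(s) = \mathbb{E}_{\pi}\!\left[\sum_{t=0}^{\infty} \gamma^{t} r_{t} \,\middle|\, s_0 = s\right],
\qquad
Q^{\pi}(s, a) = \mathbb{E}_{\pi}\!\left[\sum_{t=0}^{\infty} \gamma^{t} r_{t} \,\middle|\, s_0 = s,\ a_0 = a\right]
$$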

DQN is in fact the combination of deep neural networks and the value-based Q-learning algorithm: a neural network performs value estimation, i.e. it approximates V(s) or Q(s,a), while Q-learning performs the value iteration that drives the updates of the neural network.

The Q-Learning Algorithm

Q-learning learns and approaches the optimal value function by iterating two steps: choose an action, then update the action-value function.
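
For reference, the standard tabular Q-learning update, with learning rate $\alpha$ and discount factor $\gamma$, is:

$$
Q(s, a) \leftarrow Q(s, a) + \alpha \left[ r + \gamma \max_{a'} Q(s', a') - Q(s, a) \right]
$$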

DNN + Q Learning

We use a deep neural network as the action-value function. Optimizing the network requires a loss and gradients, so in each Q-learning iteration the value of the next state, Q(s', a'), together with the reward is used to update the value of the current state, Q(s, a).

Concretely, the loss and gradients are computed from the difference between the target value predicted from the next state and the value Q(s, a) predicted for the current state, and the network parameters are then optimized against that loss.
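
Written out, this is the usual DQN objective, where $\theta$ denotes the parameters of the eval network and $\theta^{-}$ those of the target network used in the implementation below:

$$
L(\theta) = \left( r + \gamma \max_{a'} Q_{\theta^{-}}(s', a') - Q_{\theta}(s, a) \right)^{2}
$$

When s' is terminal, the max term is dropped and the target is simply r.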

The DQN Algorithm

DQN additionally uses a replay memory, which stores the experience five-tuples <S, A, R, S', Terminate>. During training, a mini-batch is sampled from the replay memory to compute the loss and update the network parameters.
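
In outline, the training loop looks roughly like this (a paraphrase of the algorithm in [1][2]; the full implementation follows below):

# High-level DQN training loop, paraphrased from [1][2].
#
# initialize replay memory D, eval network Q, target network Q_target = Q
# for each episode:
#     observe initial state s
#     repeat until the episode terminates:
#         choose action a epsilon-greedily from Q(s, .)
#         execute a, observe reward r, next state s', terminate flag
#         store <s, a, r, s', terminate> in D
#         sample a mini-batch of transitions from D
#         for each transition: target = r                                      if terminate
#                              target = r + gamma * max_a' Q_target(s', a')    otherwise
#         take a gradient step on (target - Q(s, a))^2
#         every C steps copy Q into Q_target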

DQN Implementation

Following the algorithm above, implementing DQN involves three main parts:

  1. Interaction with the environment
  2. The value-function network
  3. Updating the network with the Q-learning algorithm

Below, a simple DQN is implemented with PyTorch and an OpenAI gym environment.

Step 0x00 Build QNet (Q(s,a))

QNet takes the current state as input and outputs a value for each action, rather than a probability distribution over actions.

Since raw image frames are used directly as the state, the model is built on a convolutional neural network.

class QNet(nn.Module):

    def __init__(self, in_chan, output_actions, use_gpu=False):
        super(QNet, self).__init__()
        self.use_gpu = use_gpu
        self.output_actions = output_actions

        # Four conv blocks, each halving the spatial size: 80x100 input -> 5x6 feature maps
        self.conv_layers = nn.Sequential(
            nn.Conv2d(in_channels=in_chan, out_channels=32, kernel_size=5, padding=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(32),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.BatchNorm2d(64),
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )
        # Fully connected head: one output per action (Q-values, not probabilities)
        self.output_layers = nn.Sequential(
            nn.Linear(128*5*6, 512),
            nn.Linear(512, 128),
            nn.Linear(128, self.output_actions),
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = x.view(-1, 128*5*6)
        x = self.output_layers(x)
        return x
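
As a quick sanity check, the network can be probed with a dummy batch. This is only a sketch: it assumes the 2-frame, 80x100 grayscale input produced by get_screen below, three output actions, and a recent PyTorch where plain tensors can be fed to the model.

import torch

net = QNet(in_chan=2, output_actions=3)   # illustrative sizes: 2 stacked frames in, one Q-value per action out
net.eval()                                # eval mode so BatchNorm does not depend on batch statistics
dummy = torch.randn(1, 2, 80, 100)        # one state of shape (channels, height, width) = (2, 80, 100)
q_values = net(dummy)                     # -> shape (1, 3)
print(q_values.shape)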

Step 0x01 Build the ReplayMemory

Create the replay memory. It stores the experience five-tuples and supports sampling a random mini-batch for training.

The code is adapted from the official PyTorch tutorial [3].

from collections import namedtuple
import pickle
import random

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward', 'terminate'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []
        self.position = 0

    def push(self, state, action, next_state, reward, terminate):
        # Circular buffer: grow until capacity, then overwrite the oldest entries
        if len(self.memory) < self.capacity:
            self.memory.append(None)
        self.memory[self.position] = Transition(
            state=state,
            action=action,
            next_state=next_state,
            reward=reward,
            terminate=terminate
        )
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def save(self, path):
        f = open(path, 'wb')
        pickle.dump(self.memory, f)
        f.close()

    def __len__(self):
        return len(self.memory)
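
Usage is straightforward; the following is a minimal sketch with zero-filled placeholder states (in practice they come from get_screen):

import numpy as np

memory = ReplayMemory(capacity=10000)
s = np.zeros((2, 80, 100))               # placeholder states for illustration
s_next = np.zeros((2, 80, 100))
memory.push(state=s, action=0, next_state=s_next, reward=1.0, terminate=False)
if len(memory) >= 32:
    batch = memory.sample(batch_size=32)  # a list of 32 Transition namedtuples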

Step 0x03 Choose Action

Next, design the action-selection function (choose_action). Action selection involves one of the most important issues in reinforcement learning, the exploration-exploitation trade-off:

  • Exploration: give up the action the current model rates best and try a random action instead, in order to discover new behaviors and escape local optima.

  • Exploitation: execute the action the current model rates best in the current state.

Therefore, when choosing an action, with some probability we output a random action instead of the greedy one:

def choose_action(self, step, current_state):
    sample = random.random()
    # Exponentially decaying epsilon threshold
    eps_threshold = self.epsilon_end + (self.epsilon_start-self.epsilon_end)*math.exp(-step*self.epsilon_decay)
    if sample < eps_threshold:
        # Exploration: random action
        q_value = np.random.rand(self.num_actions)
        action_index = np.argmax(q_value)
        q_value = q_value[action_index]
    else:
        # Exploitation: choose the action with the highest predicted value
        state = Variable(torch.FloatTensor(current_state))
        state = state.unsqueeze(0)
        q_value = self.eval_net(state).squeeze(0).data.numpy()
        action_index = np.argmax(q_value)
        q_value = q_value[action_index]
    return action_index, q_value
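
To get a feel for the decay schedule, the threshold can be evaluated on its own. The epsilon values below are illustrative assumptions, not the settings used for training:

import math

epsilon_start, epsilon_end, epsilon_decay = 0.9, 0.05, 1e-4    # assumed example values
for step in (0, 1000, 10000, 50000):
    eps = epsilon_end + (epsilon_start - epsilon_end) * math.exp(-step * epsilon_decay)
    print(step, round(eps, 3))    # 0.9 at step 0, decaying towards 0.05 (about 0.056 at step 50000)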

Step 0x04 Update by Q Learning

Sample a mini-batch of data, apply the Q-learning update, compute the network loss and gradients, and update the parameters:

def learn(self, episode):
    # Every update_target steps, copy the eval network parameters into the target network
    if self.step_counter % self.update_target == 0:
        self.target_net.load_state_dict(self.eval_net.state_dict())

    batch_data = self.get_batch_data()
    batch_state = [x.state for x in batch_data]
    batch_action = [x.action for x in batch_data]
    batch_reward = [x.reward for x in batch_data]
    batch_next_state = [x.next_state for x in batch_data]
    batch_terminate = [x.terminate for x in batch_data]

    batch_state = Variable(torch.FloatTensor(batch_state))
    batch_next_state = Variable(torch.FloatTensor(batch_next_state))

    batch_reward = Variable(torch.FloatTensor(batch_reward))

    q_eval = self.eval_net(batch_state)
    q_eval_value = Variable(torch.zeros(self.train_batch_size))

    # Compute Q(s, a) for the actions that were actually taken
    for i in range(self.train_batch_size):
        q_eval_value[i] = q_eval[i, batch_action[i]]

    q_next = self.target_net(batch_next_state).detach()

    q_next, _ = torch.max(q_next, 1)
    q_target = Variable(torch.zeros(self.train_batch_size))

    # Compute the target r + gamma * max_a' Q(s', a'); for terminal states the target is just r
    for i in range(self.train_batch_size):
        if batch_terminate[i]:
            q_target[i] = batch_reward[i]
        else:
            q_target[i] = batch_reward[i] + self.gamma * q_next[i]

    loss = self.loss_criterion(q_eval_value, q_target)

    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
    print("[Episode:{}][Step:{}]Loss:{}".format(episode, self.step_counter, loss.data[0]))

    if (self.step_counter+1) % 500 == 0:
        self.save_parameters('params/', self.step_counter)
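
The two Python loops above can also be written without loops using torch.gather. The following is only a sketch, assuming a recent PyTorch; the helper name td_targets is made up for illustration, q_eval is the (batch, num_actions) output of eval_net, and q_next the detached output of target_net:

import torch

def td_targets(q_eval, q_next, actions, rewards, terminates, gamma):
    """Vectorized Q(s,a) and r + gamma * max_a' Q(s',a') for a mini-batch."""
    actions = torch.as_tensor(actions, dtype=torch.int64).unsqueeze(1)     # shape (B, 1)
    rewards = torch.as_tensor(rewards, dtype=torch.float32)                # shape (B,)
    not_done = 1.0 - torch.as_tensor(terminates, dtype=torch.float32)      # 0 where the episode ended
    q_sa = q_eval.gather(1, actions).squeeze(1)                            # Q(s, a) of the taken actions
    target = rewards + gamma * not_done * q_next.max(dim=1).values         # target reduces to r at terminal states
    return q_sa, target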

Step 0x05 Get the Game State

The game state is built from the screen image rendered by gym:

def get_screen(frames):
    # Render the screen `frames` times and stack the grayscale images as channels
    states = []
    for frame in range(frames):
        screen = env.render(mode='rgb_array')
        img = cv2.resize(screen, (100, 80))
        img = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)  # gym renders RGB frames
        states.append(img)
    states = np.stack(states, axis=0)
    states = np.array(states) / 255.0
    return states
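
For context, env here is a module-level gym environment. The snippet below shows one way to set it up; the environment id is an assumption (apply_action in the full code unpacks a (position, velocity) observation, which matches MountainCar-v0):

import gym

env = gym.make('MountainCar-v0')    # assumed environment id
env.reset()
state = get_screen(2)               # two stacked grayscale frames
print(state.shape)                  # (2, 80, 100), matching QNet(in_chan=2, ...)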

Complete DQN Code

# Module-level imports required by the class (and the snippets above)
import math
import os
import random
from itertools import count

import numpy as np
import torch
import torch.nn as nn
import torch.optim as optimizer
from torch.autograd import Variable


class DQN(object):

    def __init__(self, config):
        """
        :param config: model configuration parameters
        """
        self.num_actions = config['num_actions']
        self.use_gpu = config['use_gpu']

        self.epsilon_start = config['epsilon_start']
        self.epsilon_end = config['epsilon_end']
        self.epsilon_decay = config['epsilon_decay']

        self.gamma = config['gamma']

        self.update_target = config['update_target']
        self.epoches = config['epoches']
        self.train_batch_size = config['train_batch_size']

        # Global step counter
        self.step_counter = 0

        self.eval_net = QNet(2, self.num_actions, self.use_gpu)
        self.target_net = QNet(2, self.num_actions, self.use_gpu)

        learning_rate = config['learning_rate']

        self.optimizer = optimizer.Adam(params=self.eval_net.parameters(), lr=learning_rate)
        self.loss_criterion = nn.MSELoss()

        self.replay_memory_size = config['replay_memory_size']
        self.replay_memory = ReplayMemory(self.replay_memory_size)

        self.current_state = None

    def choose_action(self, step, current_state):
        ...  # same as in Step 0x03 above

    def apply_action(self, env, action):
        observation, _, terminate, _ = env.step(action)
        position, velocity = observation
        reward = position + 0.5
        next_state = get_screen(2)
        return reward, next_state, terminate

    def store_memory(self, state, action, reward, next_state, terminate):
        # Store a transition in the replay memory
        self.replay_memory.push(
            state=state,
            next_state=next_state,
            reward=reward,
            action=action,
            terminate=terminate
        )

    def save_parameters(self, param_dir, step_counter):
        # Save model parameters
        if not os.path.exists(param_dir):
            os.mkdir(param_dir)
        torch.save(self.eval_net.state_dict(), param_dir+'eval_net_%s_param.pth' % step_counter)
        torch.save(self.target_net.state_dict(), param_dir+'target_net_%s_param.pth' % step_counter)

    def get_batch_data(self):
        # Sample a mini-batch from the replay memory
        batch_data = self.replay_memory.sample(batch_size=self.train_batch_size)
        return batch_data

    def learn(self, episode):
        ...  # same as in Step 0x04 above

    def train(self, env):
        episode_durations = []
        for episode in range(self.epoches):
            self.current_state = get_screen(2)
            for t in count():
                state = self.current_state
                action, q_value = self.choose_action(self.step_counter, current_state=state)
                reward, next_state, terminate = self.apply_action(env, action)
                print("Step:{} Q:{} Reward: {} Terminate: {}".format(self.step_counter, q_value, reward, terminate))
                self.store_memory(
                    state=state,
                    action=action,
                    next_state=next_state,
                    reward=reward,
                    terminate=terminate
                )
                self.current_state = next_state
                # Start training the network once the replay memory holds enough transitions
                if len(self.replay_memory) > 10 * self.train_batch_size:
                    self.learn(episode)
                self.step_counter += 1
                if terminate:
                    episode_durations.append(t+1)
                    break

Once everything is in place, you can set up the OpenAI gym environment, tune the hyperparameters, and train the DQN. Training directly from raw game pixels is usually harder and slower than training from the state vector the env already provides. Since this is a generic DQN, it can also be used for games such as Flappy Bird by changing only the size of the action space and the state input.
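
For example, an entry point could look like the following sketch; the environment id and every hyperparameter value here are assumptions for illustration, not the author's settings:

import gym

config = {
    'num_actions': 3,               # MountainCar-v0 has three actions
    'use_gpu': False,
    'epsilon_start': 0.9,
    'epsilon_end': 0.05,
    'epsilon_decay': 1e-4,
    'gamma': 0.99,
    'update_target': 100,
    'epoches': 1000,
    'train_batch_size': 32,
    'learning_rate': 1e-4,
    'replay_memory_size': 10000,
}

env = gym.make('MountainCar-v0')    # assumed environment, matching apply_action above
env.reset()
agent = DQN(config)
agent.train(env)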

References

[1] V. Mnih et al., "Playing Atari with Deep Reinforcement Learning", 2013.

[2] V. Mnih et al., "Human-level control through deep reinforcement learning", Nature, 2015.

[3] PyTorch tutorial: http://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html