Adam (Adaptive Moment Estimation) is essentially RMSprop with a momentum term: it uses first-moment and second-moment estimates of the gradient to adapt the learning rate of each parameter individually. Its main advantage is that, after bias correction, the effective step size of every iteration stays within a bounded range, which keeps the parameter updates stable. Adam combines Adagrad's strength at handling sparse gradients with RMSprop's strength at handling non-stationary objectives.
The algorithm is as follows:
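Written in the notation of the original paper (Kingma & Ba), with gradient $g_t$ at step $t$, the standard Adam updates are:
$m_t = \beta_1 m_{t-1} + (1 - \beta_1)\, g_t$
$v_t = \beta_2 v_{t-1} + (1 - \beta_2)\, g_t^2$
$\hat{m}_t = m_t / (1 - \beta_1^t), \quad \hat{v}_t = v_t / (1 - \beta_2^t)$
$\theta_t = \theta_{t-1} - \alpha\, \hat{m}_t / (\sqrt{\hat{v}_t} + \epsilon)$
These are exactly the quantities that show up as the m and v slots and the beta1_power / beta2_power accumulators in the TensorFlow source examined below.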
Adam is said to be well suited to large-scale sparse gradients. What does that actually mean? And does Adam update the gradients of dense features differently from those of sparse features? To find out, I wrote a small distributed parameter-server example program whose model has an image feature (dense) and a sparse id feature.
import numpy as np
import tensorflow as tf
from tensorflow.examples.tutorials.mnist.input_data import read_data_sets

# Command-line flags for the distributed setup referenced below.
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("ps_hosts", "", "Comma-separated list of ps hosts")
tf.app.flags.DEFINE_string("worker_hosts", "", "Comma-separated list of worker hosts")
tf.app.flags.DEFINE_string("job_name", "", "One of 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of the task within the job")

def model(images, emb_feat_id):
    """Define a simple mnist classifier with an extra sparse id feature."""
    # one sparse id per example in a batch of 32
    idx = list(zip(np.arange(0, 32), [0] * 32))
    sparse_emb_feat = tf.SparseTensor(indices=idx, values=emb_feat_id, dense_shape=(32, 1))
    emb = tf.get_variable("emb", shape=[100000, 10],
                          initializer=tf.initializers.random_normal(stddev=0.1))
    emb_feat = tf.nn.embedding_lookup_sparse(emb, sparse_emb_feat, sp_weights=None)
    net = tf.concat([images, emb_feat], axis=-1)
    net = tf.layers.dense(net, 500, activation=tf.nn.relu)
    net = tf.layers.dense(net, 500, activation=tf.nn.relu)
    net = tf.layers.dense(net, 10, activation=None)
    return net

def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")
    # create the cluster configured by `ps_hosts' and 'worker_hosts'
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
    # create a server for the local task
    server = tf.train.Server(cluster, job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)
    if FLAGS.job_name == "ps":
        server.join()  # ps hosts only join
    elif FLAGS.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(
                worker_device="/job:worker/task:%d" % FLAGS.task_index,
                cluster=cluster)):
            # load mnist dataset
            mnist = read_data_sets("dataset", one_hot=True)  # local MNIST data directory
            # the model
            images = tf.placeholder(tf.float32, [None, 784])
            labels = tf.placeholder(tf.float32, [None, 10])   # one-hot labels
            emb_feat_id = tf.placeholder(tf.int64, [None])    # sparse feature ids
            logits = model(images, emb_feat_id)
            loss = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels))
            # The StopAtStepHook handles stopping after running given steps.
            hooks = [tf.train.StopAtStepHook(last_step=2000)]
            global_step = tf.train.get_or_create_global_step()
            optimizer = tf.train.AdamOptimizer(learning_rate=1e-04)
            train_op = optimizer.minimize(loss, global_step=global_step,
                                          aggregation_method=tf.AggregationMethod.ADD_N)
        with tf.train.MonitoredTrainingSession(master=server.target,
                                               is_chief=(FLAGS.task_index == 0),
                                               checkpoint_dir="ps_debug_checkpoint_dir",
                                               hooks=hooks) as mon_sess:
            while not mon_sess.should_stop():
                # mon_sess.run handles AbortedError in case of preempted PS.
                img_batch, label_batch = mnist.train.next_batch(32)
                emb_id = np.random.randint(0, 100000, 32)
                _, ls, step = mon_sess.run(
                    [train_op, loss, global_step],
                    feed_dict={images: img_batch, labels: label_batch, emb_feat_id: emb_id})
                if step % 100 == 0:
                    print("Train step %d, loss: %f" % (step, ls))

if __name__ == "__main__":
    tf.app.run()
The model in this example has dense variables (for instance the 500×10 weight matrix of the last fully connected layer) as well as a 100000×10 embedding variable whose gradient arrives in sparse form. We know that Adam calls different parameter-update methods for these two kinds of tensors. Where is this logic implemented? After reading the source code carefully, I found the following snippet:
def update_op(self, optimizer, g):
    if isinstance(g, ops.Tensor):
        update_op = optimizer._apply_dense(g, self._v)  # pylint: disable=protected-access
        if self._v.constraint is not None:
            with ops.control_dependencies([update_op]):
                return self._v.assign(self._v.constraint(self._v))
        else:
            return update_op
    else:
        assert isinstance(g, ops.IndexedSlices), ("Gradient ", g, " is neither a "
                                                  "tensor nor IndexedSlices.")
        if self._v.constraint is not None:
            raise RuntimeError(
                "Cannot use a constraint function on a sparse variable.")
        # pylint: disable=protected-access
        return optimizer._apply_sparse_duplicate_indices(g, self._v)
So the logic lives in the update_op function in optimizer.py, which dispatches, based on the gradient grad, to the method that updates the variable. The code is straightforward: when the incoming gradient is an ordinary Tensor, _apply_dense is called to update the parameters; when it is an IndexedSlices, optimizer._apply_sparse_duplicate_indices is called instead. IndexedSlices is a data structure that can represent a sparse matrix by storing only the affected row indices and the corresponding values. Why is that useful? Take the 100000×10 embedding matrix in our model: if the lookup indices for the current batch are [0, 201, 301], then only those three rows of the embedding need to be updated, so the IndexedSlices stores just index=[0, 201, 301] and a 3×10 block of gradient values. What if the batch contains duplicate indices, say [0, 201, 201, 301]? The gradients at the duplicated positions are simply summed. The gradient of every other row is treated as zero in this update step. The implementation is as follows:
def _apply_sparse_duplicate_indices(self, grad, var):
    summed_values, unique_indices = _deduplicate_indexed_slices(
        values=grad.values, indices=grad.indices)
    gradient_no_duplicate_indices = ops.IndexedSlices(
        indices=unique_indices,
        values=summed_values,
        dense_shape=grad.dense_shape)
    return self._apply_sparse(gradient_no_duplicate_indices, var)

def _deduplicate_indexed_slices(values, indices):
    """Sums `values` associated with any non-unique `indices`.
    Returns:
      A tuple of (`summed_values`, `unique_indices`) where `unique_indices` is a
      de-duplicated version of `indices` and `summed_values` contains the sum of
      `values` slices associated with each unique index.
    """
    unique_indices, new_index_positions = array_ops.unique(indices)
    summed_values = math_ops.unsorted_segment_sum(
        values, new_index_positions,
        array_ops.shape(unique_indices)[0])
    return (summed_values, unique_indices)
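To see the de-duplication in action, here is a small standalone sketch (TF 1.x; the indices [0, 201, 201, 301] and the all-ones values are made up for illustration) that reproduces what _deduplicate_indexed_slices does using tf.unique and tf.unsorted_segment_sum:
import numpy as np
import tensorflow as tf

# Hypothetical batch with a duplicated row index (201 appears twice).
indices = tf.constant([0, 201, 201, 301], dtype=tf.int64)
values = tf.constant(np.ones([4, 10], dtype=np.float32))
# The same two ops that _deduplicate_indexed_slices uses.
unique_indices, positions = tf.unique(indices)
summed = tf.unsorted_segment_sum(values, positions, tf.shape(unique_indices)[0])
with tf.Session() as sess:
    idx, vals = sess.run([unique_indices, summed])
    print(idx)         # [  0 201 301]
    print(vals[:, 0])  # [1. 2. 1.]  -> the two gradient rows for index 201 were summed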
Below, _apply_dense is the method that updates dense variables, and _apply_sparse_shared is the one used for IndexedSlices gradients. In the sparse path, the m and v slots are only scatter-added at the rows that appear in the current batch. (Note that the final assign_sub still applies lr * m_t / (sqrt(v_t) + epsilon) to the whole variable, so a row outside the batch can move once its momentum slot is non-zero from earlier steps; on the very first step, when m and v are still all zero, only the indexed rows change, which is exactly what the example further below shows.)
def _apply_dense(self, grad, var):
    m = self.get_slot(var, "m")
    v = self.get_slot(var, "v")
    beta1_power, beta2_power = self._get_beta_accumulators()
    return training_ops.apply_adam(
        var, m, v,
        math_ops.cast(beta1_power, var.dtype.base_dtype),
        math_ops.cast(beta2_power, var.dtype.base_dtype),
        math_ops.cast(self._lr_t, var.dtype.base_dtype),
        math_ops.cast(self._beta1_t, var.dtype.base_dtype),
        math_ops.cast(self._beta2_t, var.dtype.base_dtype),
        math_ops.cast(self._epsilon_t, var.dtype.base_dtype),
        grad, use_locking=self._use_locking).op

def _apply_sparse_shared(self, grad, var, indices, scatter_add):
    beta1_power, beta2_power = self._get_beta_accumulators()
    beta1_power = math_ops.cast(beta1_power, var.dtype.base_dtype)
    beta2_power = math_ops.cast(beta2_power, var.dtype.base_dtype)
    lr_t = math_ops.cast(self._lr_t, var.dtype.base_dtype)
    beta1_t = math_ops.cast(self._beta1_t, var.dtype.base_dtype)
    beta2_t = math_ops.cast(self._beta2_t, var.dtype.base_dtype)
    epsilon_t = math_ops.cast(self._epsilon_t, var.dtype.base_dtype)
    lr = (lr_t * math_ops.sqrt(1 - beta2_power) / (1 - beta1_power))
    # m_t = beta1 * m + (1 - beta1) * g_t
    m = self.get_slot(var, "m")
    m_scaled_g_values = grad * (1 - beta1_t)
    m_t = state_ops.assign(m, m * beta1_t,
                           use_locking=self._use_locking)
    with ops.control_dependencies([m_t]):
        m_t = scatter_add(m, indices, m_scaled_g_values)
    # v_t = beta2 * v + (1 - beta2) * (g_t * g_t)
    v = self.get_slot(var, "v")
    v_scaled_g_values = (grad * grad) * (1 - beta2_t)
    v_t = state_ops.assign(v, v * beta2_t, use_locking=self._use_locking)
    with ops.control_dependencies([v_t]):
        v_t = scatter_add(v, indices, v_scaled_g_values)
    v_sqrt = math_ops.sqrt(v_t)
    var_update = state_ops.assign_sub(var,
                                      lr * m_t / (v_sqrt + epsilon_t),
                                      use_locking=self._use_locking)
    return control_flow_ops.group(*[var_update, m_t, v_t])
Here is a simple example showing how a gradient of type IndexedSlices is applied:
import numpy as np
import tensorflow as tf
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import ops
from tensorflow.python.ops import variables
from tensorflow.python.training import adam

if __name__ == '__main__':
    # An IndexedSlices gradient that only covers rows 0, 3 and 8 of a 10x10 variable.
    value_a = np.ones(shape=[3, 10])
    indices_a = np.array([0, 3, 8])
    dense_shape_a = [10, 10]
    grad_slices_a = ops.IndexedSlices(constant_op.constant(value_a),
                                      constant_op.constant(indices_a),
                                      constant_op.constant(dense_shape_a))
    var_np = np.ones(shape=[10, 10])
    var0 = variables.RefVariable(var_np)
    opt = adam.AdamOptimizer()
    update = opt.apply_gradients(zip([grad_slices_a], [var0]))
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    print("initial variable is:", sess.run(var0))
    sess.run(update)
    print("update 1 time variable is:", sess.run(var0))
Output:
initial variable is: [[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
update 1 time variable is: [[0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]
[0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999]
[1. 1. 1. 1. 1. 1. 1. 1. 1. 1. ]]
You can see clearly that after one gradient update, only rows 0, 3 and 8 of the variable have changed (on this first step the m and v slots of all other rows are still zero). This is the advantage of the IndexedSlices representation.
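A quick way to see which of the two paths a variable will take is to inspect the type of its gradient. The toy graph below is only a sketch (the names emb_demo / w_demo and the shapes are made up for illustration): the gradient of an embedding lookup comes back as an IndexedSlices, while the gradient of an ordinary dense weight is a plain Tensor.
import tensorflow as tf

emb = tf.get_variable("emb_demo", shape=[1000, 8])   # embedding table
w = tf.get_variable("w_demo", shape=[8, 4])          # ordinary dense weight
ids = tf.constant([0, 201, 201, 301], dtype=tf.int64)
feat = tf.nn.embedding_lookup(emb, ids)              # gathers 4 rows of emb
loss = tf.reduce_sum(tf.matmul(feat, w))
grad_emb, grad_w = tf.gradients(loss, [emb, w])
print(type(grad_emb))  # IndexedSlices -> routed to _apply_sparse_duplicate_indices
print(type(grad_w))    # Tensor        -> routed to _apply_dense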
Dense variables, on the other hand, are updated by the training_ops.apply_adam op, whose kernel is implemented as follows:
template <typename Device, typename T>
struct ApplyAdamNonCuda {
  void operator()(const Device& d, typename TTypes<T>::Flat var,
                  typename TTypes<T>::Flat m, typename TTypes<T>::Flat v,
                  typename TTypes<T>::ConstScalar beta1_power,
                  typename TTypes<T>::ConstScalar beta2_power,
                  typename TTypes<T>::ConstScalar lr,
                  typename TTypes<T>::ConstScalar beta1,
                  typename TTypes<T>::ConstScalar beta2,
                  typename TTypes<T>::ConstScalar epsilon,
                  typename TTypes<T>::ConstFlat grad, bool use_nesterov) {
    const T alpha = lr() * Eigen::numext::sqrt(T(1) - beta2_power()) /
                    (T(1) - beta1_power());
    // beta1 == μ
    // beta2 == ν
    // v == n
    // var == θ
    m.device(d) += (grad - m) * (T(1) - beta1());
    v.device(d) += (grad.square() - v) * (T(1) - beta2());
    if (use_nesterov) {
      var.device(d) -= ((grad * (T(1) - beta1()) + beta1() * m) * alpha) /
                       (v.sqrt() + epsilon());
    } else {
      var.device(d) -= (m * alpha) / (v.sqrt() + epsilon());
    }
  }
};
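For readability, here is the non-Nesterov branch rewritten as a NumPy sketch (my own paraphrase for illustration, not TensorFlow code); t is the step count, so beta1_power corresponds to beta1 ** t:
import numpy as np

def apply_adam_dense(var, m, v, grad, t, lr=1e-3, beta1=0.9, beta2=0.999, eps=1e-8):
    # Bias-correction factors are folded into a single step size alpha,
    # just like the alpha computed in the kernel above.
    alpha = lr * np.sqrt(1 - beta2 ** t) / (1 - beta1 ** t)
    m += (grad - m) * (1 - beta1)        # m <- beta1 * m + (1 - beta1) * g
    v += (grad ** 2 - v) * (1 - beta2)   # v <- beta2 * v + (1 - beta2) * g^2
    var -= alpha * m / (np.sqrt(v) + eps)
    return var, m, v
Note that the correction is applied through alpha rather than by forming m_hat and v_hat explicitly; the same trick is used to compute lr in _apply_sparse_shared above.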
With that, it is clear how Adam updates the two different kinds of tensor variables. Questions and discussion are welcome.