TensorFlow Distributed Source Code Walkthrough 4: AdamOptimizer


Adam (Adaptive Moment Estimation) is essentially RMSprop with a momentum term: it uses first-moment and second-moment estimates of the gradient to adapt the learning rate of each parameter individually. Its main advantage is that, after bias correction, the effective step size at each iteration stays within a known range, which keeps the parameter updates stable. Adam combines Adagrad's ability to handle sparse gradients with RMSprop's ability to handle non-stationary objectives.

The algorithm is as follows:
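In the notation of the original Adam paper (Kingma & Ba), with gradient $g_t$ at step $t$, step size $\alpha$, decay rates $\beta_1, \beta_2$, and a small constant $\epsilon$:

$$
\begin{aligned}
m_t &= \beta_1 m_{t-1} + (1-\beta_1)\, g_t \\
v_t &= \beta_2 v_{t-1} + (1-\beta_2)\, g_t^2 \\
\hat m_t &= m_t / (1-\beta_1^t), \qquad \hat v_t = v_t / (1-\beta_2^t) \\
\theta_t &= \theta_{t-1} - \alpha\, \hat m_t / (\sqrt{\hat v_t} + \epsilon)
\end{aligned}
$$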

Adam is said to handle large-scale sparse gradients well. How should we understand that? And is there any difference between how Adam applies the gradients of dense features and those of sparse features? To dig into these questions, I wrote a small distributed parameter-server example whose model takes an image feature (dense) and a sparse id feature.

import numpy as np
import tensorflow as tf
from tensorflow.examples.tutorials.mnist.input_data import read_data_sets

# Command-line flags (definitions inferred from how FLAGS is used below).
FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string("ps_hosts", "", "comma-separated list of ps host:port pairs")
tf.app.flags.DEFINE_string("worker_hosts", "", "comma-separated list of worker host:port pairs")
tf.app.flags.DEFINE_string("job_name", "", "either 'ps' or 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "index of the task within its job")


def model(images, emb_feat_id):
    """Define a simple mnist classifier"""
    idx = list(zip(np.arange(0, 32), [0]*32))
    sparse_emb_feat = tf.SparseTensor(indices=idx, values=emb_feat_id, dense_shape=(32, 1))
    emb = tf.get_variable("emb", shape=[100000, 10], initializer=tf.initializers.random_normal(stddev=0.1))
    emb_feat = tf.nn.embedding_lookup_sparse(emb, sparse_emb_feat, sp_weights=None)
    net = tf.concat([images, emb_feat], axis=-1)
    net = tf.layers.dense(net, 500, activation=tf.nn.relu)
    net = tf.layers.dense(net, 500, activation=tf.nn.relu)
    net = tf.layers.dense(net, 10, activation=None)
    return net

def main(_):
    ps_hosts = FLAGS.ps_hosts.split(",")
    worker_hosts = FLAGS.worker_hosts.split(",")

    # create the cluster configured by `ps_hosts' and 'worker_hosts'
    cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})

    #pdb.set_trace()
    # create a server for local task
    server = tf.train.Server(cluster, job_name=FLAGS.job_name,
                             task_index=FLAGS.task_index)

    if FLAGS.job_name == "ps":
        server.join()  # ps hosts only join
    elif FLAGS.job_name == "worker":
        with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % (FLAGS.task_index),
                                                      cluster=cluster)):
            # load mnist dataset
            mnist = read_data_sets("dataset", one_hot=True)

            # the model
            images = tf.placeholder(tf.float32, [None, 784])
            labels = tf.placeholder(tf.float32, [None, 10])
            emb_feat_id = tf.placeholder(tf.int32, [None, ])

            logits = model(images, emb_feat_id)
            loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels))

            # The StopAtStepHook handles stopping after running given steps.
            hooks = [tf.train.StopAtStepHook(last_step=2000)]

            global_step = tf.train.get_or_create_global_step()
            optimizer = tf.train.AdamOptimizer(learning_rate=1e-04)
            
            train_op = optimizer.minimize(loss, global_step=global_step,
                                          aggregation_method=tf.AggregationMethod.ADD_N)

            with tf.train.MonitoredTrainingSession(master=server.target,
                                                   is_chief=(FLAGS.task_index == 0),
                                                   checkpoint_dir="ps_debug_checkpoint_dir",
                                                   hooks=hooks) as mon_sess:
                while not mon_sess.should_stop():
                    # mon_sess.run handles AbortedError in case of preempted PS.
                    img_batch, label_batch = mnist.train.next_batch(32)
                    emb_id = np.random.randint(0, 100000, 32)
                    _, ls, step = mon_sess.run([train_op, loss, global_step],
                                               feed_dict={images: img_batch,
                                                          labels: label_batch,
                                                          emb_feat_id: emb_id})
                    print("sess run one time end")
                    if step % 100 == 0:
                        print("Train step %d, loss: %f" % (step, ls))

if __name__ == "__main__":
    tf.app.run()

The model in this example contains dense variables (for instance the 500*10 weight matrix of the last layer) and a 100000*10 embedding variable that is updated sparsely. For these two kinds of tensors, Adam invokes different methods when applying their gradients. Where does that logic live? After reading the source carefully, I found the following code:

def update_op(self, optimizer, g):
    if isinstance(g, ops.Tensor):
      update_op = optimizer._apply_dense(g, self._v)  # pylint: disable=protected-access
      if self._v.constraint is not None:
        with ops.control_dependencies([update_op]):
          return self._v.assign(self._v.constraint(self._v))
      else:
        return update_op
    else:
      assert isinstance(g, ops.IndexedSlices), ("Gradient ", g, " is neither a "
                                                "tensor nor IndexedSlices.")
      if self._v.constraint is not None:
        raise RuntimeError(
            "Cannot use a constraint function on a sparse variable.")
      # pylint: disable=protected-access
      return optimizer._apply_sparse_duplicate_indices(g, self._v)
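To see why both branches exist, here is a minimal sketch (TF 1.x graph mode; the variable names are my own) showing that the gradient flowing back into a table accessed through tf.gather arrives as an IndexedSlices, while the gradient of an ordinary weight matrix is a plain Tensor:

import tensorflow as tf

emb = tf.get_variable("emb_demo", shape=[100, 8])   # a table accessed row-by-row
w = tf.get_variable("w_demo", shape=[8, 4])         # an ordinary dense weight
rows = tf.gather(emb, [0, 3, 3, 8])                 # only a few (possibly repeated) rows are used
loss = tf.reduce_sum(tf.matmul(rows, w))

grad_emb, grad_w = tf.gradients(loss, [emb, w])
print(type(grad_emb))  # IndexedSlices -> dispatched to _apply_sparse_duplicate_indices
print(type(grad_w))    # Tensor        -> dispatched to _apply_dense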

It turns out this dispatch lives in the update_op function in optimizer.py; update_op picks the call that applies the gradient grad to the variable. The logic is simple: when the incoming gradient is an ordinary Tensor, _apply_dense is called to update the parameters; when it is an IndexedSlices, optimizer._apply_sparse_duplicate_indices is called instead. IndexedSlices is a data structure for sparse gradients of a dense tensor: it stores only the affected row indices and the corresponding values.

Why is that useful? Take the 100000*10 embedding matrix in our model: if the lookup indices in the current batch are [0, 201, 301], then only those three rows of the embedding need to be updated, so the IndexedSlices stores just index=[0, 201, 301] together with a 3*10 block of gradient values. One question remains: what if the batch contains duplicate row ids, say [0, 201, 201, 301]? The gradients at the duplicate positions are simply summed. The gradients of all other rows are treated as zero for this update. The implementation is shown below (a runnable check of the de-duplication follows the quoted helpers):

def _apply_sparse_duplicate_indices(self, grad, var):
    summed_values, unique_indices = _deduplicate_indexed_slices(
        values=grad.values, indices=grad.indices)
    gradient_no_duplicate_indices = ops.IndexedSlices(
        indices=unique_indices,
        values=summed_values,
        dense_shape=grad.dense_shape)
    return self._apply_sparse(gradient_no_duplicate_indices, var)

def _deduplicate_indexed_slices(values, indices):
  """Sums `values` associated with any non-unique `indices`.
  Returns:
    A tuple of (`summed_values`, `unique_indices`) where `unique_indices` is a
    de-duplicated version of `indices` and `summed_values` contains the sum of
    `values` slices associated with each unique index.
  """
  unique_indices, new_index_positions = array_ops.unique(indices)
  summed_values = math_ops.unsorted_segment_sum(
      values, new_index_positions,
      array_ops.shape(unique_indices)[0])
  return (summed_values, unique_indices)
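As a quick check of this de-duplication, here is a small sketch using only public TF 1.x ops (tf.unique and tf.unsorted_segment_sum, the same primitives as above) on the [0, 201, 201, 301] example from earlier:

import numpy as np
import tensorflow as tf

indices = tf.constant([0, 201, 201, 301], dtype=tf.int64)
values = tf.constant(np.ones((4, 10), dtype=np.float32))  # one gradient slice per index

unique_indices, positions = tf.unique(indices)
summed = tf.unsorted_segment_sum(values, positions, tf.shape(unique_indices)[0])

with tf.Session() as sess:
    uniq, summed_np = sess.run([unique_indices, summed])
    print(uniq)          # [  0 201 301]
    print(summed_np[1])  # the two slices for index 201 were added together: all 2.0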

Below, _apply_dense is the method that updates dense variables, and _apply_sparse_shared is the one used for variables whose gradient arrives as IndexedSlices. For the sparse case you can see that the freshly scaled gradient is scattered (via scatter_add) only into the rows of the m and v slots that appear in the current batch; the final assign_sub then applies the accumulated momentum, so with freshly initialized (all-zero) slots, as in the example further down, only those touched rows actually change.

def _apply_dense(self, grad, var):
    m = self.get_slot(var, "m")
    v = self.get_slot(var, "v")
    beta1_power, beta2_power = self._get_beta_accumulators()
    return training_ops.apply_adam(
        var, m, v,
        math_ops.cast(beta1_power, var.dtype.base_dtype),
        math_ops.cast(beta2_power, var.dtype.base_dtype),
        math_ops.cast(self._lr_t, var.dtype.base_dtype),
        math_ops.cast(self._beta1_t, var.dtype.base_dtype),
        math_ops.cast(self._beta2_t, var.dtype.base_dtype),
        math_ops.cast(self._epsilon_t, var.dtype.base_dtype),
        grad, use_locking=self._use_locking).op

def _apply_sparse_shared(self, grad, var, indices, scatter_add):
    beta1_power, beta2_power = self._get_beta_accumulators()
    beta1_power = math_ops.cast(beta1_power, var.dtype.base_dtype)
    beta2_power = math_ops.cast(beta2_power, var.dtype.base_dtype)
    lr_t = math_ops.cast(self._lr_t, var.dtype.base_dtype)
    beta1_t = math_ops.cast(self._beta1_t, var.dtype.base_dtype)
    beta2_t = math_ops.cast(self._beta2_t, var.dtype.base_dtype)
    epsilon_t = math_ops.cast(self._epsilon_t, var.dtype.base_dtype)
    lr = (lr_t * math_ops.sqrt(1 - beta2_power) / (1 - beta1_power))
    # m_t=beta1 * m + (1 - beta1) * g_t
    m = self.get_slot(var, "m")
    m_scaled_g_values = grad * (1 - beta1_t)
    m_t = state_ops.assign(m, m * beta1_t,
                           use_locking=self._use_locking)
    with ops.control_dependencies([m_t]):
      m_t = scatter_add(m, indices, m_scaled_g_values)
    # v_t=beta2 * v + (1 - beta2) * (g_t * g_t)
    v = self.get_slot(var, "v")
    v_scaled_g_values = (grad * grad) * (1 - beta2_t)
    v_t = state_ops.assign(v, v * beta2_t, use_locking=self._use_locking)
    with ops.control_dependencies([v_t]):
      v_t = scatter_add(v, indices, v_scaled_g_values)
    v_sqrt = math_ops.sqrt(v_t)
    var_update = state_ops.assign_sub(var,
                                      lr * m_t / (v_sqrt + epsilon_t),
                                      use_locking=self._use_locking)
    return control_flow_ops.group(*[var_update, m_t, v_t])

Here is a simple example showing how a gradient of type IndexedSlices is actually applied:

import numpy as np
import tensorflow as tf
from tensorflow.python.framework import constant_op
from tensorflow.python.framework import ops
from tensorflow.python.ops import variables
from tensorflow.python.training import adam


if __name__ == '__main__':
    value_a = np.ones(shape=[3, 10])
    indices_a = np.array([0, 3, 8])
    dense_shape_a = [10, 10]
    grad_slices_a = ops.IndexedSlices(constant_op.constant(value_a), constant_op.constant(indices_a),
                                      constant_op.constant(dense_shape_a))

    var_np = np.ones(shape=[10, 10])

    var0 = variables.RefVariable(var_np)
    opt = adam.AdamOptimizer()
    update = opt.apply_gradients(zip([grad_slices_a], [var0]))
    # variables.global_variables_initializer().run()
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    print("initial variable is:", sess.run(var0))
    sess.run(update)
    print("update 1 time variable is:", sess.run(var0))


Output:
initial variable is: [[1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1.]]
update 1 time variable is: [[0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999]
 [1.    1.    1.    1.    1.    1.    1.    1.    1.    1.   ]
 [1.    1.    1.    1.    1.    1.    1.    1.    1.    1.   ]
 [0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999]
 [1.    1.    1.    1.    1.    1.    1.    1.    1.    1.   ]
 [1.    1.    1.    1.    1.    1.    1.    1.    1.    1.   ]
 [1.    1.    1.    1.    1.    1.    1.    1.    1.    1.   ]
 [1.    1.    1.    1.    1.    1.    1.    1.    1.    1.   ]
 [0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999 0.999]
 [1.    1.    1.    1.    1.    1.    1.    1.    1.    1.   ]]

You can see clearly that after a single gradient update only rows 0, 3 and 8 of the variable have changed (their m and v slots were all zero before this step, so the update for every other row is exactly zero). This is the benefit of the IndexedSlices representation.
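As a sanity check on the printed numbers: with the default hyper-parameters (lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8), a gradient of 1.0 and zero-initialized slots, the first-step arithmetic in plain NumPy reproduces the ~0.999 seen in rows 0, 3 and 8:

import numpy as np

lr, beta1, beta2, eps = 0.001, 0.9, 0.999, 1e-8
g = 1.0                                        # gradient of each touched element
m = (1 - beta1) * g                            # m slot after one step (starts at 0)
v = (1 - beta2) * g * g                        # v slot after one step (starts at 0)
lr_t = lr * np.sqrt(1 - beta2) / (1 - beta1)   # bias-corrected step size at t=1
print(1.0 - lr_t * m / (np.sqrt(v) + eps))     # ~0.999, matching the output above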

Dense variables, on the other hand, are updated by the training_ops.apply_adam op. Its CPU kernel is shown below; note that it accumulates the moments incrementally, e.g. m += (grad - m) * (1 - beta1), which is algebraically the same as m = beta1 * m + (1 - beta1) * grad:

template <typename Device, typename T>
struct ApplyAdamNonCuda {
  void operator()(const Device& d, typename TTypes<T>::Flat var,
                  typename TTypes<T>::Flat m, typename TTypes<T>::Flat v,
                  typename TTypes<T>::ConstScalar beta1_power,
                  typename TTypes<T>::ConstScalar beta2_power,
                  typename TTypes<T>::ConstScalar lr,
                  typename TTypes<T>::ConstScalar beta1,
                  typename TTypes<T>::ConstScalar beta2,
                  typename TTypes<T>::ConstScalar epsilon,
                  typename TTypes<T>::ConstFlat grad, bool use_nesterov) {
    const T alpha = lr() * Eigen::numext::sqrt(T(1) - beta2_power()) /
                    (T(1) - beta1_power());
    // beta1==μ
    // beta2==ν
    // v==n
    // var==θ

    m.device(d) += (grad - m) * (T(1) - beta1());
    v.device(d) += (grad.square() - v) * (T(1) - beta2());
    if (use_nesterov) {
      var.device(d) -= ((grad * (T(1) - beta1()) + beta1() * m) * alpha) /
                       (v.sqrt() + epsilon());
    } else {
      var.device(d) -= (m * alpha) / (v.sqrt() + epsilon());
    }
  }
};

That settles how Adam applies updates to these two kinds of tensors. Questions and discussion are welcome.
