Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4934#discussion_r26186113
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/optimization/AcceleratedGradientDescent.scala
 ---
    @@ -0,0 +1,237 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{DenseVector => BDV, norm}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * :: DeveloperApi ::
    + * This class optimizes a vector of weights via accelerated (proximal) 
gradient descent.
    + * The implementation is based on TFOCS [[http://cvxr.com/tfocs]], 
described in Becker, Candes, and
    + * Grant 2010.
    + * @param gradient Delegate that computes the loss function value and 
gradient for a vector of
    + *                 weights.
    + * @param updater Delegate that updates weights in the direction of a 
gradient.
    + */
    +@DeveloperApi
    +class AcceleratedGradientDescent (private var gradient: Gradient, private 
var updater: Updater)
    +  extends Optimizer {
    +
    +  private var stepSize: Double = 1.0
    +  private var convergenceTol: Double = 1e-4
    +  private var numIterations: Int = 100
    +  private var regParam: Double = 0.0
    +
    +  /**
    +   * Set the initial step size, used for the first step. Default 1.0.
    +   * On subsequent steps, the step size will be adjusted by the 
acceleration algorithm.
    +   */
    +  def setStepSize(step: Double): this.type = {
    +    this.stepSize = step
    +    this
    +  }
    +
    +  /**
    +   * Set the optimization convergence tolerance. Default 1e-4.
    +   * Smaller values will increase accuracy but require additional 
iterations.
    +   */
    +  def setConvergenceTol(tol: Double): this.type = {
    +    this.convergenceTol = tol
    +    this
    +  }
    +
    +  /**
    +   * Set the maximum number of iterations. Default 100.
    +   */
    +  def setNumIterations(iters: Int): this.type = {
    +    this.numIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the regularization parameter. Default 0.0.
    +   */
    +  def setRegParam(regParam: Double): this.type = {
    +    this.regParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set a Gradient delegate for computing the loss function value and 
gradient.
    +   */
    +  def setGradient(gradient: Gradient): this.type = {
    +    this.gradient = gradient
    +    this
    +  }
    +
    +  /**
    +   * Set an Updater delegate for updating weights in the direction of a 
gradient.
    +   * If regularization is used, the Updater will implement the 
regularization term's proximity
    +   * operator. Thus the type of regularization penalty is configured by 
providing a corresponding
    +   * Updater implementation.
    +   */
    +  def setUpdater(updater: Updater): this.type = {
    +    this.updater = updater
    +    this
    +  }
    +
    +  /**
    +   * Run accelerated gradient descent on the provided training data.
    +   * @param data training data
    +   * @param initialWeights initial weights
    +   * @return solution vector
    +   */
    +  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): 
Vector = {
    +    val (weights, _) = AcceleratedGradientDescent.run(
    +      data,
    +      gradient,
    +      updater,
    +      stepSize,
    +      convergenceTol,
    +      numIterations,
    +      regParam,
    +      initialWeights)
    +    weights
    +  }
    +}
    +
    +/**
    + * :: DeveloperApi ::
    + * Top-level method to run accelerated (proximal) gradient descent.
    + */
    +@DeveloperApi
    +object AcceleratedGradientDescent extends Logging {
    +  /**
    +   * Run accelerated proximal gradient descent.
    +   * The implementation is based on TFOCS [[http://cvxr.com/tfocs]], 
described in Becker, Candes,
    +   * and Grant 2010. A limited but useful subset of the TFOCS feature set 
is implemented, including
    +   * support for composite loss functions, the Auslender and Teboulle 
acceleration method, and
    +   * automatic restart using the gradient test. A global Lipschitz bound 
is supported in preference
    +   * to local Lipschitz estimation via backtracking. On each iteration, 
the loss function and
    +   * gradient are calculated from the full training dataset, requiring 
one Spark map reduce.
    +   *
    +   * @param data Input data. RDD containing data examples of the form 
(label, [feature values]).
    +   * @param gradient Delegate that computes the loss function value and 
gradient for a vector of
    +                     weights (for one single data example).
    +   * @param updater Delegate that updates weights in the direction of a 
gradient.
    +   * @param stepSize Initial step size for the first step.
    +   * @param convergenceTol Tolerance for convergence of the optimization 
algorithm. When the norm of
    +   *                       the change in weight vectors between successive 
iterations falls below
    +   *                       this relative tolerance, optimization is 
complete.
    +   * @param numIterations Maximum number of iterations to run the 
algorithm.
    +   * @param regParam The regularization parameter.
    +   * @param initialWeights The initial weight values.
    +   *
    +   * @return A tuple containing two elements. The first element is a 
Vector containing the optimized
    +   *         weight for each feature, and the second element is an array 
containing the approximate
    +   *         loss computed on each iteration.
    +   */
    +  def run(
    +      data: RDD[(Double, Vector)],
    +      gradient: Gradient,
    +      updater: Updater,
    +      stepSize: Double,
    +      convergenceTol: Double,
    +      numIterations: Int,
    +      regParam: Double,
    +      initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    /** Returns the loss function and gradient for the provided weights 
'x'. */
    +    def applySmooth(x: BDV[Double]): (Double, BDV[Double]) = {
    +      val bcX = data.context.broadcast(Vectors.fromBreeze(x))
    +
    +      // Sum the loss function and gradient computed for each training 
example.
    +      val (loss, grad, count) = data.treeAggregate((0.0, 
BDV.zeros[Double](x.size), 0L))(
    +        seqOp = (c, v) => (c, v) match { case ((loss, grad, count), 
(label, features)) =>
    +          val l = gradient.compute(features, label, bcX.value, 
Vectors.fromBreeze(grad))
    +          (loss + l, grad, count + 1)
    +        },
    +        combOp = (c1, c2) => (c1, c2) match {
    +          case ((loss1, grad1, count1), (loss2, grad2, count2)) =>
    +            (loss1 + loss2, grad1 += grad2, count1 + count2)
    +        })
    +
    +      // Divide the summed loss and gradient by the number of training 
examples.
    +      (loss / count, grad / (count: Double))
    +    }
    +
    +    /**
    +     * Returns the regularization loss and updates weights according to 
the gradient and the
    +     * proximity operator.
    +     */
    +    def applyProjector(x: BDV[Double], g: BDV[Double], step: Double): 
(Double, BDV[Double]) = {
    +      val (weights, regularization) = 
updater.compute(Vectors.fromBreeze(x),
    +                                                      
Vectors.fromBreeze(g),
    +                                                      step,
    +                                                      iter = 1, // Passing 
1 avoids step size
    +                                                                // 
rescaling within the updater.
    +                                                      regParam)
    +      (regularization, BDV[Double](weights.toArray))
    +    }
    +
    +    var x = BDV[Double](initialWeights.toArray)
    +    var z = x
    +    val L = 1.0 / stepSize // Infer a (global) Lipschitz bound from the 
provided stepSize.
    +    var theta = Double.PositiveInfinity
    +    var hasConverged = false
    +    val lossHistory = new ArrayBuffer[Double](numIterations)
    +
    +    for (i <- 1 to numIterations if !hasConverged) {
    +
    +      // Auslender and Teboulle's accelerated method.
    +      val (x_old, z_old) = (x, z)
    +      theta = 2.0 / (1.0 + math.sqrt(1.0 + 4.0 / (theta * theta)))
    +      val y = x_old * (1.0 - theta) + z_old * theta
    +      val (f_y, g_y) = applySmooth(y)
    +      val step = 1.0 / (theta * L)
    +      z = applyProjector(z_old, g_y, step)._2
    +      x = x_old * (1.0 - theta) + z * theta
    +      val d_x = x - x_old
    +
    +      // Track loss history using the loss function at y, since f_y is 
already avaialble and
    --- End diff --
    
    Oops looks like a typo: 'avaialble'


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to