Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/353#discussion_r11528081
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala ---
    @@ -0,0 +1,259 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.optimization
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import breeze.linalg.{DenseVector => BDV, axpy}
    +import breeze.optimize.{CachedDiffFunction, DiffFunction}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +
    +/**
    + * Class used to solve an optimization problem using Limited-memory BFGS.
    + * Reference: [[http://en.wikipedia.org/wiki/Limited-memory_BFGS]]
    + * @param gradient Gradient function to be used.
    + * @param updater Updater to be used to update weights after every iteration.
    + */
    +class LBFGS(private var gradient: Gradient, private var updater: Updater)
    +  extends Optimizer with Logging {
    +
    +  private var numCorrections = 10
    +  private var convergenceTol = 1E-4
    +  private var maxNumIterations = 100
    +  private var regParam = 0.0
    +  private var miniBatchFraction = 1.0
    +
    +  /**
    +   * Set the number of corrections used in the LBFGS update. Default 10.
    +   * Values of numCorrections less than 3 are not recommended; large values
    +   * of numCorrections will result in excessive computing time.
    +   * 3 < numCorrections < 10 is recommended.
    +   * Restriction: numCorrections > 0
    +   */
    +  def setNumCorrections(corrections: Int): this.type = {
    +    assert(corrections > 0)
    +    this.numCorrections = corrections
    +    this
    +  }
    +
    +  /**
    +   * Set fraction of data to be used for each L-BFGS iteration. Default 1.0.
    +   */
    +  def setMiniBatchFraction(fraction: Double): this.type = {
    +    this.miniBatchFraction = fraction
    +    this
    +  }
    +
    +  /**
    +   * Set the convergence tolerance of iterations for L-BFGS. Default 1E-4.
    +   * A smaller value leads to higher accuracy at the cost of more iterations.
    +   */
    +  def setConvergenceTol(tolerance: Double): this.type = {
    +    this.convergenceTol = tolerance
    +    this
    +  }
    +
    +  /**
    +   * Set the maximal number of iterations for L-BFGS. Default 100.
    +   */
    +  def setMaxNumIterations(iters: Int): this.type = {
    +    this.maxNumIterations = iters
    +    this
    +  }
    +
    +  /**
    +   * Set the regularization parameter. Default 0.0.
    +   */
    +  def setRegParam(regParam: Double): this.type = {
    +    this.regParam = regParam
    +    this
    +  }
    +
    +  /**
    +   * Set the gradient function (of the loss function of one single data example)
    +   * to be used for L-BFGS.
    +   */
    +  def setGradient(gradient: Gradient): this.type = {
    +    this.gradient = gradient
    +    this
    +  }
    +
    +  /**
    +   * Set the updater function to actually perform a gradient step in a given direction.
    +   * The updater is responsible for performing the update from the regularization term as well,
    +   * and therefore determines what kind of regularization is used, if any.
    +   */
    +  def setUpdater(updater: Updater): this.type = {
    +    this.updater = updater
    +    this
    +  }
    +
    +  override def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
    +    val (weights, _) = LBFGS.runMiniBatchLBFGS(
    +      data,
    +      gradient,
    +      updater,
    +      numCorrections,
    +      convergenceTol,
    +      maxNumIterations,
    +      regParam,
    +      miniBatchFraction,
    +      initialWeights)
    +    weights
    +  }
    +
    +}
    +
    +/**
    + * Top-level object providing the method to run L-BFGS.
    + */
    +object LBFGS extends Logging {
    +  /**
    +   * Run Limited-memory BFGS (L-BFGS) in parallel using mini batches.
    +   * In each iteration, we sample a subset (fraction miniBatchFraction) of the total data
    +   * in order to compute a gradient estimate.
    +   * Sampling and averaging the subgradients over this subset are performed using one
    +   * standard Spark map-reduce pass in each iteration.
    +   *
    +   * @param data - Input data for L-BFGS. RDD of the set of data examples, each of
    +   *               the form (label, [feature values]).
    +   * @param gradient - Gradient object (used to compute the gradient of the loss function of
    +   *                   one single data example).
    +   * @param updater - Updater function to actually perform a gradient step in a given direction.
    +   * @param numCorrections - The number of corrections used in the L-BFGS update.
    +   * @param convergenceTol - The convergence tolerance of iterations for L-BFGS.
    +   * @param maxNumIterations - Maximal number of iterations that L-BFGS can be run.
    +   * @param regParam - Regularization parameter.
    +   * @param miniBatchFraction - Fraction of the input data set that should be used for
    +   *                            one iteration of L-BFGS. Default value 1.0.
    +   *
    +   * @return A tuple containing two elements. The first element is a column matrix containing
    +   *         weights for every feature, and the second element is an array containing the loss
    +   *         computed for every iteration.
    +   */
    +  def runMiniBatchLBFGS(
    +    data: RDD[(Double, Vector)],
    +    gradient: Gradient,
    +    updater: Updater,
    +    numCorrections: Int,
    +    convergenceTol: Double,
    +    maxNumIterations: Int,
    +    regParam: Double,
    +    miniBatchFraction: Double,
    +    initialWeights: Vector): (Vector, Array[Double]) = {
    +
    +    val lossHistory = new ArrayBuffer[Double](maxNumIterations)
    +
    +    val nexamples: Long = data.count()
    +    val miniBatchSize = nexamples * miniBatchFraction
    +
    +    val costFun = new CostFun(
    +      data, gradient, updater, regParam, miniBatchFraction, lossHistory, miniBatchSize)
    +
    +    val lbfgs = new breeze.optimize.LBFGS[BDV[Double]](
    +      maxIter = maxNumIterations, m = numCorrections, tolerance = convergenceTol)
    +
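    +    // CachedDiffFunction memoizes the most recent (loss, gradient) evaluation so
    +    // Breeze's line search does not recompute it at the same point.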
    +    val weights = Vectors.fromBreeze(
    +      lbfgs.minimize(new CachedDiffFunction(costFun), initialWeights.toBreeze.toDenseVector))
    +
    +    logInfo("LBFGS.runMiniBatchLBFGS finished. Last 10 losses %s".format(
    +      lossHistory.takeRight(10).mkString(", ")))
    +
    +    (weights, lossHistory.toArray)
    +  }
    +
    +  /**
    +   * CostFun implements Breeze's DiffFunction[T], which returns the loss and gradient
    +   * at a particular point (weights). It's used in Breeze's convex optimization routines.
    +   */
    +  private class CostFun(
    +    data: RDD[(Double, Vector)],
    +    gradient: Gradient,
    +    updater: Updater,
    +    regParam: Double,
    +    miniBatchFraction: Double,
    +    lossHistory: ArrayBuffer[Double],
    +    miniBatchSize: Double) extends DiffFunction[BDV[Double]] {
    +
    +    private var i = 0
    +
    +    override def calculate(weights: BDV[Double]) = {
    +      // Take local copies so the closure below captures only the RDD and the gradient,
    +      // not this CostFun instance, which is not serializable.
    +      val localData = data
    +      val localGradient = gradient
    +
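    +      // Sample a miniBatchFraction of the data, then sum per-example losses while
    +      // accumulating the gradient in-place into a single dense vector.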
    +      val (gradientSum, lossSum) = localData.sample(false, miniBatchFraction, 42 + i)
    +        .aggregate((BDV.zeros[Double](weights.size), 0.0))(
    +          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
    +            val l = localGradient.compute(
    +              features, label, Vectors.fromBreeze(weights), Vectors.fromBreeze(grad))
    +            (grad, loss + l)
    +          },
    +          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
    +            (grad1 += grad2, loss1 + loss2)
    +          })
    +
    +      /**
    +       * regVal is the regularization value, e.g. the sum of squared weights for the
    +       * L2 updater; the same trick works for any updater. Passing a zero gradient and
    +       * zero step size leaves the weights unchanged, so only the regularization value
    +       * is computed.
    +       */
    +      val regVal = updater.compute(
    +        Vectors.fromBreeze(weights),
    +        Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
    +
    +      val loss = lossSum / miniBatchSize + regVal
    +      /**
    +       * The following computes the gradient part of the regularization using the updater.
    +       *
    +       * Given the input parameters, the updater basically does the following:
    +       *
    +       * w' = w - thisIterStepSize * (gradient + regGradient(w))
    +       * Note that regGradient is a function of w.
    +       *
    +       * If we set gradient = 0, thisIterStepSize = 1, then
    +       *
    +       * regGradient(w) = w - w'
    +       *
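    +       * For example, with a squared-L2 penalty (regParam / 2) * ||w||^2, this recovers
    +       * regGradient(w) = regParam * w.
    +       *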
    +       * TODO: Clean this up by separating the regularization logic out of the updater
    +       *       into a regularizer.
    +       */
    +      // The following gradientTotal is actually the regularization part of the gradient.
    +      // The gradientSum computed from the data will be added in the next step.
    +      val gradientTotal = weights - updater.compute(
    +        Vectors.fromBreeze(weights),
    +        Vectors.dense(new Array[Double](weights.size)), 1, 1, regParam)._1.toBreeze
    +
    +      // gradientTotal = gradientSum / miniBatchSize + gradientTotal
    +      axpy(1.0 / miniBatchSize, gradientSum, gradientTotal)
    +
    +      /**
    +       * NOTE: lossSum and loss are computed using the weights from the previous iteration,
    +       * and regVal is the regularization value computed in the previous iteration as well.
    +       */
    +      lossHistory.append(loss)
    +
    +      i += 1
    +
    +      (loss, gradientTotal)
    +    }
    +  }
    +
    +}
    --- End diff ---
    
    You need exactly one newline character at the end of the file. There were
    two before, and now there are none.
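    
    For context, a minimal sketch of how this optimizer would be wired up once
    merged. LogisticGradient and SquaredL2Updater are the existing MLlib
    implementations; the parameter values and the `data` / `initialWeights`
    placeholders are illustrative only, not part of this diff:
    
        import org.apache.spark.mllib.optimization.{LBFGS, LogisticGradient, SquaredL2Updater}
    
        // Hypothetical wiring: the gradient/updater choices and parameter values
        // below are illustrative, not prescribed by this patch.
        // data: RDD[(Double, Vector)] of (label, features); initialWeights: Vector.
        val lbfgs = new LBFGS(new LogisticGradient(), new SquaredL2Updater())
          .setNumCorrections(10)
          .setConvergenceTol(1e-4)
          .setMaxNumIterations(50)
          .setRegParam(0.1)
        val weights = lbfgs.optimize(data, initialWeights)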