Repository: flink
Updated Branches:
  refs/heads/master 35ea6505c -> 1e574750d


[FLINK-1992] [ml] Adds convergence criterion to Flink's SGD algorithm

Added test case for convergence

This closes #692.
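
For reference, a minimal usage sketch of the new stopping criterion, built only
from the setters this patch introduces (the ExecutionEnvironment setup and the
single-element training data are placeholders, not part of the patch):

  import org.apache.flink.api.scala._
  import org.apache.flink.ml.common.LabeledVector
  import org.apache.flink.ml.math.DenseVector
  import org.apache.flink.ml.optimization._

  val env = ExecutionEnvironment.getExecutionEnvironment
  // Placeholder training data: label 1.0 with a single feature 2.0
  val trainingDS: DataSet[LabeledVector] =
    env.fromElements(LabeledVector(1.0, DenseVector(2.0)))

  val sgd = GradientDescent()
    .setStepsize(0.1)
    .setIterations(1000)
    .setLossFunction(SquaredLoss())
    .setRegularizationType(NoRegularization())
    // New in this commit: stop early once the relative change of the
    // regularized loss between successive iterations drops below 1e-6
    .setConvergenceThreshold(1e-6)

  val weights = sgd.optimize(trainingDS, None)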


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3b6a9da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3b6a9da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3b6a9da

Branch: refs/heads/master
Commit: b3b6a9da0532d884f7d633530c11cda15aa6bc1b
Parents: 35ea650
Author: Theodore Vasiloudis <t...@sics.se>
Authored: Wed May 13 17:29:31 2015 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Fri May 22 09:37:20 2015 +0200

----------------------------------------------------------------------
 .../flink/ml/optimization/GradientDescent.scala | 185 ++++++++++++-------
 .../flink/ml/optimization/LossFunction.scala    |  32 +++-
 .../flink/ml/optimization/Regularization.scala  |  32 +++-
 .../apache/flink/ml/optimization/Solver.scala   |  98 ++++++++--
 .../optimization/GradientDescentITSuite.scala   | 132 ++++++++-----
 .../ml/optimization/LossFunctionITSuite.scala   |  13 +-
 .../ml/optimization/RegularizationITSuite.scala |  12 +-
 7 files changed, 374 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
index eff519e..ef171f5 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.ml.common._
 import org.apache.flink.ml.math._
-import org.apache.flink.ml.optimization.IterativeSolver.{Iterations, Stepsize}
+import org.apache.flink.ml.optimization.IterativeSolver.{ConvergenceThreshold, Iterations, Stepsize}
 import org.apache.flink.ml.optimization.Solver._
 
 /** This [[Solver]] performs Stochastic Gradient Descent optimization using mini batches
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently these include:
+  *  The parameters to tune the algorithm are:
   *                      [[Solver.LossFunction]] for the loss function to be used,
   *                      [[Solver.RegularizationType]] for the type of regularization,
   *                      [[Solver.RegularizationParameter]] for the regularization parameter,
   *                      [[IterativeSolver.Iterations]] for the maximum number of iterations,
   *                      [[IterativeSolver.Stepsize]] for the learning rate used.
+  *                      [[IterativeSolver.ConvergenceThreshold]] when provided, the algorithm will
+  *                      stop the iterations if the relative change in the value of the objective
+  *                      function between successive iterations is smaller than this value.
   */
-class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
+class GradientDescent() extends IterativeSolver {
 
   import Solver.WEIGHTVECTOR_BROADCAST
 
-  var parameterMap: ParameterMap = parameters ++ runParameters
-
   /** Performs one iteration of Stochastic Gradient Descent using mini batches
     *
     * @param data A Dataset of LabeledVector (label, features) pairs
@@ -76,51 +77,95 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
     }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
     *
     * @param data A Dataset of LabeledVector (label, features) pairs
-    * @param initWeights The initial weights that will be optimized
+    * @param initialWeights The initial weights that will be optimized
     * @return The weights, optimized for the provided data.
     */
   override def optimize(
     data: DataSet[LabeledVector],
-    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-    // TODO: Faster way to do this?
-    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-    val numberOfIterations: Int = parameterMap(Iterations)
+    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+    val numberOfIterations: Int = parameters(Iterations)
+    val convergenceThresholdOption: Option[Double] = parameters.get(ConvergenceThreshold)
 
     // Initialize weights
-    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-      // Ensure provided weight vector is a DenseVector
-      case Some(wvDS) => {
-        wvDS.map{wv => {
-          val denseWeights = wv.weights match {
-            case dv: DenseVector => dv
-            case sv: SparseVector => sv.toDenseVector
+    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+    // Perform the iterations
+    val optimizedWeights = convergenceThresholdOption match {
+      // No convergence criterion
+      case None =>
+        initialWeightsDS.iterate(numberOfIterations) {
+          weightVectorDS => {
+            SGDStep(data, weightVectorDS)
           }
-          WeightVector(denseWeights, wv.intercept)
         }
-
+      case Some(convergence) =>
+        // Calculates the regularized loss, from the data and given weights
+        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
+        DataSet[Double] = {
+          data
+            .map {new LossCalculation}.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+            .reduce {
+              (left, right) =>
+                val (leftLoss, leftCount) = left
+                val (rightLoss, rightCount) = right
+                (leftLoss + rightLoss, rightCount + leftCount)
+            }
+            .map{new RegularizedLossCalculation}.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
         }
-      }
-      case None => createInitialWeightVector(dimensionsDS)
-    }
-
-    // Perform the iterations
-    // TODO: Enable convergence stopping criterion, as in Multiple Linear regression
-    initialWeightsDS.iterate(numberOfIterations) {
-      weightVector => {
-        SGDStep(data, weightVector)
-      }
+        // We have to calculate for each weight vector the sum of squared residuals,
+        // and then sum them and apply regularization
+        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+        // Combine weight vector with the current loss
+        val initialWeightsWithLossSum = initialWeightsDS.
+          crossWithTiny(initialLossSumDS).setParallelism(1)
+
+        val resultWithLoss = initialWeightsWithLossSum.
+          iterateWithTermination(numberOfIterations) {
+          weightsWithLossSum =>
+
+            // Extract weight vector and loss
+            val previousWeightsDS = weightsWithLossSum.map{_._1}
+            val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+            val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+            val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+            // Check if the relative change in the loss is smaller than the
+            // convergence threshold. If it is, terminate, i.e. return an empty termination data set
+            val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+              filter{
+              pair => {
+                val (previousLoss, currentLoss) = pair
+
+                if (previousLoss <= 0) {
+                  false
+                } else {
+                  math.abs((previousLoss - currentLoss)/previousLoss) >= convergence
+                }
+              }
+            }
+
+            // Result for new iteration
+            (currentWeightsDS cross currentLossSumDS, termination)
+        }
+        // Return just the weights
+        resultWithLoss.map{_._1}
     }
+    optimizedWeights
   }
 
-  /** Mapping function that calculates the weight gradients from the data.
+  /** Calculates the loss value, given a labeled vector and the current weight vector
     *
+    * The weight vector is received as a broadcast variable.
     */
-  private class GradientCalculation extends
-    RichMapFunction[LabeledVector, (WeightVector, Double, Int)] {
+  private class LossCalculation extends RichMapFunction[LabeledVector, (Double, Int)] {
 
     var weightVector: WeightVector = null
 
@@ -132,30 +177,50 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
       weightVector = list.get(0)
     }
 
-    override def map(example: LabeledVector): (WeightVector, Double, Int) = {
-
-      val lossFunction = parameterMap(LossFunction)
-      val regType = parameterMap(RegularizationType)
-      val regParameter = parameterMap(RegularizationParameter)
-      val predictionFunction = parameterMap(PredictionFunctionParameter)
-      val dimensions = example.vector.size
-      // TODO(tvas): Any point in carrying the weightGradient vector for in-place replacement?
-      // The idea in spark is to avoid object creation, but here we have to do it anyway
-      val weightGradient = new DenseVector(new Array[Double](dimensions))
-
-      // TODO(tvas): Indentation here?
-      val (loss, lossDeriv) = lossFunction.lossAndGradient(
-                                example,
-                                weightVector,
-                                weightGradient,
-                                regType,
-                                regParameter,
-                                predictionFunction)
-
-      (new WeightVector(weightGradient, lossDeriv), loss, 1)
+    override def map(example: LabeledVector): (Double, Int) = {
+      val lossFunction = parameters(LossFunction)
+      val predictionFunction = parameters(PredictionFunction)
+
+      val loss = lossFunction.lossValue(
+        example,
+        weightVector,
+        predictionFunction)
+
+      (loss, 1)
     }
   }
 
+/** Calculates the regularized loss value, given the loss and the current weight vector
+  *
+  * The weight vector is received as a broadcast variable.
+  */
+private class RegularizedLossCalculation extends RichMapFunction[(Double, Int), Double] {
+
+  var weightVector: WeightVector = null
+
+  @throws(classOf[Exception])
+  override def open(configuration: Configuration): Unit = {
+    val list = this.getRuntimeContext.
+      getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST)
+
+    weightVector = list.get(0)
+  }
+
+  override def map(lossAndCount: (Double, Int)): Double = {
+    val (lossSum, count) = lossAndCount
+    val regType = parameters(RegularizationType)
+    val regParameter = parameters(RegularizationParameter)
+
+    val regularizedLoss = {
+      regType.regLoss(
+        lossSum/count,
+        weightVector.weights,
+        regParameter)
+    }
+    regularizedLoss
+  }
+}
+
   /** Performs the update of the weights, according to the given gradients and regularization type.
     *
     */
@@ -173,9 +238,9 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
     }
 
     override def map(gradientLossAndCount: (WeightVector, Double, Int)): WeightVector = {
-      val regType = parameterMap(RegularizationType)
-      val regParameter = parameterMap(RegularizationParameter)
-      val stepsize = parameterMap(Stepsize)
+      val regType = parameters(RegularizationType)
+      val regParameter = parameters(RegularizationParameter)
+      val stepsize = parameters(Stepsize)
       val weightGradients = gradientLossAndCount._1
       val lossSum = gradientLossAndCount._2
       val count = gradientLossAndCount._3
@@ -231,11 +296,7 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
 
 object GradientDescent {
   def apply(): GradientDescent = {
-    new GradientDescent(new ParameterMap())
-  }
-
-  def apply(parameterMap: ParameterMap): GradientDescent = {
-    new GradientDescent(parameterMap)
+    new GradientDescent()
   }
 }
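
The termination logic above relies on Flink's iterateWithTermination contract:
iteration continues while the termination DataSet is non-empty and stops as
soon as it becomes empty. A stripped-down sketch of the same relative-change
pattern, where `step` and `loss` are hypothetical stand-ins for SGDStep and
the regularized loss calculation:

  // Sketch only; `step` and `loss` are placeholders for the real functions.
  def iterateUntilConverged(
      initial: DataSet[(WeightVector, Double)], // (weights, loss) pairs
      maxIterations: Int,
      threshold: Double,
      step: DataSet[WeightVector] => DataSet[WeightVector],
      loss: DataSet[WeightVector] => DataSet[Double]): DataSet[WeightVector] = {
    val result = initial.iterateWithTermination(maxIterations) { current =>
      val nextWeights = step(current.map(_._1))
      val nextLoss = loss(nextWeights)
      // Non-empty result => not yet converged => keep iterating
      val termination = current.map(_._2).crossWithTiny(nextLoss).setParallelism(1)
        .filter { case (prev, curr) =>
          prev > 0 && math.abs((prev - curr) / prev) >= threshold
        }
      (nextWeights cross nextLoss, termination)
    }
    // Drop the carried loss and return only the weights
    result.map(_._1)
  }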
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala
index 1bb6152..d612b90 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala
@@ -51,6 +51,8 @@ abstract class LossFunction extends Serializable{
     * @param example The features and the label associated with the example
     * @param weights The current weight vector
     * @param cumGradient The vector to which the gradient will be added, in place.
+    * @param predictionFunction A [[PredictionFunction]] object which provides a way to calculate
+    *                           a prediction and its gradient from the features and weights
     * @return A tuple containing the computed loss as its first element and the loss derivative as
     *         its second element. The gradient is updated in-place.
     */
@@ -58,8 +60,6 @@ abstract class LossFunction extends Serializable{
       example: LabeledVector,
       weights: WeightVector,
       cumGradient: FlinkVector,
-      regType: Regularization,
-      regParameter: Double,
       predictionFunction: PredictionFunction):
   (Double, Double) = {
     val features = example.vector
@@ -87,6 +87,28 @@ abstract class LossFunction extends Serializable{
     BLAS.axpy(restrictedLossDeriv, predictionGradient, cumGradient)
     (lossValue, lossDeriv)
   }
+
+  /** Compute the loss for the given data.
+    *
+    * @param example The features and the label associated with the example
+    * @param weights The current weight vector
+    * @param predictionFunction A [[PredictionFunction]] object which provides a way to calculate
+    *                           a prediction and its gradient from the features and weights
+    * @return The calculated loss value
+    */
+  def lossValue(
+      example: LabeledVector,
+      weights: WeightVector,
+      predictionFunction: PredictionFunction): Double = {
+    val features = example.vector
+    val label = example.label
+    // TODO(tvas): We could also provide for the case where we don't want an intercept value
+    // i.e. data already centered
+    val prediction = predictionFunction.predict(features, weights)
+    val lossValue: Double = loss(prediction, label)
+    lossValue
+  }
+
 }
 
 trait ClassificationLoss extends LossFunction
@@ -117,3 +139,9 @@ class SquaredLoss extends RegressionLoss {
   }
 
 }
+
+object SquaredLoss {
+  def apply(): SquaredLoss = {
+    new SquaredLoss
+  }
+}
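
To make the new lossValue concrete with the numbers used in the test suites
further below: with weights w = (1.0), intercept b = 1.0 and the example
(label y = 1.0, features x = (2.0)), linear prediction gives p = w . x + b = 3.0.
The test assertions are consistent with the usual squared loss definition
0.5 * (p - y)^2: lossValue returns 0.5 * 2.0^2 = 2.0, the loss derivative with
respect to the prediction is (p - y) = 2.0, and lossAndGradient accumulates
(p - y) * x = 4.0 into cumGradient, matching LossFunctionITSuite.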

http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
index 4ec2452..9e6df4a 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
@@ -96,7 +96,7 @@ abstract class DiffRegularization extends Regularization {
 }
 
 /** Performs no regularization, equivalent to $R(w) = 0$ **/
-class NoRegularization extends Regularization {
+class NoRegularization extends DiffRegularization {
   /** Adds the regularization term to the loss value
     *
     * @param loss The loss value, before applying regularization
@@ -108,6 +108,24 @@ class NoRegularization extends Regularization {
     loss: Double,
     weightVector: FlinkVector,
     regParameter: Double):  Double = {loss}
+
+  /** Adds the regularization gradient term to the loss gradient. The gradient is updated in place.
+    *
+    * Since we don't apply any regularization, the gradient will stay the same.
+    * @param weightVector The current vector of weights
+    * @param lossGradient The loss gradient, without regularization. Updated in-place.
+    * @param regParameter The regularization parameter, $\lambda$.
+    */
+  override def regGradient(
+      weightVector: FlinkVector,
+      lossGradient: FlinkVector,
+      regParameter: Double) = {}
+}
+
+object NoRegularization {
+  def apply(): NoRegularization = {
+    new NoRegularization
+  }
 }
 
 /** $L_2$ regularization penalty.
@@ -143,6 +161,12 @@ class L2Regularization extends DiffRegularization {
   }
 }
 
+object L2Regularization {
+  def apply(): L2Regularization = {
+    new L2Regularization
+  }
+}
+
 /** $L_1$ regularization penalty.
   *
   * The $L_1$ penalty can be used to drive a number of the solution coefficients to 0, thereby
@@ -196,3 +220,9 @@ class L1Regularization extends Regularization {
     vector.valueIterator.fold(0.0){(a,b) => math.abs(a) + math.abs(b)}
   }
 }
+
+object L1Regularization {
+  def apply(): L1Regularization = {
+    new L1Regularization
+  }
+}
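
The new companion objects are purely a construction convenience: they let
configuration code drop the `new` keyword, which is what the updated test
suites below rely on. A trivial illustration:

  // Both lines construct an equivalent penalty after this change
  val viaNew   = new L2Regularization
  val viaApply = L2Regularization()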

http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
index 580e096..f2cbce3 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
@@ -18,17 +18,21 @@
 
 package org.apache.flink.ml.optimization
 
+import org.apache.flink.api.common.functions.RichMapFunction
 import org.apache.flink.api.scala.DataSet
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.ml.common._
-import org.apache.flink.ml.math.{Vector => FlinkVector, BLAS, DenseVector}
+import org.apache.flink.ml.math.{SparseVector, DenseVector}
 import org.apache.flink.api.scala._
 import org.apache.flink.ml.optimization.IterativeSolver._
+// TODO(tvas): Kind of ugly that we have to do this. Why not define the parameters inside the class?
 import org.apache.flink.ml.optimization.Solver._
 
 /** Base class for optimization algorithms
  *
  */
-abstract class Solver extends Serializable with WithParameters {
+abstract class Solver() extends Serializable with WithParameters {
+
 
   /** Provides a solution for the given optimization problem
     *
@@ -40,6 +44,33 @@ abstract class Solver extends Serializable with WithParameters {
     data: DataSet[LabeledVector],
     initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector]
 
+  /** Creates the initial weights vector as a DataSet with a single WeightVector element
+    *
+    * @param initialWeights An Option that may contain an initial set of weights
+    * @param data The data for which we optimize the weights
+    * @return A DataSet containing a single WeightVector element
+    */
+  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
+                             data: DataSet[LabeledVector]): DataSet[WeightVector] = {
+    // TODO: Faster way to do this?
+    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
+
+    initialWeights match {
+      // Ensure provided weight vector is a DenseVector
+      case Some(wvDS) =>
+        wvDS.map {
+          wv => {
+            val denseWeights = wv.weights match {
+              case dv: DenseVector => dv
+              case sv: SparseVector => sv.toDenseVector
+            }
+            WeightVector(denseWeights, wv.intercept)
+          }
+        }
+      case None => createInitialWeightVector(dimensionsDS)
+    }
+  }
+
   /** Creates a DataSet with one zero vector. The zero vector has dimension d, which is given
     * by the dimensionDS.
     *
@@ -56,23 +87,26 @@ abstract class Solver extends Serializable with WithParameters {
   }
 
   //Setters for parameters
-  def setLossFunction(lossFunction: LossFunction): Solver = {
+  // TODO(tvas): Provide an option to fit an intercept or not
+  def setLossFunction(lossFunction: LossFunction): this.type = {
     parameters.add(LossFunction, lossFunction)
     this
   }
 
-  def setRegularizationType(regularization: Regularization): Solver = {
+  // TODO(tvas): Sanitize the input, i.e. depending on Solver type allow only certain types of
+  // regularization to be set.
+  def setRegularizationType(regularization: Regularization): this.type = {
     parameters.add(RegularizationType, regularization)
     this
   }
 
-  def setRegularizationParameter(regularizationParameter: Double): Solver = {
+  def setRegularizationParameter(regularizationParameter: Double): this.type = {
     parameters.add(RegularizationParameter, regularizationParameter)
     this
   }
 
-  def setPredictionFunction(predictionFunction: PredictionFunction): Solver = {
-    parameters.add(PredictionFunctionParameter, predictionFunction)
+  def setPredictionFunction(predictionFunction: PredictionFunction): this.type = {
+    parameters.add(PredictionFunction, predictionFunction)
     this
   }
 }
@@ -96,7 +130,7 @@ object Solver {
     val defaultValue = Some(0.0) // TODO(tvas): Properly initialize this, ensure Parameter > 0!
   }
 
-  case object PredictionFunctionParameter extends Parameter[PredictionFunction] {
+  case object PredictionFunction extends Parameter[PredictionFunction] {
     val defaultValue = Some(new LinearPrediction)
   }
 }
@@ -106,18 +140,56 @@ object Solver {
  * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods on Wikipedia]] for more
   * info
   */
-abstract class IterativeSolver extends Solver {
+abstract class IterativeSolver() extends Solver {
 
   //Setters for parameters
-  def setIterations(iterations: Int): IterativeSolver = {
+  def setIterations(iterations: Int): this.type = {
     parameters.add(Iterations, iterations)
     this
   }
 
-  def setStepsize(stepsize: Double): IterativeSolver = {
+  def setStepsize(stepsize: Double): this.type = {
     parameters.add(Stepsize, stepsize)
     this
   }
+
+  def setConvergenceThreshold(convergenceThreshold: Double): this.type = {
+    parameters.add(ConvergenceThreshold, convergenceThreshold)
+    this
+  }
+
+  /** Mapping function that calculates the weight gradients from the data.
+    *
+    */
+  protected class GradientCalculation
+    extends RichMapFunction[LabeledVector, (WeightVector, Double, Int)] {
+
+    var weightVector: WeightVector = null
+
+    @throws(classOf[Exception])
+    override def open(configuration: Configuration): Unit = {
+      val list = this.getRuntimeContext.
+        getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST)
+
+      weightVector = list.get(0)
+    }
+
+    override def map(example: LabeledVector): (WeightVector, Double, Int) = {
+
+      val lossFunction = parameters(LossFunction)
+      val predictionFunction = parameters(PredictionFunction)
+      val dimensions = example.vector.size
+      val weightGradient = new DenseVector(new Array[Double](dimensions))
+
+      val (loss, lossDeriv) = lossFunction.lossAndGradient(
+        example,
+        weightVector,
+        weightGradient,
+        predictionFunction)
+
+      (new WeightVector(weightGradient, lossDeriv), loss, 1)
+    }
+  }
 }
 
 object IterativeSolver {
@@ -132,4 +204,8 @@ object IterativeSolver {
   case object Iterations extends Parameter[Int] {
     val defaultValue = Some(10)
   }
+
+  case object ConvergenceThreshold extends Parameter[Double] {
+    val defaultValue = None
+  }
 }
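
Note how ConvergenceThreshold differs from Stepsize and Iterations: its
defaultValue is None, so a parameters.get lookup yields an Option[Double] that
is only defined when the user called setConvergenceThreshold. That Option is
what GradientDescent.optimize matches on to choose between the plain bulk
iteration and the terminating one. A small sketch of the lookup, assuming the
parameters map is accessible as in WithParameters:

  val sgd = GradientDescent().setIterations(100) // threshold deliberately unset

  // None here => optimize() takes the fixed-iteration path
  val thresholdOpt: Option[Double] = sgd.parameters.get(ConvergenceThreshold)
  assert(thresholdOpt.isEmpty)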

http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
index 2734419..bae0288 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
@@ -38,15 +38,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     env.setParallelism(2)
 
-    val parameters = ParameterMap()
-
-    parameters.add(IterativeSolver.Stepsize, 0.01)
-    parameters.add(IterativeSolver.Iterations, 2000)
-    parameters.add(Solver.LossFunction, new SquaredLoss)
-    parameters.add(Solver.RegularizationType, new L1Regularization)
-    parameters.add(Solver.RegularizationParameter, 0.3)
-
-    val sgd = GradientDescent(parameters)
+    val sgd = GradientDescent()
+      .setStepsize(0.01)
+      .setIterations(2000)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(L1Regularization())
+      .setRegularizationParameter(0.3)
 
     val inputDS: DataSet[LabeledVector] = env.fromCollection(regularizationData)
 
@@ -72,15 +69,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     env.setParallelism(2)
 
-    val parameters = ParameterMap()
-
-    parameters.add(IterativeSolver.Stepsize, 0.1)
-    parameters.add(IterativeSolver.Iterations, 1)
-    parameters.add(Solver.LossFunction, new SquaredLoss)
-    parameters.add(Solver.RegularizationType, new L2Regularization)
-    parameters.add(Solver.RegularizationParameter, 1.0)
-
-    val sgd = GradientDescent(parameters)
+    val sgd = GradientDescent()
+      .setStepsize(0.1)
+      .setIterations(1)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(L2Regularization())
+      .setRegularizationParameter(1.0)
 
     val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0)))
     val currentWeights = new WeightVector(DenseVector(1.0), 1.0)
@@ -106,15 +100,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     env.setParallelism(2)
 
-    val parameters = ParameterMap()
-
-    parameters.add(IterativeSolver.Stepsize, 1.0)
-    parameters.add(IterativeSolver.Iterations, 800)
-    parameters.add(Solver.LossFunction, new SquaredLoss)
-    parameters.add(Solver.RegularizationType, new NoRegularization)
-    parameters.add(Solver.RegularizationParameter, 0.0)
-
-    val sgd = GradientDescent(parameters)
+    val sgd = GradientDescent()
+      .setStepsize(1.0)
+      .setIterations(800)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(NoRegularization())
+      .setRegularizationParameter(0.0)
 
     val inputDS = env.fromCollection(data)
     val weightDS = sgd.optimize(inputDS, None)
@@ -128,7 +119,6 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
     val weights = weightVector.weights.asInstanceOf[DenseVector].data
     val weight0 = weightVector.intercept
 
-
     expectedWeights zip weights foreach {
       case (expectedWeight, weight) =>
         weight should be (expectedWeight +- 0.1)
@@ -141,15 +131,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     env.setParallelism(2)
 
-    val parameters = ParameterMap()
-
-    parameters.add(IterativeSolver.Stepsize, 0.0001)
-    parameters.add(IterativeSolver.Iterations, 100)
-    parameters.add(Solver.LossFunction, new SquaredLoss)
-    parameters.add(Solver.RegularizationType, new NoRegularization)
-    parameters.add(Solver.RegularizationParameter, 0.0)
-
-    val sgd = GradientDescent(parameters)
+    val sgd = GradientDescent()
+      .setStepsize(0.0001)
+      .setIterations(100)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(NoRegularization())
+      .setRegularizationParameter(0.0)
 
     val inputDS = env.fromCollection(noInterceptData)
     val weightDS = sgd.optimize(inputDS, None)
@@ -175,15 +162,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
     env.setParallelism(2)
 
-    val parameters = ParameterMap()
-
-    parameters.add(IterativeSolver.Stepsize, 0.1)
-    parameters.add(IterativeSolver.Iterations, 1)
-    parameters.add(Solver.LossFunction, new SquaredLoss)
-    parameters.add(Solver.RegularizationType, new NoRegularization)
-    parameters.add(Solver.RegularizationParameter, 0.0)
-
-    val sgd = GradientDescent(parameters)
+    val sgd = GradientDescent()
+      .setStepsize(0.1)
+      .setIterations(1)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(NoRegularization())
+      .setRegularizationParameter(0.0)
 
     val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0)))
     val currentWeights = new WeightVector(DenseVector(1.0), 1.0)
@@ -205,6 +189,60 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
   }
 
-  // TODO: Need more corner cases
+  it should "terminate early if the convergence criterion is reached" in {
+    // TODO(tvas): We need a better way to check the convergence of the weights.
+    // Ideally we want to have a Breeze-like system, where the optimizers carry a history and that
+    // can tell us whether we have converged and at which iteration
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val sgdEarlyTerminate = GradientDescent()
+      .setConvergenceThreshold(1e2)
+      .setStepsize(1.0)
+      .setIterations(800)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(NoRegularization())
+      .setRegularizationParameter(0.0)
+
+    val inputDS = env.fromCollection(data)
+
+    val weightDSEarlyTerminate = sgdEarlyTerminate.optimize(inputDS, None)
+
+    val weightListEarly: Seq[WeightVector] = weightDSEarlyTerminate.collect()
+
+    weightListEarly.size should equal(1)
+
+    val weightVectorEarly: WeightVector = weightListEarly.head
+    val weightsEarly = weightVectorEarly.weights.asInstanceOf[DenseVector].data
+    val weight0Early = weightVectorEarly.intercept
+
+    val sgdNoConvergence = GradientDescent()
+      .setStepsize(1.0)
+      .setIterations(800)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(NoRegularization())
+      .setRegularizationParameter(0.0)
+
+    val weightDSNoConvergence = sgdNoConvergence.optimize(inputDS, None)
+
+    val weightListNoConvergence: Seq[WeightVector] = weightDSNoConvergence.collect()
+
+    weightListNoConvergence.size should equal(1)
+
+    val weightVectorNoConvergence: WeightVector = weightListNoConvergence.head
+    val weightsNoConvergence = weightVectorNoConvergence.weights.asInstanceOf[DenseVector].data
+    val weight0NoConvergence = weightVectorNoConvergence.intercept
+
+    // Since the first optimizer was set to terminate early, its weights should be different
+    weightsEarly zip weightsNoConvergence foreach {
+      case (earlyWeight, weightNoConvergence) =>
+        weightNoConvergence should not be (earlyWeight +- 0.1)
+    }
+    weight0NoConvergence should not be (weight0Early +- 0.1)
+  }
+
+  // TODO: Need more corner cases, see sklearn tests for SGD linear model
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala
index e5509a3..a0921e5 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala
@@ -30,7 +30,7 @@ class LossFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
   behavior of "The optimization Loss Function implementations"
 
-  it should "calculate squared loss correctly" in {
+  it should "calculate squared loss and gradient correctly" in {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
     env.setParallelism(2)
@@ -41,11 +41,18 @@ class LossFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase {
     val weightVector = new WeightVector(DenseVector(1.0), 1.0)
     val gradient = DenseVector(0.0)
 
-    val (loss, lossDerivative) = squaredLoss.lossAndGradient(example, weightVector, gradient, new
-        NoRegularization, 0.0, new LinearPrediction)
+    val (loss, lossDerivative) = squaredLoss.lossAndGradient(
+      example,
+      weightVector,
+      gradient,
+      new LinearPrediction)
+
+    val onlyLoss = squaredLoss.lossValue(example, weightVector, new LinearPrediction)
 
     loss should be (2.0 +- 0.001)
 
+    onlyLoss should be (2.0 +- 0.001)
+
     lossDerivative should be (2.0 +- 0.001)
 
     gradient.data(0) should be (4.0 +- 0.001)

http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala
index ad3ea89..89c77f2 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala
@@ -32,7 +32,7 @@ class RegularizationITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
   behavior of "The regularization type implementations"
 
-  it should "not change the loss when no regularization is used" in {
+  it should "not change the loss or gradient when no regularization is used" 
in {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
 
@@ -41,14 +41,18 @@ class RegularizationITSuite extends FlatSpec with Matchers with FlinkTestBase {
     val regularization = new NoRegularization
 
     val weightVector = new WeightVector(DenseVector(1.0), 1.0)
-    val effectiveStepsize = 1.0
-    val regParameter = 0.0
+    val regParameter = 1.0
     val gradient = DenseVector(0.0)
     val originalLoss = 1.0
 
-    val adjustedLoss = regularization.regularizedLossAndGradient(
+    val adjustedLoss = regularization.regularizedLossAndGradient(
+      originalLoss,
+      weightVector.weights,
+      gradient,
+      regParameter)
 
     adjustedLoss should be (originalLoss +- 0.0001)
+    gradient shouldEqual DenseVector(0.0)
   }
 
   it should "correctly apply L1 regularization" in {
