Repository: flink Updated Branches: refs/heads/master 35ea6505c -> 1e574750d
[FLINK-1992] [ml] Adds convergence criterion to Flink's SGD algorithm Added test case for convergence This closes #692. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b3b6a9da Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b3b6a9da Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b3b6a9da Branch: refs/heads/master Commit: b3b6a9da0532d884f7d633530c11cda15aa6bc1b Parents: 35ea650 Author: Theodore Vasiloudis <t...@sics.se> Authored: Wed May 13 17:29:31 2015 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri May 22 09:37:20 2015 +0200 ---------------------------------------------------------------------- .../flink/ml/optimization/GradientDescent.scala | 185 ++++++++++++------- .../flink/ml/optimization/LossFunction.scala | 32 +++- .../flink/ml/optimization/Regularization.scala | 32 +++- .../apache/flink/ml/optimization/Solver.scala | 98 ++++++++-- .../optimization/GradientDescentITSuite.scala | 132 ++++++++----- .../ml/optimization/LossFunctionITSuite.scala | 13 +- .../ml/optimization/RegularizationITSuite.scala | 12 +- 7 files changed, 374 insertions(+), 130 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala index eff519e..ef171f5 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala @@ -24,7 +24,7 @@ import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration import org.apache.flink.ml.common._ import org.apache.flink.ml.math._ -import org.apache.flink.ml.optimization.IterativeSolver.{Iterations, Stepsize} +import org.apache.flink.ml.optimization.IterativeSolver.{ConvergenceThreshold, Iterations, Stepsize} import org.apache.flink.ml.optimization.Solver._ /** This [[Solver]] performs Stochastic Gradient Descent optimization using mini batches @@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._ * At the moment, the whole partition is used for SGD, making it effectively a batch gradient * descent. Once a sampling operator has been introduced, the algorithm can be optimized. * - * @param runParameters The parameters to tune the algorithm. Currently these include: + * The parameters to tune the algorithm are: * [[Solver.LossFunction]] for the loss function to be used, * [[Solver.RegularizationType]] for the type of regularization, * [[Solver.RegularizationParameter]] for the regularization parameter, * [[IterativeSolver.Iterations]] for the maximum number of iterations, * [[IterativeSolver.Stepsize]] for the learning rate used. + * [[IterativeSolver.ConvergenceThreshold]] when provided, the algorithm will + * stop the iterations if the relative change in the value of the objective + * function between successive iterations is smaller than this value.
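+ * (For example, with a convergence threshold of 0.001 the iterations stop as soon as + * math.abs((previousLoss - currentLoss)/previousLoss) drops below 0.001.)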
*/ -class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { +class GradientDescent() extends IterativeSolver { import Solver.WEIGHTVECTOR_BROADCAST - var parameterMap: ParameterMap = parameters ++ runParameters - /** Performs one iteration of Stochastic Gradient Descent using mini batches * * @param data A DataSet of LabeledVector (label, features) pairs @@ -76,51 +77,95 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A DataSet of LabeledVector (label, features) pairs - * @param initWeights The initial weights that will be optimized + * @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], - initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { - // TODO: Faster way to do this? - val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b) - - val numberOfIterations: Int = parameterMap(Iterations) + initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { + val numberOfIterations: Int = parameters(Iterations) + val convergenceThresholdOption: Option[Double] = parameters.get(ConvergenceThreshold) // Initialize weights - val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) => { - wvDS.map{wv => { - val denseWeights = wv.weights match { - case dv: DenseVector => dv - case sv: SparseVector => sv.toDenseVector + val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + + // Perform the iterations + val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None => + initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS => { + SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) => + // Calculates the regularized loss from the data and the given weights + def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): + DataSet[Double] = { + data + .map {new LossCalculation}.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) + .reduce { + (left, right) => + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } + .map{new RegularizedLossCalculation}.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - } - case None => createInitialWeightVector(dimensionsDS) - } - - // Perform the iterations - // TODO: Enable convergence stopping criterion, as in Multiple Linear regression - initialWeightsDS.iterate(numberOfIterations) { - weightVector => { - SGDStep(data, weightVector) - } + // We have to calculate for each weight vector the sum of squared residuals, + // and then sum them and apply regularization + val initialLossSumDS = lossCalculation(data, initialWeightsDS) + + // Combine weight vector with the current loss + val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + + val resultWithLoss = initialWeightsWithLossSum.
+ iterateWithTermination(numberOfIterations) { + weightsWithLossSum => + + // Extract weight vector and loss + val previousWeightsDS = weightsWithLossSum.map{_._1} + val previousLossSumDS = weightsWithLossSum.map{_._2} + + val currentWeightsDS = SGDStep(data, previousWeightsDS) + + val currentLossSumDS = lossCalculation(data, currentWeightsDS) + + // Check if the relative change in the loss is smaller than the + // convergence threshold. If yes, then terminate, i.e. return an empty termination data set + val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1). + filter{ + pair => { + val (previousLoss, currentLoss) = pair + + if (previousLoss <= 0) { + false + } else { + math.abs((previousLoss - currentLoss)/previousLoss) >= convergence + } + } + } + + // Result for new iteration + (currentWeightsDS cross currentLossSumDS, termination) + } + // Return just the weights + resultWithLoss.map{_._1} } + optimizedWeights } - /** Mapping function that calculates the weight gradients from the data. + /** Calculates the loss value, given a labeled vector and the current weight vector * + * The weight vector is received as a broadcast variable. */ - private class GradientCalculation extends - RichMapFunction[LabeledVector, (WeightVector, Double, Int)] { + private class LossCalculation extends RichMapFunction[LabeledVector, (Double, Int)] { var weightVector: WeightVector = null @@ -132,30 +177,50 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { weightVector = list.get(0) } - override def map(example: LabeledVector): (WeightVector, Double, Int) = { - - val lossFunction = parameterMap(LossFunction) - val regType = parameterMap(RegularizationType) - val regParameter = parameterMap(RegularizationParameter) - val predictionFunction = parameterMap(PredictionFunctionParameter) - val dimensions = example.vector.size - // TODO(tvas): Any point in carrying the weightGradient vector for in-place replacement? - // The idea in spark is to avoid object creation, but here we have to do it anyway - val weightGradient = new DenseVector(new Array[Double](dimensions)) - - // TODO(tvas): Indentation here? - val (loss, lossDeriv) = lossFunction.lossAndGradient( - example, - weightVector, - weightGradient, - regType, - regParameter, - predictionFunction) - - (new WeightVector(weightGradient, lossDeriv), loss, 1) + override def map(example: LabeledVector): (Double, Int) = { + val lossFunction = parameters(LossFunction) + val predictionFunction = parameters(PredictionFunction) + + val loss = lossFunction.lossValue( + example, + weightVector, + predictionFunction) + + (loss, 1) } } +/** Calculates the regularized loss value, given the loss and the current weight vector + * + * The weight vector is received as a broadcast variable. + */ +private class RegularizedLossCalculation extends RichMapFunction[(Double, Int), Double] { + + var weightVector: WeightVector = null + + @throws(classOf[Exception]) + override def open(configuration: Configuration): Unit = { + val list = this.getRuntimeContext.
+ getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST) + + weightVector = list.get(0) + } + + override def map(lossAndCount: (Double, Int)): Double = { + val (lossSum, count) = lossAndCount + val regType = parameters(RegularizationType) + val regParameter = parameters(RegularizationParameter) + + val regularizedLoss = { + regType.regLoss( + lossSum/count, + weightVector.weights, + regParameter) + } + regularizedLoss + } +} + /** Performs the update of the weights according to the given gradients and regularization type. * */ @@ -173,9 +238,9 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { } override def map(gradientLossAndCount: (WeightVector, Double, Int)): WeightVector = { - val regType = parameterMap(RegularizationType) - val regParameter = parameterMap(RegularizationParameter) - val stepsize = parameterMap(Stepsize) + val regType = parameters(RegularizationType) + val regParameter = parameters(RegularizationParameter) + val stepsize = parameters(Stepsize) val weightGradients = gradientLossAndCount._1 val lossSum = gradientLossAndCount._2 val count = gradientLossAndCount._3 @@ -231,11 +296,7 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { object GradientDescent { def apply(): GradientDescent = { - new GradientDescent(new ParameterMap()) - } - - def apply(parameterMap: ParameterMap): GradientDescent = { - new GradientDescent(parameterMap) + new GradientDescent() } } http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala index 1bb6152..d612b90 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala @@ -51,6 +51,8 @@ abstract class LossFunction extends Serializable{ * @param example The features and the label associated with the example * @param weights The current weight vector * @param cumGradient The vector to which the gradient will be added, in place. + * @param predictionFunction A [[PredictionFunction]] object which provides a way to calculate + * a prediction and its gradient from the features and weights * @return A tuple containing the computed loss as its first element and the loss derivative as * its second element. The gradient is updated in-place. */ @@ -58,8 +60,6 @@ abstract class LossFunction extends Serializable{ example: LabeledVector, weights: WeightVector, cumGradient: FlinkVector, - regType: Regularization, - regParameter: Double, predictionFunction: PredictionFunction): (Double, Double) = { val features = example.vector @@ -87,6 +87,28 @@ abstract class LossFunction extends Serializable{ BLAS.axpy(restrictedLossDeriv, predictionGradient, cumGradient) (lossValue, lossDeriv) } + + /** Compute the loss for the given example.
+ * + * @param example The features and the label associated with the example + * @param weights The current weight vector + * @param predictionFunction A [[PredictionFunction]] object which provides a way to calculate + * a prediction and its gradient from the features and weights + * @return The calculated loss value + */ + def lossValue( + example: LabeledVector, + weights: WeightVector, + predictionFunction: PredictionFunction): Double = { + val features = example.vector + val label = example.label + // TODO(tvas): We could also provide for the case where we don't want an intercept value + // i.e. data already centered + val prediction = predictionFunction.predict(features, weights) + val lossValue: Double = loss(prediction, label) + lossValue + } + } trait ClassificationLoss extends LossFunction @@ -117,3 +139,9 @@ class SquaredLoss extends RegressionLoss { } } + +object SquaredLoss { + def apply(): SquaredLoss = { + new SquaredLoss + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala index 4ec2452..9e6df4a 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala @@ -96,7 +96,7 @@ abstract class DiffRegularization extends Regularization { } /** Performs no regularization, equivalent to $R(w) = 0$ **/ -class NoRegularization extends Regularization { +class NoRegularization extends DiffRegularization { /** Adds the regularization term to the loss value * * @param loss The loss value, before applying regularization @@ -108,6 +108,24 @@ loss: Double, weightVector: FlinkVector, regParameter: Double): Double = {loss} + + /** Adds the regularization gradient term to the loss gradient. The gradient is updated in place. + * + * Since we don't apply any regularization, the gradient will stay the same. + * @param weightVector The current vector of weights + * @param lossGradient The loss gradient, without regularization. Updated in-place. + * @param regParameter The regularization parameter, $\lambda$. + */ + override def regGradient( + weightVector: FlinkVector, + lossGradient: FlinkVector, + regParameter: Double) = {} +} + +object NoRegularization { + def apply(): NoRegularization = { + new NoRegularization + } } /** $L_2$ regularization penalty. @@ -143,6 +161,12 @@ class L2Regularization extends DiffRegularization { } } +object L2Regularization { + def apply(): L2Regularization = { + new L2Regularization + } +} + /** $L_1$ regularization penalty.
* * The $L_1$ penalty can be used to drive a number of the solution coefficients to 0, thereby @@ -196,3 +220,9 @@ class L1Regularization extends Regularization { vector.valueIterator.fold(0.0){(a,b) => math.abs(a) + math.abs(b)} } } + +object L1Regularization { + def apply(): L1Regularization = { + new L1Regularization + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala index 580e096..f2cbce3 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala @@ -18,17 +18,21 @@ package org.apache.flink.ml.optimization +import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.scala.DataSet +import org.apache.flink.configuration.Configuration import org.apache.flink.ml.common._ -import org.apache.flink.ml.math.{Vector => FlinkVector, BLAS, DenseVector} +import org.apache.flink.ml.math.{SparseVector, DenseVector} import org.apache.flink.api.scala._ import org.apache.flink.ml.optimization.IterativeSolver._ +// TODO(tvas): Kind of ugly that we have to do this. Why not define the parameters inside the class? import org.apache.flink.ml.optimization.Solver._ /** Base class for optimization algorithms * */ -abstract class Solver extends Serializable with WithParameters { +abstract class Solver() extends Serializable with WithParameters { + /** Provides a solution for the given optimization problem * @@ -40,6 +44,33 @@ abstract class Solver extends Serializable with WithParameters { data: DataSet[LabeledVector], initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] + /** Creates the initial weights vector, a DataSet with a single WeightVector element * + * @param initialWeights An Option that may contain an initial set of weights + * @param data The data for which we optimize the weights + * @return A DataSet containing a single WeightVector element + */ + def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]], + data: DataSet[LabeledVector]): DataSet[WeightVector] = { + // TODO: Faster way to do this? + val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b) + + initialWeights match { + // Ensure provided weight vector is a DenseVector + case Some(wvDS) => + wvDS.map { + wv => { + val denseWeights = wv.weights match { + case dv: DenseVector => dv + case sv: SparseVector => sv.toDenseVector + } + WeightVector(denseWeights, wv.intercept) + } + } + case None => createInitialWeightVector(dimensionsDS) + } + } + /** Creates a DataSet with one zero vector. The zero vector has dimension d, which is given * by the dimensionDS. * @@ -56,23 +87,26 @@ abstract class Solver extends Serializable with WithParameters { } // Setters for parameters - def setLossFunction(lossFunction: LossFunction): Solver = { + // TODO(tvas): Provide an option to fit an intercept or not + def setLossFunction(lossFunction: LossFunction): this.type = { parameters.add(LossFunction, lossFunction) this } - def setRegularizationType(regularization: Regularization): Solver = { + // TODO(tvas): Sanitize the input, i.e. depending on Solver type allow only certain types of + // regularization to be set.
+ def setRegularizationType(regularization: Regularization): this.type = { parameters.add(RegularizationType, regularization) this } - def setRegularizationParameter(regularizationParameter: Double): Solver = { + def setRegularizationParameter(regularizationParameter: Double): this.type = { parameters.add(RegularizationParameter, regularizationParameter) this } - def setPredictionFunction(predictionFunction: PredictionFunction): Solver = { - parameters.add(PredictionFunctionParameter, predictionFunction) + def setPredictionFunction(predictionFunction: PredictionFunction): this.type = { + parameters.add(PredictionFunction, predictionFunction) this } } @@ -96,7 +130,7 @@ object Solver { val defaultValue = Some(0.0) // TODO(tvas): Properly initialize this, ensure Parameter > 0! } - case object PredictionFunctionParameter extends Parameter[PredictionFunction] { + case object PredictionFunction extends Parameter[PredictionFunction] { val defaultValue = Some(new LinearPrediction) } } @@ -106,18 +140,56 @@ object Solver { * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods on Wikipedia]] for more * info */ -abstract class IterativeSolver extends Solver { +abstract class IterativeSolver() extends Solver { // Setters for parameters - def setIterations(iterations: Int): IterativeSolver = { + def setIterations(iterations: Int): this.type = { parameters.add(Iterations, iterations) this } - def setStepsize(stepsize: Double): IterativeSolver = { + def setStepsize(stepsize: Double): this.type = { parameters.add(Stepsize, stepsize) this } + + def setConvergenceThreshold(convergenceThreshold: Double): this.type = { + parameters.add(ConvergenceThreshold, convergenceThreshold) + this + } + + /** Mapping function that calculates the weight gradients from the data. + * + */ + protected class GradientCalculation + extends RichMapFunction[LabeledVector, (WeightVector, Double, Int)] { + + var weightVector: WeightVector = null + + @throws(classOf[Exception]) + override def open(configuration: Configuration): Unit = { + val list = this.getRuntimeContext.
+ getBroadcastVariable[WeightVector](WEIGHTVECTOR_BROADCAST) + + weightVector = list.get(0) + } + + override def map(example: LabeledVector): (WeightVector, Double, Int) = { + + val lossFunction = parameters(LossFunction) + val predictionFunction = parameters(PredictionFunction) + val dimensions = example.vector.size + val weightGradient = new DenseVector(new Array[Double](dimensions)) + + val (loss, lossDeriv) = lossFunction.lossAndGradient( + example, + weightVector, + weightGradient, + predictionFunction) + + (new WeightVector(weightGradient, lossDeriv), loss, 1) + } + } } object IterativeSolver { @@ -132,4 +204,8 @@ case object Iterations extends Parameter[Int] { val defaultValue = Some(10) } + + case object ConvergenceThreshold extends Parameter[Double] { + val defaultValue = None + } } http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala index 2734419..bae0288 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala @@ -38,15 +38,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 0.01) - parameters.add(IterativeSolver.Iterations, 2000) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new L1Regularization) - parameters.add(Solver.RegularizationParameter, 0.3) - - val sgd = GradientDescent(parameters) + val sgd = GradientDescent() + .setStepsize(0.01) + .setIterations(2000) + .setLossFunction(SquaredLoss()) + .setRegularizationType(L1Regularization()) + .setRegularizationParameter(0.3) val inputDS: DataSet[LabeledVector] = env.fromCollection(regularizationData) @@ -72,15 +69,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 0.1) - parameters.add(IterativeSolver.Iterations, 1) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new L2Regularization) - parameters.add(Solver.RegularizationParameter, 1.0) - - val sgd = GradientDescent(parameters) + val sgd = GradientDescent() + .setStepsize(0.1) + .setIterations(1) + .setLossFunction(SquaredLoss()) + .setRegularizationType(L2Regularization()) + .setRegularizationParameter(1.0) val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0))) val currentWeights = new WeightVector(DenseVector(1.0), 1.0) @@ -106,15 +100,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 1.0) - parameters.add(IterativeSolver.Iterations, 800) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new NoRegularization) - parameters.add(Solver.RegularizationParameter, 0.0) - - val sgd = GradientDescent(parameters) + val sgd =
GradientDescent() + .setStepsize(1.0) + .setIterations(800) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) val inputDS = env.fromCollection(data) val weightDS = sgd.optimize(inputDS, None) @@ -128,7 +119,6 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { val weights = weightVector.weights.asInstanceOf[DenseVector].data val weight0 = weightVector.intercept - expectedWeights zip weights foreach { case (expectedWeight, weight) => weight should be (expectedWeight +- 0.1) @@ -141,15 +131,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 0.0001) - parameters.add(IterativeSolver.Iterations, 100) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new NoRegularization) - parameters.add(Solver.RegularizationParameter, 0.0) - - val sgd = GradientDescent(parameters) + val sgd = GradientDescent() + .setStepsize(0.0001) + .setIterations(100) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) val inputDS = env.fromCollection(noInterceptData) val weightDS = sgd.optimize(inputDS, None) @@ -175,15 +162,12 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase { env.setParallelism(2) - val parameters = ParameterMap() - - parameters.add(IterativeSolver.Stepsize, 0.1) - parameters.add(IterativeSolver.Iterations, 1) - parameters.add(Solver.LossFunction, new SquaredLoss) - parameters.add(Solver.RegularizationType, new NoRegularization) - parameters.add(Solver.RegularizationParameter, 0.0) - - val sgd = GradientDescent(parameters) + val sgd = GradientDescent() + .setStepsize(0.1) + .setIterations(1) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) val inputDS: DataSet[LabeledVector] = env.fromElements(LabeledVector(1.0, DenseVector(2.0))) val currentWeights = new WeightVector(DenseVector(1.0), 1.0) @@ -205,6 +189,60 @@ } - // TODO: Need more corner cases + it should "terminate early if the convergence criterion is reached" in { + // TODO(tvas): We need a better way to check the convergence of the weights.
+ // Ideally we want to have a Breeze-like system, where the optimizers carry a history and that + // can tell us whether we have converged and at which iteration + + val env = ExecutionEnvironment.getExecutionEnvironment + + env.setParallelism(2) + + val sgdEarlyTerminate = GradientDescent() + .setConvergenceThreshold(1e2) + .setStepsize(1.0) + .setIterations(800) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) + + val inputDS = env.fromCollection(data) + + val weightDSEarlyTerminate = sgdEarlyTerminate.optimize(inputDS, None) + + val weightListEarly: Seq[WeightVector] = weightDSEarlyTerminate.collect() + + weightListEarly.size should equal(1) + + val weightVectorEarly: WeightVector = weightListEarly.head + val weightsEarly = weightVectorEarly.weights.asInstanceOf[DenseVector].data + val weight0Early = weightVectorEarly.intercept + + val sgdNoConvergence = GradientDescent() + .setStepsize(1.0) + .setIterations(800) + .setLossFunction(SquaredLoss()) + .setRegularizationType(NoRegularization()) + .setRegularizationParameter(0.0) + + val weightDSNoConvergence = sgdNoConvergence.optimize(inputDS, None) + + val weightListNoConvergence: Seq[WeightVector] = weightDSNoConvergence.collect() + + weightListNoConvergence.size should equal(1) + + val weightVectorNoConvergence: WeightVector = weightListNoConvergence.head + val weightsNoConvergence = weightVectorNoConvergence.weights.asInstanceOf[DenseVector].data + val weight0NoConvergence = weightVectorNoConvergence.intercept + + // Since the first optimizer was set to terminate early, its weights should be different + weightsEarly zip weightsNoConvergence foreach { + case (earlyWeight, weightNoConvergence) => + weightNoConvergence should not be (earlyWeight +- 0.1) + } + weight0NoConvergence should not be (weight0Early +- 0.1) + } + + // TODO: Need more corner cases, see sklearn tests for SGD linear model } http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala index e5509a3..a0921e5 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/LossFunctionITSuite.scala @@ -30,7 +30,7 @@ class LossFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase { behavior of "The optimization Loss Function implementations" - it should "calculate squared loss correctly" in { + it should "calculate squared loss and gradient correctly" in { val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(2) @@ -41,11 +41,18 @@ class LossFunctionITSuite extends FlatSpec with Matchers with FlinkTestBase { val weightVector = new WeightVector(DenseVector(1.0), 1.0) val gradient = DenseVector(0.0) - val (loss, lossDerivative) = squaredLoss.lossAndGradient(example, weightVector, gradient, new - NoRegularization, 0.0, new LinearPrediction) + val (loss, lossDerivative) = squaredLoss.lossAndGradient( + example, + weightVector, + gradient, + new LinearPrediction) + + val onlyLoss = squaredLoss.lossValue(example, weightVector, new LinearPrediction) loss should be (2.0 +- 0.001) + onlyLoss
should be (2.0 +- 0.001) + lossDerivative should be (2.0 +- 0.001) gradient.data(0) should be (4.0 +- 0.001) http://git-wip-us.apache.org/repos/asf/flink/blob/b3b6a9da/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala index ad3ea89..89c77f2 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/RegularizationITSuite.scala @@ -32,7 +32,7 @@ class RegularizationITSuite extends FlatSpec with Matchers with FlinkTestBase { behavior of "The regularization type implementations" - it should "not change the loss when no regularization is used" in { + it should "not change the loss or gradient when no regularization is used" in { val env = ExecutionEnvironment.getExecutionEnvironment @@ -41,14 +41,18 @@ class RegularizationITSuite extends FlatSpec with Matchers with FlinkTestBase { val regularization = new NoRegularization val weightVector = new WeightVector(DenseVector(1.0), 1.0) - val effectiveStepsize = 1.0 - val regParameter = 0.0 + val regParameter = 1.0 val gradient = DenseVector(0.0) val originalLoss = 1.0 - val adjustedLoss = regularization.regLoss(originalLoss, weightVector.weights, regParameter) + val adjustedLoss = regularization.regularizedLossAndGradient( + originalLoss, + weightVector.weights, + gradient, + regParameter) adjustedLoss should be (originalLoss +- 0.0001) + gradient shouldEqual DenseVector(0.0) } it should "correctly apply L1 regularization" in {
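For reference, a minimal usage sketch of the convergence criterion introduced by this commit. The training data and parameter values below are made up for illustration; the builder calls, companion-object constructors, and optimize signature are the ones shown in the diff above.

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.LabeledVector
    import org.apache.flink.ml.math.DenseVector
    import org.apache.flink.ml.optimization._

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Illustrative data set, roughly following y = 2x + 1
    val trainingDS: DataSet[LabeledVector] = env.fromElements(
      LabeledVector(1.0, DenseVector(0.0)),
      LabeledVector(3.0, DenseVector(1.0)),
      LabeledVector(5.0, DenseVector(2.0)))

    // Runs for at most 1000 iterations, but terminates as soon as the relative
    // change in the regularized loss between successive iterations drops below 1e-3
    val sgd = GradientDescent()
      .setStepsize(0.1)
      .setIterations(1000)
      .setConvergenceThreshold(1e-3)
      .setLossFunction(SquaredLoss())
      .setRegularizationType(NoRegularization())
      .setRegularizationParameter(0.0)

    val weightsDS = sgd.optimize(trainingDS, None)

Without setConvergenceThreshold the solver behaves as before and always runs the full number of iterations.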