spark git commit: [SPARK-8200] [MLLIB] Check for empty RDDs in StreamingLinearAlgorithm
Repository: spark Updated Branches: refs/heads/branch-1.4 2846a357f - 59fc3f197 [SPARK-8200] [MLLIB] Check for empty RDDs in StreamingLinearAlgorithm Test cases for both StreamingLinearRegression and StreamingLogisticRegression, and code fix. Edit: This contribution is my original work and I license the work to the project under the project's open source license. Author: Paavo ppark...@gmail.com Closes #6713 from pparkkin/streamingmodel-empty-rdd and squashes the following commits: ff5cd78 [Paavo] Update strings to use interpolation. db234cf [Paavo] Use !rdd.isEmpty. 54ad89e [Paavo] Test case for empty stream. 393e36f [Paavo] Ignore empty RDDs. 0bfc365 [Paavo] Test case for empty stream. (cherry picked from commit b928f543845ddd39e914a0e8f0b0205fd86100c5) Signed-off-by: Sean Owen so...@cloudera.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59fc3f19 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59fc3f19 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59fc3f19 Branch: refs/heads/branch-1.4 Commit: 59fc3f197247c6c8c40ea7479573af023c89d718 Parents: 2846a35 Author: Paavo ppark...@gmail.com Authored: Wed Jun 10 23:17:42 2015 +0100 Committer: Sean Owen so...@cloudera.com Committed: Wed Jun 10 23:26:54 2015 +0100 -- .../regression/StreamingLinearAlgorithm.scala | 28 +++- .../StreamingLogisticRegressionSuite.scala | 17 .../StreamingLinearRegressionSuite.scala| 18 + 3 files changed, 50 insertions(+), 13 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/59fc3f19/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index cea8f3f..2dd8aca 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -83,21 +83,23 @@ abstract class StreamingLinearAlgorithm[ throw new IllegalArgumentException(Model must be initialized before starting training.) } data.foreachRDD { (rdd, time) = - val initialWeights = -model match { - case Some(m) = -m.weights - case None = -val numFeatures = rdd.first().features.size -Vectors.dense(numFeatures) + if (!rdd.isEmpty) { +val initialWeights = + model match { +case Some(m) = + m.weights +case None = + val numFeatures = rdd.first().features.size + Vectors.dense(numFeatures) + } +model = Some(algorithm.run(rdd, initialWeights)) +logInfo(sModel updated at time ${time.toString}) +val display = model.get.weights.size match { + case x if x 100 = model.get.weights.toArray.take(100).mkString([, ,, ...) + case _ = model.get.weights.toArray.mkString([, ,, ]) } - model = Some(algorithm.run(rdd, initialWeights)) - logInfo(Model updated at time %s.format(time.toString)) - val display = model.get.weights.size match { -case x if x 100 = model.get.weights.toArray.take(100).mkString([, ,, ...) -case _ = model.get.weights.toArray.mkString([, ,, ]) +logInfo(sCurrent model: weights, ${display}) } - logInfo(Current model: weights, %s.format (display)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/59fc3f19/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala index e98b61e..fd65329 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala @@ -158,4 +158,21 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase val error = output.map(batch = batch.map(p = math.abs(p._1 - p._2)).sum / nPoints).toList assert(error.head 0.8 error.last 0.2) } + + // Test empty RDDs in a stream + test(handling empty RDDs in a stream) { +val model = new StreamingLogisticRegressionWithSGD() + .setInitialWeights(Vectors.dense(-0.1)) + .setStepSize(0.01) +
spark git commit: [SPARK-8200] [MLLIB] Check for empty RDDs in StreamingLinearAlgorithm
Repository: spark Updated Branches: refs/heads/master 96a7c888d - b928f5438 [SPARK-8200] [MLLIB] Check for empty RDDs in StreamingLinearAlgorithm Test cases for both StreamingLinearRegression and StreamingLogisticRegression, and code fix. Edit: This contribution is my original work and I license the work to the project under the project's open source license. Author: Paavo ppark...@gmail.com Closes #6713 from pparkkin/streamingmodel-empty-rdd and squashes the following commits: ff5cd78 [Paavo] Update strings to use interpolation. db234cf [Paavo] Use !rdd.isEmpty. 54ad89e [Paavo] Test case for empty stream. 393e36f [Paavo] Ignore empty RDDs. 0bfc365 [Paavo] Test case for empty stream. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b928f543 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b928f543 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b928f543 Branch: refs/heads/master Commit: b928f543845ddd39e914a0e8f0b0205fd86100c5 Parents: 96a7c88 Author: Paavo ppark...@gmail.com Authored: Wed Jun 10 23:17:42 2015 +0100 Committer: Sean Owen so...@cloudera.com Committed: Wed Jun 10 23:17:42 2015 +0100 -- .../regression/StreamingLinearAlgorithm.scala | 14 -- .../StreamingLogisticRegressionSuite.scala| 17 + .../StreamingLinearRegressionSuite.scala | 18 ++ 3 files changed, 43 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b928f543/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala -- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala index aee51bf..141052b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala @@ -83,13 +83,15 @@ abstract class StreamingLinearAlgorithm[ throw new IllegalArgumentException(Model must be initialized before starting training.) } data.foreachRDD { (rdd, time) = - model = Some(algorithm.run(rdd, model.get.weights)) - logInfo(Model updated at time %s.format(time.toString)) - val display = model.get.weights.size match { -case x if x 100 = model.get.weights.toArray.take(100).mkString([, ,, ...) -case _ = model.get.weights.toArray.mkString([, ,, ]) + if (!rdd.isEmpty) { +model = Some(algorithm.run(rdd, model.get.weights)) +logInfo(sModel updated at time ${time.toString}) +val display = model.get.weights.size match { + case x if x 100 = model.get.weights.toArray.take(100).mkString([, ,, ...) + case _ = model.get.weights.toArray.mkString([, ,, ]) +} +logInfo(sCurrent model: weights, ${display}) } - logInfo(Current model: weights, %s.format (display)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/b928f543/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala -- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala index e98b61e..fd65329 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala @@ -158,4 +158,21 @@ class StreamingLogisticRegressionSuite extends SparkFunSuite with TestSuiteBase val error = output.map(batch = batch.map(p = math.abs(p._1 - p._2)).sum / nPoints).toList assert(error.head 0.8 error.last 0.2) } + + // Test empty RDDs in a stream + test(handling empty RDDs in a stream) { +val model = new StreamingLogisticRegressionWithSGD() + .setInitialWeights(Vectors.dense(-0.1)) + .setStepSize(0.01) + .setNumIterations(10) +val numBatches = 10 +val emptyInput = Seq.empty[Seq[LabeledPoint]] +val ssc = setupStreams(emptyInput, + (inputDStream: DStream[LabeledPoint]) = { +model.trainOn(inputDStream) +model.predictOnValues(inputDStream.map(x = (x.label, x.features))) + } +) +val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/b928f543/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala