spark git commit: [SPARK-8200] [MLLIB] Check for empty RDDs in StreamingLinearAlgorithm

2015-06-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.4 2846a357f -> 59fc3f197


[SPARK-8200] [MLLIB] Check for empty RDDs in StreamingLinearAlgorithm

Test cases for both StreamingLinearRegression and StreamingLogisticRegression, 
and code fix.

Edit:
This contribution is my original work and I license the work to the project 
under the project's open source license.

Author: Paavo ppark...@gmail.com

Closes #6713 from pparkkin/streamingmodel-empty-rdd and squashes the following 
commits:

ff5cd78 [Paavo] Update strings to use interpolation.
db234cf [Paavo] Use !rdd.isEmpty.
54ad89e [Paavo] Test case for empty stream.
393e36f [Paavo] Ignore empty RDDs.
0bfc365 [Paavo] Test case for empty stream.

(cherry picked from commit b928f543845ddd39e914a0e8f0b0205fd86100c5)
Signed-off-by: Sean Owen so...@cloudera.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/59fc3f19
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/59fc3f19
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/59fc3f19

Branch: refs/heads/branch-1.4
Commit: 59fc3f197247c6c8c40ea7479573af023c89d718
Parents: 2846a35
Author: Paavo ppark...@gmail.com
Authored: Wed Jun 10 23:17:42 2015 +0100
Committer: Sean Owen so...@cloudera.com
Committed: Wed Jun 10 23:26:54 2015 +0100

--
 .../regression/StreamingLinearAlgorithm.scala   | 28 +++-
 .../StreamingLogisticRegressionSuite.scala  | 17 
 .../StreamingLinearRegressionSuite.scala| 18 +
 3 files changed, 50 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/59fc3f19/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
index cea8f3f..2dd8aca 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
@@ -83,21 +83,23 @@ abstract class StreamingLinearAlgorithm[
      throw new IllegalArgumentException("Model must be initialized before starting training.")
 }
    data.foreachRDD { (rdd, time) =>
-      val initialWeights =
-        model match {
-          case Some(m) =>
-            m.weights
-          case None =>
-            val numFeatures = rdd.first().features.size
-            Vectors.dense(numFeatures)
+      if (!rdd.isEmpty) {
+        val initialWeights =
+          model match {
+            case Some(m) =>
+              m.weights
+            case None =>
+              val numFeatures = rdd.first().features.size
+              Vectors.dense(numFeatures)
+          }
+        model = Some(algorithm.run(rdd, initialWeights))
+        logInfo(s"Model updated at time ${time.toString}")
+        val display = model.get.weights.size match {
+          case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...")
+          case _ => model.get.weights.toArray.mkString("[", ",", "]")
         }
-      model = Some(algorithm.run(rdd, initialWeights))
-      logInfo("Model updated at time %s".format(time.toString))
-      val display = model.get.weights.size match {
-        case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...")
-        case _ => model.get.weights.toArray.mkString("[", ",", "]")
+        logInfo(s"Current model: weights, ${display}")
       }
-      logInfo("Current model: weights, %s".format(display))
 }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/59fc3f19/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
index e98b61e..fd65329 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
@@ -158,4 +158,21 @@ class StreamingLogisticRegressionSuite extends 
SparkFunSuite with TestSuiteBase
    val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList
    assert(error.head > 0.8 && error.last < 0.2)
   }
+
+  // Test empty RDDs in a stream
+  test("handling empty RDDs in a stream") {
+val model = new StreamingLogisticRegressionWithSGD()
+  .setInitialWeights(Vectors.dense(-0.1))
+  .setStepSize(0.01)
+  

spark git commit: [SPARK-8200] [MLLIB] Check for empty RDDs in StreamingLinearAlgorithm

2015-06-10 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 96a7c888d -> b928f5438


[SPARK-8200] [MLLIB] Check for empty RDDs in StreamingLinearAlgorithm

Test cases for both StreamingLinearRegression and StreamingLogisticRegression, 
and code fix.

Edit:
This contribution is my original work and I license the work to the project 
under the project's open source license.

Author: Paavo ppark...@gmail.com

Closes #6713 from pparkkin/streamingmodel-empty-rdd and squashes the following 
commits:

ff5cd78 [Paavo] Update strings to use interpolation.
db234cf [Paavo] Use !rdd.isEmpty.
54ad89e [Paavo] Test case for empty stream.
393e36f [Paavo] Ignore empty RDDs.
0bfc365 [Paavo] Test case for empty stream.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b928f543
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b928f543
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b928f543

Branch: refs/heads/master
Commit: b928f543845ddd39e914a0e8f0b0205fd86100c5
Parents: 96a7c88
Author: Paavo ppark...@gmail.com
Authored: Wed Jun 10 23:17:42 2015 +0100
Committer: Sean Owen so...@cloudera.com
Committed: Wed Jun 10 23:17:42 2015 +0100

--
 .../regression/StreamingLinearAlgorithm.scala | 14 --
 .../StreamingLogisticRegressionSuite.scala| 17 +
 .../StreamingLinearRegressionSuite.scala  | 18 ++
 3 files changed, 43 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b928f543/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
index aee51bf..141052b 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
@@ -83,13 +83,15 @@ abstract class StreamingLinearAlgorithm[
      throw new IllegalArgumentException("Model must be initialized before starting training.")
 }
    data.foreachRDD { (rdd, time) =>
-      model = Some(algorithm.run(rdd, model.get.weights))
-      logInfo("Model updated at time %s".format(time.toString))
-      val display = model.get.weights.size match {
-        case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...")
-        case _ => model.get.weights.toArray.mkString("[", ",", "]")
+      if (!rdd.isEmpty) {
+        model = Some(algorithm.run(rdd, model.get.weights))
+        logInfo(s"Model updated at time ${time.toString}")
+        val display = model.get.weights.size match {
+          case x if x > 100 => model.get.weights.toArray.take(100).mkString("[", ",", "...")
+          case _ => model.get.weights.toArray.mkString("[", ",", "]")
+        }
+        logInfo(s"Current model: weights, ${display}")
       }
-      logInfo("Current model: weights, %s".format(display))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b928f543/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
--
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
index e98b61e..fd65329 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/classification/StreamingLogisticRegressionSuite.scala
@@ -158,4 +158,21 @@ class StreamingLogisticRegressionSuite extends 
SparkFunSuite with TestSuiteBase
    val error = output.map(batch => batch.map(p => math.abs(p._1 - p._2)).sum / nPoints).toList
    assert(error.head > 0.8 && error.last < 0.2)
   }
+
+  // Test empty RDDs in a stream
+  test("handling empty RDDs in a stream") {
+val model = new StreamingLogisticRegressionWithSGD()
+  .setInitialWeights(Vectors.dense(-0.1))
+  .setStepSize(0.01)
+  .setNumIterations(10)
+val numBatches = 10
+val emptyInput = Seq.empty[Seq[LabeledPoint]]
+    val ssc = setupStreams(emptyInput,
+      (inputDStream: DStream[LabeledPoint]) => {
+        model.trainOn(inputDStream)
+        model.predictOnValues(inputDStream.map(x => (x.label, x.features)))
+      }
+    )
+val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, 
numBatches)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b928f543/mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala