Hi

I am trying to understand the streaming linear regression example provided in https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html

Code:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.DStream

object StreamingLinReg {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Each line in these directories is parsed as "(label,[f1,f2,f3])"
    val trainingData = ssc
      .textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/")
      .map(LabeledPoint.parse)
      .cache()

    val testData = ssc
      .textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/")
      .map(LabeledPoint.parse)

    val numFeatures = 3
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.zeros(numFeatures))

    model.trainOn(trainingData)
    // Prints (label, prediction) pairs for each test batch
    model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

    ssc.start()
    ssc.awaitTermination()

  }

}

I compiled the code and ran it, then put a file containing
  (1.0,[2.0,2.0,2.0])
  (2.0,[3.0,3.0,3.0])
  (3.0,[4.0,4.0,4.0])
  (4.0,[5.0,5.0,5.0])
  (5.0,[6.0,6.0,6.0])
  (6.0,[7.0,7.0,7.0])
  (7.0,[8.0,8.0,8.0])
  (8.0,[9.0,9.0,9.0])
  (9.0,[10.0,10.0,10.0])
into the training directory.
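
For reference, a minimal sketch of how such a file could be produced and dropped into a watched directory. As far as I understand the Spark Streaming guide, textFileStream only picks up files that appear in the directory by an atomic move or rename, so the sketch writes to a staging directory first. The object name, helper names and the temporary directories are hypothetical, only for illustration; this is plain Scala and not part of my job.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, StandardCopyOption}

object WriteTrainingFile {

  // Format one record the way the files above look: (label,[f1,f2,...])
  def formatPoint(label: Double, features: Seq[Double]): String =
    s"($label,[${features.mkString(",")}])"

  // Write the file in a staging directory, then move it into the watched
  // directory in one step. ATOMIC_MOVE assumes both directories are on the
  // same filesystem; otherwise the move throws an exception.
  def writeAtomically(lines: Seq[String], stagingDir: Path, watchedDir: Path, name: String): Path = {
    val tmp = stagingDir.resolve(name)
    Files.write(tmp, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
    Files.move(tmp, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical directories; in my case the watched one would be .../training/
    val staging = Files.createTempDirectory("staging")
    val watched = Files.createTempDirectory("training")

    // Same data as above: label k with features [k+1, k+1, k+1]
    val lines = (1 to 9).map { k =>
      val x = (k + 1).toDouble
      formatPoint(k.toDouble, Seq(x, x, x))
    }

    val target = writeAtomically(lines, staging, watched, "train-1.txt")
    println(s"wrote ${lines.size} records to $target")
  }
}
```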

I can see that the model's weights change:
15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: weights, [7.333333333333333,7.333333333333333,7.333333333333333]

Now I can put whatever I like into the testing directory, but I cannot understand the answer. For example, I can put the same file I used for training into the testing directory. The file content is
  (1.0,[2.0,2.0,2.0])
  (2.0,[3.0,3.0,3.0])
  (3.0,[4.0,4.0,4.0])
  (4.0,[5.0,5.0,5.0])
  (5.0,[6.0,6.0,6.0])
  (6.0,[7.0,7.0,7.0])
  (7.0,[8.0,8.0,8.0])
  (8.0,[9.0,9.0,9.0])
  (9.0,[10.0,10.0,10.0])

And the answer will be
(1.0,0.0)
(2.0,0.0)
(3.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)

And if my file content is
  (0.0,[2.0,2.0,2.0])
  (0.0,[3.0,3.0,3.0])
  (0.0,[4.0,4.0,4.0])
  (0.0,[5.0,5.0,5.0])
  (0.0,[6.0,6.0,6.0])
  (0.0,[7.0,7.0,7.0])
  (0.0,[8.0,8.0,8.0])
  (0.0,[9.0,9.0,9.0])
  (0.0,[10.0,10.0,10.0])

the answer will be:
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)

I expect the second value in each pair to be the label predicted by the model, but it is always 0.0.
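
To make concrete what I expected, here is a minimal sketch in plain Scala. It is not the MLlib code; it assumes the prediction is simply the dot product of the weights and the features with no intercept term, which is my assumption about how the model behaves. The object and method names are hypothetical. With the weights from the log line above, every test row should get a non-zero prediction; with the all-zero initial weights, every prediction is 0.0, which is what I actually see.

```scala
object LinearPredictSketch {

  // Prediction of a linear model without an intercept: sum of w_i * x_i.
  def predict(weights: Array[Double], features: Array[Double]): Double = {
    require(weights.length == features.length, "dimension mismatch")
    weights.zip(features).map { case (w, x) => w * x }.sum
  }

  def main(args: Array[String]): Unit = {
    val loggedWeights  = Array.fill(3)(7.333333333333333) // from the INFO log line
    val initialWeights = Array.fill(3)(0.0)               // Vectors.zeros(numFeatures)
    val features       = Array(2.0, 2.0, 2.0)             // first test row

    // Roughly 44 with the logged weights (not a good fit, but not zero)
    println(predict(loggedWeights, features))
    // Exactly 0.0 with the initial weights
    println(predict(initialWeights, features))
  }
}
```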

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480
