Hi Margus, thanks for reporting this. I’ve been able to reproduce it, and there does indeed appear to be a bug. I’ve created a JIRA and have a fix ready, which can hopefully be included in 1.3.1.
In the meantime, you can get the desired result using transform:

    model.trainOn(trainingData)

    testingData.transform { rdd =>
      val latest = model.latestModel()
      rdd.map(lp => (lp.label, latest.predict(lp.features)))
    }.print()

-------------------------
jeremyfreeman.net
@thefreemanlab

On Mar 15, 2015, at 2:56 PM, Margus Roo <mar...@roo.ee> wrote:

> Hi again
>
> I tried the same example,
> examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala,
> from 1.3.0. In case the testing file content is:
> (0.0,[3.0,4.0,3.0])
> (0.0,[4.0,4.0,4.0])
> (4.0,[5.0,5.0,5.0])
> (5.0,[5.0,6.0,6.0])
> (6.0,[7.0,4.0,7.0])
> (7.0,[8.0,6.0,8.0])
> (8.0,[44.0,9.0,9.0])
> (9.0,[14.0,30.0,10.0])
>
> the answer is:
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (4.0,0.0)
> (5.0,0.0)
> (6.0,0.0)
> (7.0,0.0)
> (8.0,0.0)
> (9.0,0.0)
>
> What is wrong?
> I can see that the model's weights are changing when I put new data into
> the training dir.
>
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> +372 51 480
>
> On 14/03/15 09:05, Margus Roo wrote:
>> Hi
>>
>> I am trying to understand the example provided in
>> https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html
>> (Streaming linear regression).
>>
>> Code:
>> import org.apache.spark._
>> import org.apache.spark.streaming._
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.dstream.DStream
>>
>> object StreamingLinReg {
>>
>>   def main(args: Array[String]) {
>>
>>     val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
>>     val ssc = new StreamingContext(conf, Seconds(10))
>>
>>     val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()
>>
>>     val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)
>>
>>     val numFeatures = 3
>>     val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))
>>
>>     model.trainOn(trainingData)
>>     model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>
>>   }
>>
>> }
>>
>> I compiled the code and ran it.
>> I put a file containing
>> (1.0,[2.0,2.0,2.0])
>> (2.0,[3.0,3.0,3.0])
>> (3.0,[4.0,4.0,4.0])
>> (4.0,[5.0,5.0,5.0])
>> (5.0,[6.0,6.0,6.0])
>> (6.0,[7.0,7.0,7.0])
>> (7.0,[8.0,8.0,8.0])
>> (8.0,[9.0,9.0,9.0])
>> (9.0,[10.0,10.0,10.0])
>> into the training directory.
>>
>> I can see that the model's weights change:
>> 15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: weights, [7.333333333333333,7.333333333333333,7.333333333333333]
>>
>> Now I can put whatever into the testing directory, but I cannot understand
>> the answer.
>> For example, I can put the same file I used for training into the testing
>> directory. The file content is
>> (1.0,[2.0,2.0,2.0])
>> (2.0,[3.0,3.0,3.0])
>> (3.0,[4.0,4.0,4.0])
>> (4.0,[5.0,5.0,5.0])
>> (5.0,[6.0,6.0,6.0])
>> (6.0,[7.0,7.0,7.0])
>> (7.0,[8.0,8.0,8.0])
>> (8.0,[9.0,9.0,9.0])
>> (9.0,[10.0,10.0,10.0])
>>
>> And the answer will be
>> (1.0,0.0)
>> (2.0,0.0)
>> (3.0,0.0)
>> (4.0,0.0)
>> (5.0,0.0)
>> (6.0,0.0)
>> (7.0,0.0)
>> (8.0,0.0)
>> (9.0,0.0)
>>
>> And in case my file content is
>> (0.0,[2.0,2.0,2.0])
>> (0.0,[3.0,3.0,3.0])
>> (0.0,[4.0,4.0,4.0])
>> (0.0,[5.0,5.0,5.0])
>> (0.0,[6.0,6.0,6.0])
>> (0.0,[7.0,7.0,7.0])
>> (0.0,[8.0,8.0,8.0])
>> (0.0,[9.0,9.0,9.0])
>> (0.0,[10.0,10.0,10.0])
>>
>> the answer will be:
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>>
>> I expect to get the label predicted by the model.
>> --
>> Margus (margusja) Roo
>> http://margus.roo.ee
>> skype: margusja
>> +372 51 480