Re: Streaming linear regression example question
Thanks for the workaround.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 16/03/15 06:20, Jeremy Freeman wrote:

Hi Margus, thanks for reporting this. I’ve been able to reproduce it and there does indeed appear to be a bug. I’ve created a JIRA and have a fix ready, which can hopefully be included in 1.3.1.
Re: Streaming linear regression example question
Hi again,

I tried the same example, examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala from 1.3.0. When the testing file content is:

(0.0,[3.0,4.0,3.0])
(0.0,[4.0,4.0,4.0])
(4.0,[5.0,5.0,5.0])
(5.0,[5.0,6.0,6.0])
(6.0,[7.0,4.0,7.0])
(7.0,[8.0,6.0,8.0])
(8.0,[44.0,9.0,9.0])
(9.0,[14.0,30.0,10.0])

the answer is:

(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)

What is wrong? I can see that the model's weights change when I put new data into the training dir.

Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480

On 14/03/15 09:05, Margus Roo wrote:

Hi,

I am trying to understand the example provided in https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - Streaming linear regression

Code:

    import org.apache.spark._
    import org.apache.spark.streaming._
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.DStream

    object StreamingLinReg {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))

        // Each new file in these directories becomes one batch of LabeledPoints.
        val trainingData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/").map(LabeledPoint.parse).cache()
        val testData = ssc.textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/").map(LabeledPoint.parse)

        val numFeatures = 3
        val model = new StreamingLinearRegressionWithSGD().setInitialWeights(Vectors.zeros(numFeatures))

        model.trainOn(trainingData)
        // Prints (label, prediction) pairs for each test batch.
        model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

I compiled the code and ran it. Then I put a file containing

(1.0,[2.0,2.0,2.0])
(2.0,[3.0,3.0,3.0])
(3.0,[4.0,4.0,4.0])
(4.0,[5.0,5.0,5.0])
(5.0,[6.0,6.0,6.0])
(6.0,[7.0,7.0,7.0])
(7.0,[8.0,8.0,8.0])
(8.0,[9.0,9.0,9.0])
(9.0,[10.0,10.0,10.0])

into the training directory. I can see that the model's weights change:

15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: weights, [7.333,7.333,7.333]

Now I can put whatever I like into the testing directory, but I cannot understand the answer. For example, I can put the same file I used for training into the testing directory. The file content is

(1.0,[2.0,2.0,2.0])
(2.0,[3.0,3.0,3.0])
(3.0,[4.0,4.0,4.0])
(4.0,[5.0,5.0,5.0])
(5.0,[6.0,6.0,6.0])
(6.0,[7.0,7.0,7.0])
(7.0,[8.0,8.0,8.0])
(8.0,[9.0,9.0,9.0])
(9.0,[10.0,10.0,10.0])

and the answer will be

(1.0,0.0)
(2.0,0.0)
(3.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)

And when my file content is

(0.0,[2.0,2.0,2.0])
(0.0,[3.0,3.0,3.0])
(0.0,[4.0,4.0,4.0])
(0.0,[5.0,5.0,5.0])
(0.0,[6.0,6.0,6.0])
(0.0,[7.0,7.0,7.0])
(0.0,[8.0,8.0,8.0])
(0.0,[9.0,9.0,9.0])
(0.0,[10.0,10.0,10.0])

the answer will be:

(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(0.0,0.0)

I expect to get the label predicted by the model.

--
Margus (margusja) Roo
http://margus.roo.ee
skype: margusja
+372 51 480
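For readers following the numbers: a linear regression model predicts the dot product of its weights and the feature vector (plus an intercept, assumed to be zero here). The sketch below is plain Scala with no Spark dependency, and `LinearPredictionSketch` and `predict` are hypothetical names used only for illustration. It shows that all-zero weights, like the initial weights set in the program above, give 0.0 for any input, which matches the reported output, while the logged weights [7.333,7.333,7.333] would give roughly 44 for the features [2.0,2.0,2.0]. Whether stale initial weights are the actual cause is not established at this point in the thread.

```scala
object LinearPredictionSketch {
  // Prediction of a linear model: weights . features + intercept.
  // The zero default intercept is an assumption made for this sketch.
  def predict(weights: Array[Double], features: Array[Double], intercept: Double = 0.0): Double = {
    require(weights.length == features.length, "weights and features must have the same length")
    weights.zip(features).map { case (w, x) => w * x }.sum + intercept
  }

  def main(args: Array[String]): Unit = {
    val features = Array(2.0, 2.0, 2.0)          // first test row from the thread
    val initialWeights = Array(0.0, 0.0, 0.0)    // same values as Vectors.zeros(3)
    val loggedWeights = Array(7.333, 7.333, 7.333) // weights reported in the log line

    println(predict(initialWeights, features)) // 0.0, as in the reported output
    println(predict(loggedWeights, features))  // roughly 44
  }
}
```

A non-zero prediction for every row with non-zero features is therefore what one would expect once the model has been trained.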
Re: Streaming linear regression example question
Hi Margus,

Thanks for reporting this. I’ve been able to reproduce it and there does indeed appear to be a bug. I’ve created a JIRA and have a fix ready, which can hopefully be included in 1.3.1.

In the meantime, you can get the desired result using transform:

    model.trainOn(trainingData)

    testData.transform { rdd =>
      val latest = model.latestModel()
      rdd.map(lp => (lp.label, latest.predict(lp.features)))
    }.print()

- jeremyfreeman.net @thefreemanlab

On Mar 15, 2015, at 2:56 PM, Margus Roo mar...@roo.ee wrote:

Hi again,

I tried the same example, examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala from 1.3.0. When the testing file content is:

(0.0,[3.0,4.0,3.0])
(0.0,[4.0,4.0,4.0])
(4.0,[5.0,5.0,5.0])
(5.0,[5.0,6.0,6.0])
(6.0,[7.0,4.0,7.0])
(7.0,[8.0,6.0,8.0])
(8.0,[44.0,9.0,9.0])
(9.0,[14.0,30.0,10.0])

the answer is:

(0.0,0.0)
(0.0,0.0)
(0.0,0.0)
(4.0,0.0)
(5.0,0.0)
(6.0,0.0)
(7.0,0.0)
(8.0,0.0)
(9.0,0.0)

What is wrong? I can see that the model's weights change when I put new data into the training dir.
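A note on why the transform version behaves differently: the function passed to transform is invoked once per batch, so `model.latestModel()` is read again each time a test batch arrives. The thread does not state the root cause of the bug, so the sketch below is only an illustration of the general difference between reading a model once and reading it per batch. It is plain Scala without Spark; `ToyStreamingModel`, `setWeights`, `latestWeights`, and the weight values are all hypothetical.

```scala
object PerBatchLookupSketch {
  // Hypothetical stand-in for a streaming model whose weights are
  // replaced whenever a training batch has been processed.
  final class ToyStreamingModel(initial: Vector[Double]) {
    private var weights: Vector[Double] = initial
    def setWeights(newWeights: Vector[Double]): Unit = { weights = newWeights }
    def latestWeights: Vector[Double] = weights
  }

  def dot(w: Vector[Double], x: Vector[Double]): Double =
    w.zip(x).map { case (a, b) => a * b }.sum

  // Returns (prediction from a one-time snapshot, prediction from a per-call lookup).
  def run(): (Double, Double) = {
    val model = new ToyStreamingModel(Vector(0.0, 0.0, 0.0))

    // Weights read once, when the pipeline is defined.
    val snapshot = model.latestWeights
    val stalePredict: Vector[Double] => Double = x => dot(snapshot, x)

    // Weights read on every call, analogous to calling latestModel() inside transform.
    val freshPredict: Vector[Double] => Double = x => dot(model.latestWeights, x)

    // A training batch arrives and changes the weights (made-up values).
    model.setWeights(Vector(1.0, 1.0, 1.0))

    val x = Vector(2.0, 2.0, 2.0)
    (stalePredict(x), freshPredict(x))
  }

  def main(args: Array[String]): Unit = {
    val (stale, fresh) = run()
    println(stale) // 0.0: still uses the initial weights
    println(fresh) // 6.0: uses the updated weights
  }
}
```

The per-call lookup costs one extra read per batch, which is negligible next to the prediction work itself.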