Hi Margus, thanks for reporting this. I’ve been able to reproduce it, and there 
does indeed appear to be a bug. I’ve created a JIRA and have a fix ready, which 
can hopefully be included in 1.3.1.

In the meantime, you can get the desired result using transform:

> model.trainOn(trainingData)
> 
> testData.transform { rdd =>
>   // Fetch the most recently trained model for each batch
>   val latest = model.latestModel()
>   rdd.map(lp => (lp.label, latest.predict(lp.features)))
> }.print()
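
As a rough sanity check on what the output should look like once predictions use the updated weights, here is a minimal plain-Scala sketch (no Spark required). It takes the weights from the log line in the original message and assumes an intercept of 0.0, which is an assumption since the log shows only the weights. The object and method names are made up for illustration, and the actual weights will differ as more training data arrives.

```scala
// Sanity check: what a linear model with the logged weights would predict.
// Assumption: the intercept is 0.0 (the log line shows only the weights).
object ExpectedPrediction {
  // Weights copied from the log line in the original message.
  val weights: Array[Double] = Array.fill(3)(7.333333333333333)
  val intercept: Double = 0.0

  // Linear prediction: dot product of weights and features, plus intercept.
  def predict(features: Array[Double]): Double = {
    require(features.length == weights.length, "feature count must match weight count")
    weights.zip(features).map { case (w, x) => w * x }.sum + intercept
  }

  def main(args: Array[String]): Unit = {
    // A few rows from the test file in the original message.
    val testRows = Seq(
      (1.0, Array(2.0, 2.0, 2.0)),
      (2.0, Array(3.0, 3.0, 3.0)),
      (9.0, Array(10.0, 10.0, 10.0))
    )
    testRows.foreach { case (label, features) =>
      println(f"label=$label%.1f predicted=${predict(features)}%.2f")
    }
  }
}
```

With those weights the predictions come out at roughly 44, 66 and 220 for these rows, so they are far from the labels but clearly nonzero. A prediction of exactly 0.0 on every row is what all-zero weights would give.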

-------------------------
jeremyfreeman.net
@thefreemanlab

On Mar 15, 2015, at 2:56 PM, Margus Roo <mar...@roo.ee> wrote:

> Hi again
> 
> I tried the same example,
> examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala,
> from 1.3.0, with the following testing file content:
>   (0.0,[3.0,4.0,3.0])
>   (0.0,[4.0,4.0,4.0])
>   (4.0,[5.0,5.0,5.0])
>   (5.0,[5.0,6.0,6.0])
>   (6.0,[7.0,4.0,7.0])
>   (7.0,[8.0,6.0,8.0])
>   (8.0,[44.0,9.0,9.0])
>   (9.0,[14.0,30.0,10.0])
> 
> and the answer is:
> (0.0,0.0)
> (0.0,0.0)
> (0.0,0.0)
> (4.0,0.0)
> (5.0,0.0)
> (6.0,0.0)
> (7.0,0.0)
> (8.0,0.0)
> (9.0,0.0)
> 
> What is wrong?
> I can see that the model's weights change when I put new data into the 
> training directory.
> Margus (margusja) Roo
> http://margus.roo.ee
> skype: margusja
> +372 51 480
> On 14/03/15 09:05, Margus Roo wrote:
>> Hi
>> 
>> I am trying to understand the example provided in 
>> https://spark.apache.org/docs/1.2.1/mllib-linear-methods.html - Streaming 
>> linear regression
>> 
>> Code:
>> import org.apache.spark._
>> import org.apache.spark.streaming._
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.regression.LabeledPoint
>> import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
>> import org.apache.spark.storage.StorageLevel
>> import org.apache.spark.streaming.dstream.DStream
>> 
>> object StreamingLinReg {
>> 
>>   def main(args: Array[String]) {
>>   
>>     val conf = new SparkConf().setAppName("StreamLinReg").setMaster("local[2]")
>>     val ssc = new StreamingContext(conf, Seconds(10))
>>
>>     val trainingData = ssc
>>       .textFileStream("/Users/margusja/Documents/workspace/sparcdemo/training/")
>>       .map(LabeledPoint.parse)
>>       .cache()
>>
>>     val testData = ssc
>>       .textFileStream("/Users/margusja/Documents/workspace/sparcdemo/testing/")
>>       .map(LabeledPoint.parse)
>>
>>     val numFeatures = 3
>>     val model = new StreamingLinearRegressionWithSGD()
>>       .setInitialWeights(Vectors.zeros(numFeatures))
>>
>>     model.trainOn(trainingData)
>>     model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()
>> 
>>     ssc.start()
>>     ssc.awaitTermination()
>>     
>>   }
>>     
>> }
>> 
>> I compiled the code and ran it.
>> I put a file containing
>>   (1.0,[2.0,2.0,2.0])
>>   (2.0,[3.0,3.0,3.0])
>>   (3.0,[4.0,4.0,4.0])
>>   (4.0,[5.0,5.0,5.0])
>>   (5.0,[6.0,6.0,6.0])
>>   (6.0,[7.0,7.0,7.0])
>>   (7.0,[8.0,8.0,8.0])
>>   (8.0,[9.0,9.0,9.0])
>>   (9.0,[10.0,10.0,10.0])
>> into the training directory.
>> 
>> I can see that the model's weights change:
>> 15/03/14 08:53:40 INFO StreamingLinearRegressionWithSGD: Current model: 
>> weights, [7.333333333333333,7.333333333333333,7.333333333333333]
>> 
>> Now I can put whatever I like into the testing directory, but I cannot 
>> understand the answer.
>> For example, I can put the same file I used for training into the testing 
>> directory. The file content is
>>   (1.0,[2.0,2.0,2.0])
>>   (2.0,[3.0,3.0,3.0])
>>   (3.0,[4.0,4.0,4.0])
>>   (4.0,[5.0,5.0,5.0])
>>   (5.0,[6.0,6.0,6.0])
>>   (6.0,[7.0,7.0,7.0])
>>   (7.0,[8.0,8.0,8.0])
>>   (8.0,[9.0,9.0,9.0])
>>   (9.0,[10.0,10.0,10.0])
>> 
>> And the answer will be
>> (1.0,0.0)
>> (2.0,0.0)
>> (3.0,0.0)
>> (4.0,0.0)
>> (5.0,0.0)
>> (6.0,0.0)
>> (7.0,0.0)
>> (8.0,0.0)
>> (9.0,0.0)
>> 
>> And when my file content is
>>   (0.0,[2.0,2.0,2.0])
>>   (0.0,[3.0,3.0,3.0])
>>   (0.0,[4.0,4.0,4.0])
>>   (0.0,[5.0,5.0,5.0])
>>   (0.0,[6.0,6.0,6.0])
>>   (0.0,[7.0,7.0,7.0])
>>   (0.0,[8.0,8.0,8.0])
>>   (0.0,[9.0,9.0,9.0])
>>   (0.0,[10.0,10.0,10.0])
>> 
>> the answer will be:
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> (0.0,0.0)
>> 
>> I expect to get the label predicted by the model.
>> -- 
>> Margus (margusja) Roo
>> http://margus.roo.ee
>> skype: margusja
>> +372 51 480
> 
