Hi Arunkumar,

That looks like it should work. Logically, it’s similar to the implementation 
used by StreamingLinearRegression and StreamingLogisticRegression, see this 
class:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala

which exposes the kind of operation you're describing (for any linear method).

The nice thing about the gradient-based methods is that they can use existing 
MLlib optimization routines in this fairly direct way. Other methods (such as 
KMeans) require a bit more reengineering.
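
To make the warm-start idea concrete, here is a minimal plain-Scala sketch 
with no Spark dependency. It is only an illustration under stated 
assumptions, not the MLlib implementation: WarmStartSketch, Point, optimize 
and trainOnBatches are hypothetical names, plain gradient descent stands in 
for LBFGS, and each batch is an in-memory Seq rather than an RDD. The point 
it shows is that every batch resumes from the weights left by the previous 
one.

```scala
// Plain-Scala sketch (no Spark): each batch resumes from the previous weights.
object WarmStartSketch {
  // One labeled example; features are assumed to carry a leading 1.0
  // so that the first weight plays the role of the intercept.
  final case class Point(label: Double, features: Array[Double])

  // Gradient of the least-squares loss 1/(2n) * sum((w.x - y)^2) w.r.t. w.
  def gradient(batch: Seq[Point], w: Array[Double]): Array[Double] = {
    val g = Array.fill(w.length)(0.0)
    for (p <- batch) {
      val err = p.features.zip(w).map { case (x, wi) => x * wi }.sum - p.label
      for (j <- w.indices) g(j) += err * p.features(j) / batch.size
    }
    g
  }

  // Hypothetical stand-in for algorithm.optimize(rdd, initialWeights):
  // a fixed number of gradient-descent steps starting from `init`.
  def optimize(batch: Seq[Point], init: Array[Double],
               stepSize: Double = 0.1, numIterations: Int = 50): Array[Double] =
    (1 to numIterations).foldLeft(init) { (w, _) =>
      val g = gradient(batch, w)
      w.indices.map(j => w(j) - stepSize * g(j)).toArray
    }

  // Process batches in arrival order, carrying the weights forward.
  // Empty batches leave the weights untouched.
  def trainOnBatches(batches: Seq[Seq[Point]], init: Array[Double]): Array[Double] =
    batches.foldLeft(init) { (w, batch) =>
      if (batch.isEmpty) w else optimize(batch, w)
    }
}
```

Carrying the weights through a fold removes the need for a separate 
first-batch case: the initial weights are simply the starting value.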

— Jeremy

-------------------------
jeremyfreeman.net
@thefreemanlab

On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. <ecomot...@gmail.com> wrote:

> Hello,
> 
> I am new to the Spark Streaming API.
> 
> I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on streaming 
> data? Currently I am using foreachRDD for parsing through DStream and I am 
> generating a model based on each RDD. Am I doing anything logically wrong 
> here?
> Thank you.
> 
> Sample Code:
> import scala.collection.mutable.ArrayBuffer
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
> import org.apache.spark.mllib.regression.LinearRegressionModel
> 
> val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
> // Random starting point; the first element is treated as the intercept.
> var initialWeights =
>   Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
> var isFirst = true
> var model = new LinearRegressionModel(null, 1.0)
> 
> parsedData.foreachRDD { rdd =>
>   if (isFirst) {
>     // First batch: start from the random initial weights.
>     val weights = algorithm.optimize(rdd, initialWeights)
>     val w = weights.toArray
>     val intercept = w.head
>     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
>     isFirst = false
>   } else {
>     // Later batches: warm-start from the previous model (intercept first).
>     val ab = ArrayBuffer[Double]()
>     ab.insert(0, model.intercept)
>     ab.appendAll(model.weights.toArray)
>     println("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
>     initialWeights = Vectors.dense(ab.toArray)
>     println("Initial Weights: " + initialWeights)
>     val weights = algorithm.optimize(rdd, initialWeights)
>     val w = weights.toArray
>     val intercept = w.head
>     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
>   }
> }
> 
> 
> Best Regards,
> Arunkumar
