Hi Arunkumar,

That looks like it should work. Logically, it's similar to the implementation used by StreamingLinearRegression and StreamingLogisticRegression; see this class:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala

which exposes the kind of operation you're describing (for any linear method). The nice thing about the gradient-based methods is that they can use existing MLlib optimization routines in this fairly direct way. Other methods (such as KMeans) require a bit more reengineering.

— Jeremy

-------------------------
jeremyfreeman.net
@thefreemanlab

On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. <ecomot...@gmail.com> wrote:

> Hello,
>
> I am new to the Spark Streaming API.
>
> I wanted to ask whether I can apply LBFGS (with LeastSquaresGradient) to
> streaming data. Currently I am using foreachRDD to iterate over the DStream,
> and I am generating a model from each RDD. Am I doing anything logically
> wrong here?
> Thank you.
>
> Sample Code:
>
> val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
> var initialWeights =
>   Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
> var isFirst = true
> var model = new LinearRegressionModel(null, 1.0)
>
> parsedData.foreachRDD { rdd =>
>   if (isFirst) {
>     val weights = algorithm.optimize(rdd, initialWeights)
>     val w = weights.toArray
>     val intercept = w.head
>     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
>     isFirst = false
>   } else {
>     val ab = ArrayBuffer[Double]()
>     ab.insert(0, model.intercept)
>     ab.appendAll(model.weights.toArray)
>     print("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
>     initialWeights = Vectors.dense(ab.toArray)
>     print("Initial Weights: " + initialWeights)
>     val weights = algorithm.optimize(rdd, initialWeights)
>     val w = weights.toArray
>     val intercept = w.head
>     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
>   }
> }
>
> Best Regards,
> Arunkumar
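For what it's worth, the warm-start idea in the snippet above (carry the fitted weights from one batch forward as the initial weights for the next optimize call) can be sketched in plain Scala without Spark. Everything below is illustrative: the mini-batches and noiseless data are simulated, and a hand-rolled least-squares gradient step stands in for `algorithm.optimize`.

```scala
import scala.util.Random

// One averaged least-squares gradient step over a batch of (label, features).
def step(batch: Seq[(Double, Array[Double])],
         w: Array[Double], lr: Double): Array[Double] = {
  val grad = Array.fill(w.length)(0.0)
  for ((y, x) <- batch) {
    val err = w.zip(x).map { case (a, b) => a * b }.sum - y
    for (i <- w.indices) grad(i) += err * x(i)
  }
  w.indices.map(i => w(i) - lr * grad(i) / batch.size).toArray
}

val trueW = Array(2.0, -1.0)  // weights the simulated "stream" is generated from
val rng = new Random(42)

// Three mini-batches standing in for successive RDDs of a DStream.
val batches = Seq.fill(3) {
  Seq.fill(50) {
    val x = Array(rng.nextDouble(), rng.nextDouble())
    (trueW.zip(x).map { case (a, b) => a * b }.sum, x)
  }
}

var w = Array(0.0, 0.0)                // initial weights, as in the email
for (batch <- batches; _ <- 1 to 500) // warm start: w carries across batches
  w = step(batch, w, 0.5)

println(w.mkString(", "))
```

Because each batch is drawn from the same model, warm-starting means later batches begin close to the optimum and only refine the estimate, which is exactly what the `foreachRDD` loop in the email is doing with LBFGS.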