Hello Jeremy,

Thank you for your reply.

When I run this code on my local machine against streaming data, it keeps giving me this error:

WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost): java.io.FileNotFoundException: /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file or directory)

And when I execute the same code on static data after randomly splitting it into 5 sets, it gives me slightly different weights (the difference is in the decimals). I am still trying to analyse why this happens.

Any input on why this would be happening?

Best Regards,
Arunkumar

On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman <freeman.jer...@gmail.com> wrote:

> Hi Arunkumar,
>
> That looks like it should work. Logically, it’s similar to the
> implementation used by StreamingLinearRegression and
> StreamingLogisticRegression, see this class:
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
>
> which exposes the kind of operation you're describing (for any linear
> method).
>
> The nice thing about the gradient-based methods is that they can use
> existing MLlib optimization routines in this fairly direct way. Other
> methods (such as KMeans) require a bit more reengineering.
>
> -- Jeremy
>
> -------------------------
> jeremyfreeman.net
> @thefreemanlab
>
> On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. <ecomot...@gmail.com> wrote:
>
> Hello,
>
> I am new to the Spark Streaming API.
>
> I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) on
> streaming data? Currently I am using foreachRDD for parsing through the
> DStream and I am generating a model based on each RDD. Am I doing anything
> logically wrong here?
> Thank you.
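>
> Sample Code:
>
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
> import org.apache.spark.mllib.regression.LinearRegressionModel
>
> // parsedData must be a DStream[(Double, Vector)] of (label, features) pairs,
> // since LBFGS.optimize takes an RDD[(Double, Vector)].
> // The first weight is treated as the intercept below; this assumes each
> // feature vector carries a leading 1.0 bias term.
> val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
> var initialWeights =
>   Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
> var isFirst = true
> var model = new LinearRegressionModel(null, 1.0)
>
> parsedData.foreachRDD { rdd =>
>   if (isFirst) {
>     // First batch: start from the random initial weights.
>     val weights = algorithm.optimize(rdd, initialWeights)
>     val w = weights.toArray
>     val intercept = w.head
>     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
>     isFirst = false
>   } else {
>     // Later batches: warm-start from the previous model (intercept first).
>     println("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
>     initialWeights = Vectors.dense(model.intercept +: model.weights.toArray)
>     println("Initial Weights: " + initialWeights)
>     val weights = algorithm.optimize(rdd, initialWeights)
>     val w = weights.toArray
>     val intercept = w.head
>     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
>   }
> }
>
> Best Regards,
> Arunkumar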
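
For comparison, the built-in streaming regression that Jeremy points to can be driven roughly as follows. This is only a minimal sketch under stated assumptions, not the code from this thread: it uses SGD rather than LBFGS, and the directory /tmp/training, the value numFeatures = 3, the step size, and the iteration count are all hypothetical placeholders. It also assumes the input files contain lines in the text form that LabeledPoint.parse accepts, such as (y,[x1,x2,x3]).

```scala
import org.apache.spark.SparkConf
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.{LabeledPoint, StreamingLinearRegressionWithSGD}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingRegressionSketch {
  def main(args: Array[String]): Unit = {
    // local[2]: at least two threads are recommended for local streaming jobs.
    val conf = new SparkConf().setMaster("local[2]").setAppName("StreamingRegressionSketch")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Hypothetical values; replace with the real feature count and input directory.
    val numFeatures = 3
    val trainingData = ssc.textFileStream("/tmp/training").map(LabeledPoint.parse).cache()

    // The model keeps its weights between batches, so each new batch
    // is warm-started from the result of the previous one.
    val model = new StreamingLinearRegressionWithSGD()
      .setInitialWeights(Vectors.zeros(numFeatures))
      .setStepSize(0.1)       // hypothetical tuning value
      .setNumIterations(50)   // hypothetical tuning value

    model.trainOn(trainingData)

    // Print the current weights after each batch has been processed.
    trainingData.foreachRDD { _ =>
      println("Current weights: " + model.latestModel().weights)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

The update here has the same shape as the foreachRDD loop above: each batch is optimized starting from the weights learned on the previous batch. The difference is that the state is held inside the streaming model object instead of in driver-side vars.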