Hello Jeremy,

Thank you for your reply.

When I run this code on my local machine on streaming data, it keeps
giving me this error:
    WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost):
    java.io.FileNotFoundException:
    /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file or directory)

And when I run the same code on static data after randomly splitting it
into 5 sets, it gives me slightly different weights (the difference is in
the decimal places). I am still trying to work out why this is happening.
Any ideas on what could cause it?
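One plausible contributor, offered as a guess rather than a diagnosis: least squares is convex, so if each optimize call runs close to convergence, it lands near the minimizer of that batch's own loss almost regardless of the warm-start weights. The final weights then mostly reflect the last split, and each split carries its own sampling noise, so they can differ from a full-data fit in the decimal places. Separately, the initial weights come from an unseeded scala.util.Random, and randomSplit picks a random seed unless one is passed, so two runs may not see identical inputs. The toy sketch below illustrates the first effect in plain Scala (no Spark). The closed-form fit standing in for LBFGS, the synthetic data y = 2 + 3x + noise, and the ChunkedFitDemo name are all assumptions made for illustration.

```scala
import scala.util.Random

// Toy illustration (plain Scala, no Spark): exact least-squares fits of
// y = a + b*x on the full data set versus on each of 5 random chunks.
object ChunkedFitDemo {
  // Closed-form simple linear regression; returns (intercept, slope).
  def fit(xs: Seq[Double], ys: Seq[Double]): (Double, Double) = {
    val n = xs.size.toDouble
    val mx = xs.sum / n
    val my = ys.sum / n
    val sxy = xs.zip(ys).map { case (x, y) => (x - mx) * (y - my) }.sum
    val sxx = xs.map(x => (x - mx) * (x - mx)).sum
    val slope = sxy / sxx
    (my - slope * mx, slope)
  }

  val rng = new Random(42) // fixed seed so every run sees the same data
  // Synthetic data (assumption): y = 2 + 3x + Gaussian noise with sd 0.5.
  val xs: Seq[Double] = Seq.fill(1000)(rng.nextDouble() * 10.0)
  val ys: Seq[Double] = xs.map(x => 2.0 + 3.0 * x + rng.nextGaussian() * 0.5)

  val full: (Double, Double) = fit(xs, ys)
  // Shuffle, then cut into 5 chunks of 200 points, like a random 5-way split.
  val chunks: List[Seq[(Double, Double)]] = rng.shuffle(xs.zip(ys)).grouped(200).toList
  val perChunk: List[(Double, Double)] =
    chunks.map(c => fit(c.map(_._1), c.map(_._2)))

  def main(args: Array[String]): Unit = {
    println(f"full data: intercept=${full._1}%.4f slope=${full._2}%.4f")
    perChunk.zipWithIndex.foreach { case ((a, b), i) =>
      println(f"chunk $i:   intercept=$a%.4f slope=$b%.4f")
    }
  }
}
```

Each per-chunk line should be close to, but not identical with, the full-data line. If LBFGS is run to convergence on each split in turn, the final model would likely sit near the last chunk's fit rather than the full-data fit.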

Best Regards,
Arunkumar


On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman <freeman.jer...@gmail.com>
wrote:

> Hi Arunkumar,
>
> That looks like it should work. Logically, it’s similar to the
> implementation used by StreamingLinearRegression and
> StreamingLogisticRegression, see this class:
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
>
> which exposes the kind of operation you're describing (for any linear
> method).
>
> The nice thing about the gradient-based methods is that they can use
> existing MLlib optimization routines in this fairly direct way. Other
> methods (such as KMeans) require a bit more reengineering.
>
> — Jeremy
>
> -------------------------
> jeremyfreeman.net
> @thefreemanlab
>
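Jeremy's point that gradient-based methods can reuse an optimizer once per batch can be sketched without Spark. As I understand StreamingLinearAlgorithm, it keeps the latest model and, for each batch, runs the underlying optimizer starting from the current weights; the sketch below imitates that idea in plain Scala. Everything here is an assumption for illustration: the hypothetical StreamingLeastSquares class, full-batch gradient descent in place of LBFGS, the step size, the iteration count, and the synthetic stream. It is not MLlib's code.

```scala
// Toy, Spark-free sketch of the warm-start pattern. StreamingLeastSquares is
// a hypothetical class, not an MLlib type. Features carry a leading 1.0, so
// weights(0) plays the role of the intercept (assumption).
final class StreamingLeastSquares(initial: Array[Double], stepSize: Double, numIterations: Int) {
  private var weights: Array[Double] = initial.clone()

  def latestWeights: Array[Double] = weights.clone()

  // One micro-batch of (label, features) pairs, like one RDD in foreachRDD.
  def update(batch: Seq[(Double, Array[Double])]): Unit = {
    if (batch.nonEmpty) { // the gradient is averaged over the batch size
      val n = batch.size.toDouble
      for (_ <- 1 to numIterations) {
        // Gradient of the mean squared-error loss (1/2n) * sum (w.x - y)^2.
        val grad = new Array[Double](weights.length)
        for ((label, x) <- batch) {
          val err = x.indices.map(i => weights(i) * x(i)).sum - label
          for (i <- x.indices) grad(i) += err * x(i)
        }
        // Plain gradient-descent step, starting from the previous batch's weights.
        weights = weights.indices.map(i => weights(i) - stepSize * grad(i) / n).toArray
      }
    }
  }
}

object StreamingLeastSquaresDemo {
  val rng = new scala.util.Random(7)
  // Hypothetical stream: each batch holds 50 points from y = 1 + 2x + noise.
  def batch(): Seq[(Double, Array[Double])] = Seq.fill(50) {
    val x = rng.nextDouble()
    (1.0 + 2.0 * x + rng.nextGaussian() * 0.05, Array(1.0, x))
  }

  val model = new StreamingLeastSquares(Array(0.0, 0.0), stepSize = 0.5, numIterations = 50)
  (1 to 20).foreach(_ => model.update(batch()))

  def main(args: Array[String]): Unit =
    println(model.latestWeights.mkString("weights = [", ", ", "]"))
}
```

Because each batch runs only a fixed number of steps from the previous weights, the result blends information from recent batches instead of refitting each one from scratch. MLlib's StreamingLinearRegressionWithSGD exposes the analogous knobs through setStepSize, setNumIterations and setInitialWeights.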
> On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. <ecomot...@gmail.com> wrote:
>
> Hello,
>
> I am new to the Spark Streaming API.
>
> I wanted to ask whether I can apply LBFGS (with LeastSquaresGradient) to
> streaming data. Currently I am using foreachRDD to iterate over the DStream,
> and I am generating a model from each RDD. Am I doing anything
> logically wrong here?
> Thank you.
>
> Sample Code:
>
> import org.apache.spark.mllib.linalg.Vectors
> import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
> import org.apache.spark.mllib.regression.LinearRegressionModel
>
> // parsedData: DStream[(Double, Vector)] of (label, features). The first
> // feature of every vector is assumed to be 1.0, so w(0) is the intercept.
> val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
> var initialWeights =
>   Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
> var isFirst = true
> var model: LinearRegressionModel = null
>
> // The foreachRDD body runs on the driver, so updating these vars is safe.
> parsedData.foreachRDD { rdd =>
>   // Skip empty micro-batches: LBFGS averages the loss over the batch size.
>   if (rdd.count() > 0) {
>     if (!isFirst) {
>       // Warm-start from the previous model, laid out as [intercept, weights...].
>       println("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
>       initialWeights = Vectors.dense(model.intercept +: model.weights.toArray)
>       println("Initial Weights: " + initialWeights)
>     }
>     val w = algorithm.optimize(rdd, initialWeights).toArray
>     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), w.head)
>     isFirst = false
>   }
> }
>
>
>
> Best Regards,
> Arunkumar
>
>
>
