Hello Jeremy,

Sorry for the delayed reply!

The first issue is resolved; I believe it was just a mismatch between the
production and consumption rates.

Regarding the second question, I am streaming the data from a file that
contains about 38k records. I am sending the records in the same sequence
as I read them from the file, but I get different weights each time. It may
have something to do with how the DStreams are being processed.
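
To show the kind of effect I suspect, here is a minimal sketch in plain
Scala (no Spark). Everything in it is hypothetical and for illustration
only: the object ChunkingDemo, the helpers fit and fitSequentially, the toy
data, the step count and the learning rate. It uses plain gradient descent
instead of LBFGS, and it carries the weight from one batch to the next in
the same way as the foreachRDD loop in my sample code.

```scala
object ChunkingDemo {
  // One (x, y) training record for the 1-D model y ≈ w * x.
  final case class Point(x: Double, y: Double)

  // A few gradient-descent steps on the mean squared error, warm-started from w0.
  def fit(batch: Seq[Point], w0: Double, steps: Int = 5, lr: Double = 0.1): Double =
    (1 to steps).foldLeft(w0) { (w, _) =>
      val grad = batch.map(p => 2.0 * (w * p.x - p.y) * p.x).sum / batch.size
      w - lr * grad
    }

  // Feed the batches in order, carrying the weight forward between batches.
  def fitSequentially(batches: Seq[Seq[Point]], w0: Double): Double =
    batches.foldLeft(w0)((w, batch) => fit(batch, w))

  // Deterministic toy data: slope 2 plus a fixed alternating offset.
  val data: Seq[Point] = (1 to 12).map { i =>
    val x = i / 12.0
    Point(x, 2.0 * x + (if (i % 2 == 0) 0.1 else -0.1))
  }

  def main(args: Array[String]): Unit = {
    // Same records, same order; only the batch boundaries change.
    val static = fit(data, 0.0)
    val chunksOf3 = fitSequentially(data.grouped(3).toSeq, 0.0)
    val chunksOf4 = fitSequentially(data.grouped(4).toSeq, 0.0)
    println(s"static=$static chunksOf3=$chunksOf3 chunksOf4=$chunksOf4")
  }
}
```

In this sketch the same 12 records in the same order give a different final
weight for each chunking, so the result seems to depend on where the batch
boundaries fall and not only on the record order.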

Can you suggest a solution for this case? My requirement is that the
program must generate the same weights for both static and streaming data.
Thank you for your help!

Best Regards,
Arunkumar


On Thu, Mar 19, 2015 at 9:25 PM, Jeremy Freeman <freeman.jer...@gmail.com>
wrote:

> Regarding the first question, can you say more about how you are loading
> your data? And what is the size of the data set? And is that the only error
> you see, and do you only see it in the streaming version?
>
> For the second question, there are a couple of reasons the weights might
> differ slightly; it depends on exactly how you set up the comparison. When
> you split it into 5, were those the same 5 chunks of data you used for the
> streaming case? And were they presented to the optimizer in the same order?
> A difference in either could produce small differences in the resulting
> weights, but that doesn’t mean it’s doing anything wrong.
>
> -------------------------
> jeremyfreeman.net
> @thefreemanlab
>
> On Mar 17, 2015, at 6:19 PM, EcoMotto Inc. <ecomot...@gmail.com> wrote:
>
> Hello Jeremy,
>
> Thank you for your reply.
>
> When I run this code on my local machine on streaming data, it keeps
> giving me this error:
> *WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost):
> java.io.FileNotFoundException:
> /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file
> or directory) *
>
> And when I execute the same code on static data after randomly splitting
> it into 5 sets, it gives me slightly different weights (the difference is
> in the decimals). I am still trying to analyse why this is happening.
> Do you have any input on why this would happen?
>
> Best Regards,
> Arunkumar
>
>
> On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman <freeman.jer...@gmail.com
> > wrote:
>
>> Hi Arunkumar,
>>
>> That looks like it should work. Logically, it’s similar to the
>> implementation used by StreamingLinearRegression and
>> StreamingLogisticRegression, see this class:
>>
>>
>> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala
>>
>> which exposes the kind of operation you're describing (for any linear
>> method).
>>
>> The nice thing about the gradient-based methods is that they can use
>> existing MLlib optimization routines in this fairly direct way. Other
>> methods (such as KMeans) require a bit more reengineering.
>>
>> — Jeremy
>>
>> -------------------------
>> jeremyfreeman.net
>> @thefreemanlab
>>
>> On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. <ecomot...@gmail.com> wrote:
>>
>> Hello,
>>
>> I am new to the Spark Streaming API.
>>
>> I wanted to ask if I can apply LBFGS (with LeastSquaresGradient) to
>> streaming data. Currently I am using foreachRDD to go through the DStream
>> and I am generating a model based on each RDD. Am I doing anything
>> logically wrong here?
>> Thank you.
>>
>> Sample Code:
>>
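>> import scala.collection.mutable.ArrayBuffer
>> import org.apache.spark.mllib.linalg.Vectors
>> import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
>> import org.apache.spark.mllib.regression.LinearRegressionModel
>>
>> val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
>> // Random starting weights; element 0 is used as the intercept below.
>> var initialWeights =
>>   Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
>> var isFirst = true
>> // Placeholder model, replaced after the first batch.
>> var model = new LinearRegressionModel(null, 1.0)
>>
>> parsedData.foreachRDD { rdd =>
>>   if (isFirst) {
>>     // First batch: optimize from the random starting weights.
>>     val weights = algorithm.optimize(rdd, initialWeights)
>>     val w = weights.toArray
>>     val intercept = w.head
>>     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
>>     isFirst = false
>>   } else {
>>     // Later batches: warm-start from the previous model (intercept first).
>>     val ab = ArrayBuffer[Double]()
>>     ab.insert(0, model.intercept)
>>     ab.appendAll(model.weights.toArray)
>>     println("Intercept = " + model.intercept + " :: modelWeights = " + model.weights)
>>     initialWeights = Vectors.dense(ab.toArray)
>>     println("Initial Weights: " + initialWeights)
>>     val weights = algorithm.optimize(rdd, initialWeights)
>>     val w = weights.toArray
>>     val intercept = w.head
>>     model = new LinearRegressionModel(Vectors.dense(w.drop(1)), intercept)
>>   }
>> }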
>>
>>
>>
>> Best Regards,
>> Arunkumar
>>
>>
>>
>
>
