Re: Can LBFGS be used on streaming data?
Hello DB,

Thank you! Do you know how to run linear regression without SGD on streaming data in Spark? I tried SGD, but because of the step size I was not getting the expected weights.

Best Regards,
Arunkumar

On Wed, Mar 25, 2015 at 4:33 PM, DB Tsai dbt...@dbtsai.com wrote:

Hi Arunkumar,

I think L-BFGS will not work. The algorithm assumes that the objective function stays the same (i.e., the data is the same) for the entire optimization process, and it relies on this to construct the approximate Hessian matrix. In the streaming case the data keeps changing, which will cause problems for the algorithm.

Sincerely,
DB Tsai
---
Blog: https://www.dbtsai.com
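DB's point can be made concrete with the quantities L-BFGS keeps in its history. This uses standard textbook notation that is not taken from the thread. After each step the algorithm stores the pair

$$
s_k = w_{k+1} - w_k, \qquad y_k = \nabla f(w_{k+1}) - \nabla f(w_k),
$$

and it builds its inverse-Hessian approximation to satisfy the secant condition $H_{k+1} y_k = s_k$. For a fixed quadratic objective, $y_k = \nabla^2 f \, s_k$ exactly, so the stored pairs encode curvature. If the two gradients come from different batches $f_t$ and $f_{t+1}$, the difference splits into two parts:

$$
y_k = \underbrace{\nabla f_{t+1}(w_{k+1}) - \nabla f_{t+1}(w_k)}_{\text{curvature of } f_{t+1}} + \underbrace{\nabla f_{t+1}(w_k) - \nabla f_t(w_k)}_{\text{change in the data}}
$$

The second term has nothing to do with curvature and can corrupt the approximation.

Calling `optimize` separately on each batch, as the original sample code does, starts a fresh L-BFGS history every time, so batches are not mixed within one run. However, each run then tends to converge toward the minimizer of its own batch, which means earlier batches are largely forgotten.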
Re: Can LBFGS be used on streaming data?
Hello Jeremy,

Sorry for the delayed reply! The first issue is resolved. I believe it was just a mismatch between the rates at which the data was produced and consumed.

Regarding the second question: I am streaming the data from a file with about 38k records. I send the records in the same order in which I read them from the file, but I get different weights on each run. This may have something to do with how the DStreams are being processed. Can you suggest a solution for this case? My requirement is that the program must generate the same weights for both static and streaming data.

Thank you for your help!

Best Regards,
Arunkumar

On Thu, Mar 19, 2015 at 9:25 PM, Jeremy Freeman freeman.jer...@gmail.com wrote:

Regarding the first question, can you say more about how you are loading your data? What is the size of the data set? Is that the only error you see, and do you only see it in the streaming version?

For the second question, there are a couple of reasons the weights might differ slightly; it depends on exactly how you set up the comparison. When you split the data into 5 chunks, were those the same 5 chunks you used in the streaming case? And were they presented to the optimizer in the same order? A difference in either could produce small differences in the resulting weights, but that doesn't mean anything is wrong.

- jeremyfreeman.net
@thefreemanlab
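One way to meet the requirement that static and streaming runs give the same weights, while avoiding SGD's step size entirely, uses a different technique that nobody in the thread proposed. The idea is to accumulate the sufficient statistics X^T X and X^T y across batches and solve the normal equations whenever weights are needed. These sums do not depend on how the rows are split or ordered (up to floating-point rounding), so the result matches a single static fit.

This is a minimal plain-Scala sketch under some assumptions:

- The class and object names are hypothetical.
- The problem is unregularized.
- It is only practical for a small number of features, because X^T X is d x d.

In Spark, the per-batch sums could be computed inside `foreachRDD` with an RDD aggregation and added up on the driver. The example uses the dummy data set (Y = 4 + 2 * x1 + 3 * x2) from the "Getting incorrect weights for LinearRegression" thread.

```scala
// Exact least squares over a stream of batches (hypothetical names).
// Each feature vector includes a leading 1.0 so that w(0) is the intercept.
final class NormalEquationAccumulator(dim: Int) {
  private val xtx = Array.ofDim[Double](dim, dim)
  private val xty = Array.ofDim[Double](dim)

  // Fold one batch of (label, features) rows into the running sums.
  def add(batch: Seq[(Double, Array[Double])]): Unit =
    for ((y, x) <- batch) {
      require(x.length == dim, s"expected $dim features, got ${x.length}")
      for (i <- 0 until dim) {
        xty(i) += x(i) * y
        for (j <- 0 until dim) xtx(i)(j) += x(i) * x(j)
      }
    }

  // Solve (X^T X) w = X^T y by Gaussian elimination with partial pivoting.
  def solve(): Array[Double] = {
    val a = xtx.map(_.clone())
    val b = xty.clone()
    for (col <- 0 until dim) {
      val pivot = (col until dim).maxBy(r => math.abs(a(r)(col)))
      require(math.abs(a(pivot)(col)) > 1e-12, "X^T X is singular")
      val rowTmp = a(col); a(col) = a(pivot); a(pivot) = rowTmp
      val bTmp = b(col); b(col) = b(pivot); b(pivot) = bTmp
      for (r <- col + 1 until dim) {
        val f = a(r)(col) / a(col)(col)
        for (c <- col until dim) a(r)(c) -= f * a(col)(c)
        b(r) -= f * b(col)
      }
    }
    // Back substitution on the upper-triangular system.
    val w = Array.ofDim[Double](dim)
    for (r <- dim - 1 to 0 by -1) {
      var s = b(r)
      for (c <- r + 1 until dim) s -= a(r)(c) * w(c)
      w(r) = s / a(r)(r)
    }
    w
  }
}

object NormalEquationsDemo {
  // (y, x1, x2) rows from the dummy data set, with a leading 1.0 bias feature.
  val rows: Seq[(Double, Array[Double])] = Seq(
    (6.3, 1.0, 0.1), (8.6, 2.0, 0.2), (10.9, 3.0, 0.3), (13.8, 4.0, 0.6),
    (16.4, 5.0, 0.8), (19.6, 6.0, 1.2), (22.8, 7.0, 1.6), (25.7, 8.0, 1.9),
    (28.3, 9.0, 2.1), (31.2, 10.0, 2.4), (34.1, 11.0, 2.7)
  ).map { case (y, x1, x2) => (y, Array(1.0, x1, x2)) }

  // Feed the batches in the given order, then solve once at the end.
  def fit(batches: Seq[Seq[(Double, Array[Double])]]): Array[Double] = {
    val acc = new NormalEquationAccumulator(3)
    batches.foreach(acc.add)
    acc.solve()
  }
}
```

For example, `NormalEquationsDemo.fit(Seq(NormalEquationsDemo.rows))` and `NormalEquationsDemo.fit(NormalEquationsDemo.rows.grouped(3).toSeq.reverse)` should both return weights close to (4, 2, 3). One limitation is that forming X^T X squares the condition number, so a QR-based solve is the safer choice for badly conditioned features.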
Re: Can LBFGS be used on streaming data?
Hello Jeremy,

Thank you for your reply. When I run this code on my local machine on streaming data, it keeps giving me this error:

WARN TaskSetManager: Lost task 2.0 in stage 211.0 (TID 4138, localhost): java.io.FileNotFoundException: /tmp/spark-local-20150316165742-9ac0/27/shuffle_102_2_0.data (No such file or directory)

When I run the same code on static data after randomly splitting it into 5 sets, it gives me slightly different weights (the differences are in the decimal places). I am still trying to analyse why this happens. Any ideas?

Best Regards,
Arunkumar

On Tue, Mar 17, 2015 at 11:32 AM, Jeremy Freeman freeman.jer...@gmail.com wrote:

Hi Arunkumar,

That looks like it should work. Logically, it's similar to the implementation used by StreamingLinearRegression and StreamingLogisticRegression; see this class:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala

It exposes the kind of operation you're describing (for any linear method). The nice thing about the gradient-based methods is that they can use existing MLlib optimization routines in this fairly direct way. Other methods (such as KMeans) require a bit more re-engineering.

Jeremy

- jeremyfreeman.net
@thefreemanlab

On Mar 16, 2015, at 6:19 PM, EcoMotto Inc. ecomot...@gmail.com wrote:

Hello, I am new to the Spark Streaming API. I wanted to ask whether I can apply LBFGS (with LeastSquaresGradient) to streaming data. Currently I am using foreachRDD to iterate over the DStream, and I generate a model from each RDD. Am I doing anything logically wrong here? Thank you.
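Jeremy's explanations (the StreamingLinearAlgorithm pattern, and that chunking and order can change the weights) can be seen in a toy example. This is a simplified plain-Scala sketch, not the StreamingLinearAlgorithm code. Each batch runs a few full-batch gradient steps with a constant step size, warm-started from the previous batch's weights. The object name and the two one-point batches are hypothetical.

```scala
// Simplified warm-start loop (hypothetical names): each incoming batch runs a
// few full-batch gradient steps on the average loss 0.5 * (w.x - y)^2,
// starting from the weights left by the previous batch. Unlike MLlib's
// GradientDescent, the step size here is constant.
object WarmStartSketch {
  type Point = (Double, Array[Double]) // (label, features)

  // One gradient-descent step on the average least-squares loss of the batch.
  def step(w: Array[Double], batch: Seq[Point], eta: Double): Array[Double] = {
    val grad = Array.ofDim[Double](w.length)
    for ((y, x) <- batch) {
      val residual = w.indices.map(i => w(i) * x(i)).sum - y
      for (i <- w.indices) grad(i) += residual * x(i) / batch.size
    }
    w.indices.map(i => w(i) - eta * grad(i)).toArray
  }

  // Process batches in order, warm-starting each one from the previous result.
  def train(init: Array[Double], batches: Seq[Seq[Point]],
            eta: Double, itersPerBatch: Int): Array[Double] =
    batches.foldLeft(init) { (w, batch) =>
      (1 to itersPerBatch).foldLeft(w)((wi, _) => step(wi, batch, eta))
    }
}
```

Consider a single weight with no intercept and two batches: A = {(y = 2, x = 1)} and B = {(y = 4, x = 1)}. Starting from 0 with one step of size 0.5 per batch:

- A then B gives w = 2.5.
- B then A gives w = 2.0.
- The least-squares fit on both points together is 3.0.

Small order- and split-dependent differences like the ones reported with the 5 random splits are expected from this style of update.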
Can LBFGS be used on streaming data?
Hello,

I am new to the Spark Streaming API. I wanted to ask whether I can apply LBFGS (with LeastSquaresGradient) to streaming data. Currently I am using foreachRDD to iterate over the DStream, and I generate a model from each RDD. Am I doing anything logically wrong here?

Thank you.

Sample Code:

```
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.{LBFGS, LeastSquaresGradient, SimpleUpdater}
import org.apache.spark.mllib.regression.LinearRegressionModel

// Assumes parsedData is a DStream[(Double, Vector)] of (label, features) whose
// feature vectors start with a constant 1.0, so w.head is the intercept.
// LBFGS.optimize takes an RDD[(Double, Vector)]; map LabeledPoints with
// rdd.map(lp => (lp.label, lp.features)) first.
// numFeatures must count that leading 1.0 column.
val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
var initialWeights = Vectors.dense(Array.fill(numFeatures)(scala.util.Random.nextDouble()))
var model: LinearRegressionModel = null

parsedData.foreachRDD { rdd =>
  if (model != null) {
    // Warm start from the previous batch: pack (intercept, weights...) into one vector.
    println(s"Intercept = ${model.intercept} :: modelWeights = ${model.weights}")
    initialWeights = Vectors.dense(model.intercept +: model.weights.toArray)
    println(s"Initial Weights: $initialWeights")
  }
  val w = algorithm.optimize(rdd, initialWeights).toArray
  model = new LinearRegressionModel(Vectors.dense(w.drop(1)), w.head)
}
```

Best Regards,
Arunkumar
Re: Getting incorrect weights for LinearRegression
Thanks a lot, Burak, that helped.

On Fri, Mar 13, 2015 at 1:55 PM, Burak Yavuz brk...@gmail.com wrote:

Hi,

I would suggest you use LBFGS, as I think the step size is hurting you. You can run the same thing with LBFGS as:

```
val algorithm = new LBFGS(new LeastSquaresGradient(), new SimpleUpdater())
val initialWeights = Vectors.dense(Array.fill(3)(scala.util.Random.nextDouble()))
// LBFGS.optimize expects an RDD[(Double, Vector)] of (label, features), not LabeledPoints.
val weights = algorithm.optimize(parsedData.map(lp => (lp.label, lp.features)), initialWeights)
```

Note that weights will be a Vector and not a model. You can then generate the model with:

```
val w = weights.toArray
// Assumes the constant 1.0 bias feature is the LAST element of each feature vector.
// If it is the first element (as in Array(1.0, x1, x2) below), use w.head and w.drop(1).
val intercept = w.last
val model = new LinearRegressionModel(Vectors.dense(w.dropRight(1)), intercept)
```

Best,
Burak

On Wed, Mar 11, 2015 at 11:59 AM, EcoMotto Inc. ecomot...@gmail.com wrote:

Hello,

I am trying to run LinearRegression on a dummy data set, given below. I have tried many different settings, but I still cannot reproduce the desired coefficients. Please help me out, as I am facing the same problem with my actual dataset.

Thank you.
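Which element of the optimizer's flat weight vector is the intercept depends only on where the constant 1.0 feature was placed. Burak's snippet takes the last element, which matches bias-last features such as those produced by `MLUtils.appendBias`. The dummy data set below, and the streaming code, put the 1.0 first instead. This is a small plain-Scala helper (hypothetical name) that makes the convention explicit.

```scala
// Split an optimizer's flat weight vector into (intercept, feature weights).
// biasFirst = true  -> features were built as (1.0, x1, x2, ...)
// biasFirst = false -> the 1.0 bias was appended at the end
object InterceptSplit {
  def split(w: Array[Double], biasFirst: Boolean): (Double, Array[Double]) = {
    require(w.nonEmpty, "weight vector must include the bias coefficient")
    if (biasFirst) (w.head, w.tail) else (w.last, w.init)
  }
}
```

With bias-first features, fitted weights near (4, 2, 3) read as intercept 4 and weights (2, 3). Applying the bias-last rule to the same vector would wrongly report the x2 coefficient, 3, as the intercept.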
This dataset is generated from the simple equation Y = 4 + (2 * x1) + (3 * x2).

Data:

```
y,x1,x2
6.3,1,0.1
8.6,2,0.2
10.9,3,0.3
13.8,4,0.6
16.4,5,0.8
19.6,6,1.2
22.8,7,1.6
25.7,8,1.9
28.3,9,2.1
31.2,10,2.4
34.1,11,2.7
```

Spark Code:

```
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.optimization.SimpleUpdater
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

val data = sc.textFile("Data/tempData_1.csv")
// Skip the header line; it is only in the first partition, so mapPartitions(_.drop(1))
// would also drop the first data row of every other partition.
val parsedData = data.mapPartitionsWithIndex { (idx, lines) =>
  if (idx == 0) lines.drop(1) else lines
}.map { line =>
  val parts = line.split(',')
  // The leading 1.0 acts as the bias feature, since setIntercept(false) is used below.
  LabeledPoint(parts(0).toDouble, Vectors.dense(Array(1.0, parts(1).toDouble, parts(2).toDouble)))
}.cache()

val numIterations = 400
val step = 0.01
val algorithm = new LinearRegressionWithSGD()
algorithm.setIntercept(false) // Also tried setIntercept(true) with only (x1, x2) as features
algorithm.optimizer
  .setStepSize(step)
  .setNumIterations(numIterations)
  .setUpdater(new SimpleUpdater())
  //.setRegParam(0.1)
  .setMiniBatchFraction(1.0)

val initialWeights = Vectors.dense(Array.fill(3)(scala.util.Random.nextDouble()))
val model = algorithm.run(parsedData, initialWeights)
println(s"Model intercept: ${model.intercept}, weights: ${model.weights}")
```

Regards,
Arun
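To see how step size and iteration count interact on this data, here is a plain-Scala sketch (no Spark, hypothetical names) of roughly the update the configuration above performs. It assumes:

- full-batch gradients, since miniBatchFraction is 1.0;
- the per-row loss 0.5 * (w.x - y)^2 averaged over the rows, as LeastSquaresGradient computes;
- a step of stepSize / sqrt(iteration), as SimpleUpdater applies.

With stepSize = 0.01, those steps add up to only about 0.39 over 400 iterations. x1 and x2 are also strongly correlated here, so the problem is poorly conditioned, and slow progress along the weakly determined direction is plausible. This fits Burak's diagnosis that the step size is hurting.

```scala
// Full-batch gradient descent with a stepSize / sqrt(iteration) schedule on the
// dummy data set (hypothetical sketch, not MLlib's implementation).
object SgdStepSketch {
  // (label, features) with a leading 1.0 bias feature, as in the parsing code.
  val data: Seq[(Double, Array[Double])] = Seq(
    (6.3, 1.0, 0.1), (8.6, 2.0, 0.2), (10.9, 3.0, 0.3), (13.8, 4.0, 0.6),
    (16.4, 5.0, 0.8), (19.6, 6.0, 1.2), (22.8, 7.0, 1.6), (25.7, 8.0, 1.9),
    (28.3, 9.0, 2.1), (31.2, 10.0, 2.4), (34.1, 11.0, 2.7)
  ).map { case (y, x1, x2) => (y, Array(1.0, x1, x2)) }

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.indices.map(i => a(i) * b(i)).sum

  // Average of 0.5 * (w.x - y)^2 over all rows.
  def loss(w: Array[Double]): Double =
    data.map { case (y, x) => val d = dot(w, x) - y; d * d / 2 }.sum / data.size

  // Each iteration: average gradient over all rows, then a decaying step.
  def run(init: Array[Double], stepSize: Double, numIterations: Int): Array[Double] =
    (1 to numIterations).foldLeft(init) { (w, iter) =>
      val grad = Array.ofDim[Double](w.length)
      for ((y, x) <- data) {
        val d = dot(w, x) - y
        for (i <- w.indices) grad(i) += d * x(i) / data.size
      }
      val eta = stepSize / math.sqrt(iter.toDouble)
      w.indices.map(i => w(i) - eta * grad(i)).toArray
    }
}
```

Printing `SgdStepSketch.run(Array(0.0, 0.0, 0.0), 0.01, 400).mkString(", ")` next to the exact coefficients (4, 2, 3) is a quick way to see how far this configuration gets.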