Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-19 Thread Makoto Yui

Xiangrui and Debasish,

(2014/06/18 6:33), Debasish Das wrote:

I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I
got 100 iterations of SGD running in 200 seconds...10 executors each
with 16 GB memory...


I was able to figure out what the problem was: spark.akka.frameSize was too
large. Setting spark.akka.frameSize=10 made it work for the news20 dataset.
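
(For reference, a minimal sketch of how that property could be set programmatically; the SparkConf usage and app name are illustrative and not taken from the original run. For spark-shell, the property would typically go into conf/spark-defaults.conf before launch instead.)

---
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical standalone-app setup: cap the Akka frame size (in MB) before
// the SparkContext is created.
val conf = new SparkConf()
  .setAppName("news20-logreg")         // illustrative name
  .set("spark.akka.frameSize", "10")   // MB
val sc = new SparkContext(conf)
---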


The execution was slow for the larger KDD Cup 2012, Track 2 dataset
(235M+ records with 16.7M+ (2^24) sparse features, about 33.6GB) due to
the sequential aggregation of dense vectors on a single driver node.


The aggregation took about 7.6 minutes per iteration.

Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-19 Thread Xiangrui Meng
It is because the frame size is not set correctly in the executor backend; see
SPARK-1112. We are going to fix it in v1.0.1. Did you try treeAggregate?



Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-19 Thread Makoto Yui

Xiangrui,

(2014/06/19 23:43), Xiangrui Meng wrote:

It is because the frame size is not set correctly in the executor backend; see
SPARK-1112. We are going to fix it in v1.0.1. Did you try treeAggregate?


Not yet. I will wait for the v1.0.1 release.

Thanks,
Makoto


news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui
Hello,

I have been evaluating LogisticRegressionWithSGD of Spark 1.0 MLlib on
Hadoop 0.20.2-cdh3u6, but it does not work for a sparse dataset even though
the number of training examples used in the evaluation is just 1,000.

It works fine for the dataset *news20.binary.1000*, which has 178,560
features. However, it does not work for *news20.random.1000*, where the
number of features is large (1,354,731 features), even though we used sparse
vectors through MLUtils.loadLibSVMFile().

The execution does not seem to progress, and no error is reported in the
spark-shell or in the stdout/stderr of the executors.

We used 32 executors, each allocating 7GB of working memory (2GB of which
is for RDD storage).
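
(For illustration, a hypothetical way such a split could be expressed via SparkConf; the property values below are assumptions, not the actual cluster settings.)

---
import org.apache.spark.SparkConf

// Assumed values, for illustration only: 7GB executors with roughly 2GB of
// that reserved for cached RDDs via the storage fraction.
val conf = new SparkConf()
  .set("spark.executor.memory", "7g")
  .set("spark.storage.memoryFraction", "0.3")   // ~2GB of 7GB for RDD storage
---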

Any suggestions? Your help is really appreciated.

==
Executed code
==
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// val training = MLUtils.loadLibSVMFile(sc,
//   "hdfs://host:8020/dataset/news20-binary/news20.binary.1000",
//   multiclass = false)
val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass = false)

val numFeatures = training.take(1)(0).features.size
// numFeatures: Int = 178560 for news20.binary.1000
// numFeatures: Int = 1354731 for news20.random.1000
val model = LogisticRegressionWithSGD.train(training, numIterations = 1)
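
(A small sanity check one could add here, sketched under the assumption that loadLibSVMFile yields SparseVector features, to confirm the loaded data really is sparse:)

---
import org.apache.spark.mllib.linalg.SparseVector

// Average number of non-zero entries per example; this should be far below
// numFeatures if the vectors are truly sparse.
val nnzTotal = training
  .map(_.features.asInstanceOf[SparseVector].indices.length.toLong)
  .reduce(_ + _)
val avgNnz = nnzTotal.toDouble / training.count()
---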

==
The dataset used in the evaluation
==

http://www.csie.ntu.edu.tw/~cjlin/libsvmtools/datasets/binary.html#news20.binary

$ head -1000 news20.binary | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.binary.1000
$ sort -R news20.binary > news20.random
$ head -1000 news20.random | sed 's/+1/1/g' | sed 's/-1/0/g' > news20.random.1000

You can find the datasets at:
https://dl.dropboxusercontent.com/u/13123103/news20.random.1000
https://dl.dropboxusercontent.com/u/13123103/news20.binary.1000


Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui
Here is a follow-up to the previous evaluation.

The "aggregate at GradientDescent.scala:178" stage never finishes:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L178

We confirmed, using -verbose:gc, that GC is not happening during the aggregate,
while the cumulative CPU time for the task increases little by little.

LBFGS also does not work for the large number of features (news20.random.1000),
though it works fine for the small number of features (news20.binary.1000).

The "aggregate at LBFGS.scala:201" stage also never finishes:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala#L201

---
[Evaluated code for LBFGS]

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
import org.apache.spark.mllib.optimization._

val data = MLUtils.loadLibSVMFile(sc,
  "hdfs://dm01:8020/dataset/news20-binary/news20.random.1000",
  multiclass = false)
val numFeatures = data.take(1)(0).features.size

// Append the bias term so the intercept is learned as an extra weight.
val training = data.map(x => (x.label, MLUtils.appendBias(x.features))).cache()

// Run training algorithm to build the model
val numCorrections = 10
val convergenceTol = 1e-4
val maxNumIterations = 20
val regParam = 0.1
val initialWeightsWithIntercept = Vectors.dense(new Array[Double](numFeatures + 1))

val (weightsWithIntercept, loss) = LBFGS.runLBFGS(
  training,
  new LogisticGradient(),
  new SquaredL2Updater(),
  numCorrections,
  convergenceTol,
  maxNumIterations,
  regParam,
  initialWeightsWithIntercept)
---
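
(A possible continuation, not part of the original post, that would use the BinaryClassificationMetrics and LogisticRegressionModel imports above, in the style of the MLlib examples; it scores the training set itself.)

---
// Rebuild a model from the learned weights; the last entry is the intercept.
val model = new LogisticRegressionModel(
  Vectors.dense(weightsWithIntercept.toArray.slice(0, weightsWithIntercept.size - 1)),
  weightsWithIntercept(weightsWithIntercept.size - 1))
model.clearThreshold()   // return raw scores instead of 0/1 predictions

// Score the original (bias-free) examples and compute the area under the ROC curve.
val scoreAndLabels = data.map(p => (model.predict(p.features), p.label))
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
---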


Thanks,
Makoto



Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Makoto,

How many partitions did you set? If there are too many partitions,
please do a coalesce before calling ML algorithms.
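
(A minimal sketch of that suggestion, with the partition threshold and target count of 32 chosen arbitrarily for illustration:)

---
// Check the partition count of the loaded training RDD and shrink it before
// handing it to the ML algorithm.
val numParts = training.partitions.length
val compacted = if (numParts > 32) training.coalesce(32) else training
---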

Btw, could you try the tree branch in my repo?
https://github.com/mengxr/spark/tree/tree

I used tree aggregate in this branch. It should help with the scalability.

Best,
Xiangrui



Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread DB Tsai
Hi Xiangrui,

What's the difference between treeAggregate and aggregate? Why does
treeAggregate scale better? If we just use mapPartitions, will it be as fast
as treeAggregate?

Thanks.

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai




Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui

Hi Xiangrui,

(2014/06/18 4:58), Xiangrui Meng wrote:

How many partitions did you set? If there are too many partitions,
please do a coalesce before calling ML algorithms.


The training data news20.random.1000 is small, and thus only 2
partitions are used by default.


val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass = false)


We also tried 32 partitions, as follows, but the aggregate never finishes.

val training = MLUtils.loadLibSVMFile(sc,
  "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
  multiclass = false, numFeatures = 1354731, minPartitions = 32)



Btw, could you try the tree branch in my repo?
https://github.com/mengxr/spark/tree/tree

I used tree aggregate in this branch. It should help with the scalability.


Is treeAggregate itself available on Spark 1.0?

I wonder: could I test your modification just by running the following
code in the REPL?


---
val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
  .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
    seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
      val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
      (grad, loss + l)
    },
    combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
      (grad1 += grad2, loss1 + loss2)
    }, 2)
---

Rebuilding Spark just to run this evaluation is quite a lot of work.

Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi DB,

treeReduce (treeAggregate) is a feature I'm testing now. It is a
compromise between the current reduce and a butterfly allReduce: the former
runs in time linear in the number of partitions, while the latter introduces
too many dependencies. treeAggregate with depth = 2 should run in
O(sqrt(n)) time, where n is the number of partitions. It would be
great if someone could help test its scalability.
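
(For readers following the thread, a rough usage sketch, not the MLlib implementation, assuming treeAggregate keeps the aggregate-style signature plus a depth parameter:)

---
import org.apache.spark.rdd.RDD

// Sum an RDD[Double] with a two-level tree: values are combined inside each
// partition first, partial sums are merged on intermediate tasks, and the
// driver only sees roughly sqrt(n) results instead of n.
def treeSum(data: RDD[Double]): Double =
  data.treeAggregate(0.0)(
    seqOp = (acc, x) => acc + x,   // combine within a partition
    combOp = (a, b) => a + b,      // combine partial sums up the tree
    depth = 2)
---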

Best,
Xiangrui



Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Hi Makoto,

Are you using Spark 1.0 or 0.9? Could you go to the executor tab of
the web UI and check the driver's memory?

treeAggregate is not part of 1.0.
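
(Since treeAggregate is not in 1.0, one rough way to approximate a two-level aggregation with the existing 1.0 API, sketched for a simple sum; this is not the implementation in the tree branch:)

---
import org.apache.spark.rdd.RDD

// Pre-combine inside each partition, shuffle the partial sums into a few
// partitions for a second-level combine, and only then reduce at the driver.
def twoLevelSum(data: RDD[Double], fanIn: Int = 8): Double =
  data.mapPartitions(it => Iterator(it.sum))
    .repartition(fanIn)
    .mapPartitions(it => Iterator(it.sum))
    .reduce(_ + _)
---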

Best,
Xiangrui



Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread DB Tsai
Hi Xiangrui,

Does it mean that mapPartitions followed by reduce shares the same
behavior as the aggregate operation, which is O(n)?

Sincerely,

DB Tsai
---
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai




Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui

Hi Xiangrui,

(2014/06/18 6:03), Xiangrui Meng wrote:

Are you using Spark 1.0 or 0.9? Could you go to the executor tab of
the web UI and check the driver's memory?


I am using Spark 1.0.

588.8 MB is allocated for driver RDDs.
I am setting SPARK_DRIVER_MEMORY=2g in conf/spark-env.sh.

The value shown for driver RDDs in the web UI did not change when I
launched the shell as follows:

$ SPARK_DRIVER_MEMORY=6g bin/spark-shell

I set -verbose:gc, but a full GC (or continuous GCs) does not happen
during the aggregate on the driver.


Thanks,
Makoto


Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Debasish Das
Xiangrui,

Could you point me to the JIRA related to tree aggregate? It sounds like the
allreduce idea...

I would definitely like to try it on our dataset...

Makoto,

I did run a pretty big sparse dataset (20M rows, 3M sparse features) and I
got 100 iterations of SGD running in 200 seconds...10 executors each with
16 GB memory...

Although the best result on the same dataset came out of liblinear and
BFGS-L1 out of the box...so I did not tune the SGD further on learning rate and
other heuristics...it was around 5% off...

Thanks.
Deb





Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
DB, Yes, reduce and aggregate are linear.

Makoto, dense vectors are used in the aggregation. If you have 32
partitions and each one sends a dense vector of size 1,354,731 to the
master, the driver needs 300MB+. That may be the problem. Which
deploy mode are you using, standalone or local?
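
(A back-of-the-envelope check of that figure, assuming 8 bytes per Double:)

---
// Rough size of what the driver receives during one aggregation.
val bytesPerGradient = 1354731L * 8L          // ~10.8 MB per dense gradient vector
val bytesAtDriver = bytesPerGradient * 32L    // ~347 MB if all 32 partial results arrive at once
---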

Debasish, there is an old PR for butterfly allreduce; however, it
doesn't seem to be the right way to go for Spark. I just sent out the
PR: https://github.com/apache/spark/pull/1110. This is a WIP and it
needs more testing before we are confident enough to merge it. It would be
great if you could help test it.

Best,
Xiangrui




Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Xiangrui Meng
Makoto, please use --driver-memory 8G when you launch spark-shell. -Xiangrui




Re: news20-binary classification with LogisticRegressionWithSGD

2014-06-17 Thread Makoto Yui

Hi Xiangrui,

(2014/06/18 8:49), Xiangrui Meng wrote:

Makoto, dense vectors are used in the aggregation. If you have 32
partitions and each one sends a dense vector of size 1,354,731 to the
master, the driver needs 300MB+. That may be the problem.


It seems that this could cause problems for convex optimization on large
training data; a merging tree, like allreduce, would help to reduce the
memory requirements (though the time for aggregation might increase).



Which deploy mode are you using, standalone or local?


Standalone.

Setting --driver-memory 8G did not solve the aggregate problem; the
aggregation never finishes.

`ps aux | grep spark` on the master is as follows:

myui  7049 79.3  1.1 8768868 592348 pts/2  Sl+  11:10   0:14 
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -verbose:gc -XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps -Djava.library.path= -Xms2g -Xmx2g 
org.apache.spark.deploy.SparkSubmit spark-shell --driver-memory 8G 
--class org.apache.spark.repl.Main


myui  5694  2.5  0.5 6868296 292572 pts/2  Sl   10:59   0:17 
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms512m 
-Xmx512m org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 
--webui-port 8081



Exporting SPARK_DAEMON_MEMORY=4g in spark-env.sh did not help the
evaluation either:


`ps aux | grep spark`
/usr/java/jdk1.7/bin/java -cp 
::/opt/spark-1.0.0/conf:/opt/spark-1.0.0/assembly/target/scala-2.10/spark-assembly-1.0.0-hadoop0.20.2-cdh3u6.jar:/usr/lib/hadoop-0.20/conf 
-XX:MaxPermSize=128m -Dspark.akka.logLifecycleEvents=true -Xms4g -Xmx4g 
org.apache.spark.deploy.master.Master --ip 10.0.0.1 --port 7077 
--webui-port 8081

...


Thanks,
Makoto