Hi Xiangrui,

Does this mean that mapPartitions followed by reduce has the same O(n) behavior as the aggregate operation?
Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai

On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng <men...@gmail.com> wrote:
> Hi DB,
>
> treeReduce (treeAggregate) is a feature I'm testing now. It is a
> compromise between the current reduce and a butterfly allReduce. The
> former runs in time linear in the number of partitions, while the latter
> introduces too many dependencies. treeAggregate with depth = 2 should
> run in O(sqrt(n)) time, where n is the number of partitions. It would be
> great if someone could help test its scalability.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui <yuin...@gmail.com> wrote:
>> Hi Xiangrui,
>>
>> (2014/06/18 4:58), Xiangrui Meng wrote:
>>>
>>> How many partitions did you set? If there are too many partitions,
>>> please do a coalesce before calling ML algorithms.
>>
>> The training data "news20.random.1000" is small, so only 2 partitions
>> are used by default:
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>>   "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>   multiclass = false)
>>
>> We also tried 32 partitions as follows, but the aggregate never finishes:
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>>   "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>   multiclass = false, numFeatures = 1354731, minPartitions = 32)
>>
>>> Btw, could you try the tree branch in my repo?
>>> https://github.com/mengxr/spark/tree/tree
>>>
>>> I used tree aggregate in this branch. It should help with the scalability.
>>
>> Is treeAggregate itself available on Spark 1.0?
>>
>> I wonder: could I test your modification just by running the following
>> code in the REPL?
>>
>> -------------------
>> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
>>   .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>     seqOp = (c, v) => (c, v) match {
>>       case ((grad, loss), (label, features)) =>
>>         val l = gradient.compute(features, label, weights,
>>           Vectors.fromBreeze(grad))
>>         (grad, loss + l)
>>     },
>>     combOp = (c1, c2) => (c1, c2) match {
>>       case ((grad1, loss1), (grad2, loss2)) =>
>>         (grad1 += grad2, loss1 + loss2)
>>     }, 2)
>> -------------------
>>
>> Rebuilding Spark is quite a lot of work just for an evaluation.
>>
>> Thanks,
>> Makoto
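For reference, the depth-2 tree combine that Xiangrui describes can be sketched in plain Scala without Spark. This is only an illustration of why driver-side work drops from O(n) to O(sqrt(n)): the n partition results are first reduced in groups of ~sqrt(n), and only the ~sqrt(n) intermediate results are reduced at the top. The names `TreeAggregateSketch` and `treeCombine` are made up for this sketch and are not Spark's API.

```scala
object TreeAggregateSketch {
  // Combine n partial results in two levels: groups of ~sqrt(n) are
  // reduced first, then the group results are reduced. With a flat
  // reduce the final step touches all n partials; here it touches
  // only ~sqrt(n) of them.
  def treeCombine[T](partials: Seq[T], combOp: (T, T) => T): T = {
    val n = partials.size
    val groupSize = math.max(1, math.ceil(math.sqrt(n.toDouble)).toInt)
    val intermediate = partials.grouped(groupSize).map(_.reduce(combOp)).toSeq
    intermediate.reduce(combOp)
  }

  def main(args: Array[String]): Unit = {
    // Pretend these are the per-partition sums from 32 partitions.
    val partials = (1 to 32).map(_.toDouble)
    val total = treeCombine[Double](partials, _ + _)
    println(total) // 528.0, same result as a flat reduce
  }
}
```

In real treeAggregate the intermediate level runs as a distributed stage rather than on the driver, but the grouping idea is the same.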