Hi Xiangrui,

Does that mean mapPartitions followed by reduce behaves the same as
the aggregate operation, i.e. O(n) in the number of partitions?
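
To check my understanding of the O(sqrt(n)) claim, here is a rough sketch
in plain Scala (no Spark). The object and method names are made up for
illustration, and the cost model is my assumption: it only counts how many
partial results sit on the longest sequential merge path, which is not
necessarily how the tree branch schedules its work.

```scala
// Plain Scala simulation (no Spark): each "partition" contributes one partial sum.
// All names here are hypothetical and exist only for this sketch.
object TreeAggregateSketch {
  // Flat reduce: the driver merges every partial result itself.
  // Returns (result, number of partials merged on the critical path).
  def flatReduce(partials: Seq[Long]): (Long, Int) =
    (partials.sum, partials.size)

  // Two-level tree: split the partials into about sqrt(n) groups, merge each
  // group independently, then merge the group results on the driver.
  def twoLevelReduce(partials: Seq[Long]): (Long, Int) = {
    val groupSize = math.max(1, math.ceil(math.sqrt(partials.size.toDouble)).toInt)
    val groups = partials.grouped(groupSize).toSeq
    val groupSums = groups.map(_.sum)           // assumed to run concurrently
    val criticalPath = groupSize + groups.size  // longest group merge + final merge
    (groupSums.sum, criticalPath)
  }

  def main(args: Array[String]): Unit = {
    val partials = Seq.fill(1024)(1L)
    println(flatReduce(partials))     // (1024,1024)
    println(twoLevelReduce(partials)) // (1024,64)
  }
}
```

With 1024 partitions the flat version merges 1024 partials in sequence,
while the two-level version merges 32 per group plus 32 on the driver.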

Sincerely,

DB Tsai
-------------------------------------------------------
My Blog: https://www.dbtsai.com
LinkedIn: https://www.linkedin.com/in/dbtsai


On Tue, Jun 17, 2014 at 2:00 PM, Xiangrui Meng <men...@gmail.com> wrote:
> Hi DB,
>
> treeReduce (treeAggregate) is a feature I'm testing now. It is a
> compromise between the current reduce and a butterfly allReduce. The former
> runs in time linear in the number of partitions; the latter introduces
> too many dependencies. treeAggregate with depth = 2 should run in
> O(sqrt(n)) time, where n is the number of partitions. It would be
> great if someone can help test its scalability.
>
> Best,
> Xiangrui
>
> On Tue, Jun 17, 2014 at 1:32 PM, Makoto Yui <yuin...@gmail.com> wrote:
>> Hi Xiangrui,
>>
>>
>> (2014/06/18 4:58), Xiangrui Meng wrote:
>>>
>>> How many partitions did you set? If there are too many partitions,
>>> please do a coalesce before calling ML algorithms.
>>
>>
>> The training data "news20.random.1000" is small, so only 2 partitions
>> are used by default.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>> "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>> multiclass=false).
>>
>> We also tried 32 partitions as follows but the aggregate never finishes.
>>
>> val training = MLUtils.loadLibSVMFile(sc,
>>   "hdfs://host:8020/dataset/news20-binary/news20.random.1000",
>>   multiclass = false, numFeatures = 1354731, minPartitions = 32)
>>
>>
>>> Btw, could you try the tree branch in my repo?
>>> https://github.com/mengxr/spark/tree/tree
>>>
>>> I used tree aggregate in this branch. It should help with the scalability.
>>
>>
>> Is treeAggregate itself available on Spark 1.0?
>>
>> I wonder: could I test your modification just by running the following code
>> in the REPL?
>>
>> -------------------
>> val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
>>   .treeAggregate((BDV.zeros[Double](weights.size), 0.0))(
>>     seqOp = (c, v) => (c, v) match {
>>       case ((grad, loss), (label, features)) =>
>>         val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
>>         (grad, loss + l)
>>     },
>>     combOp = (c1, c2) => (c1, c2) match {
>>       case ((grad1, loss1), (grad2, loss2)) =>
>>         (grad1 += grad2, loss1 + loss2)
>>     }, 2)
>> -------------------------
>>
>> Rebuilding Spark just to run an evaluation is quite an undertaking.
>>
>> Thanks,
>> Makoto
