Assuming that your data is very sparse, I would recommend
RDD.repartition. But if that is not the case and you don't want to
shuffle the data, you can try a CombineInputFormat and then parse the
lines into labeled points. Coalesce may cause locality problems if you
don't use the right number of partitions. -Xiangrui
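
For reference, a rough, untested sketch of the CombineInputFormat route
(the input path, the 512M split cap, and the LibSVM-style line format are
assumptions for illustration, not details from this thread):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Cap each combined split at ~512M so far fewer partitions are created
// than the ~26K HDFS blocks, without shuffling the data.
sc.hadoopConfiguration.setLong(
  "mapreduce.input.fileinputformat.split.maxsize", 512L * 1024 * 1024)

val data = sc.newAPIHadoopFile(
    "hdfs:///path/to/training",          // hypothetical path
    classOf[CombineTextInputFormat],
    classOf[LongWritable],
    classOf[Text])
  .map { case (_, text) =>
    // Parse a LibSVM-style line "label idx:val idx:val ..." (1-based indices).
    val parts = text.toString.split(' ')
    val (indices, values) = parts.tail.map { kv =>
      val Array(i, v) = kv.split(':')
      (i.toInt - 1, v.toDouble)
    }.unzip
    LabeledPoint(parts.head.toDouble,
      Vectors.sparse(100000000, indices, values))  // 100M features, per the thread
  }
  .cache()

If the data is sparse enough that a shuffle is acceptable, the simpler
alternative is data.repartition(numPartitions).cache().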

On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
> I think this has the same effect and issue with #1, right?
>
>
> On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiush...@gmail.com>
> wrote:
>>
>> How about increasing the HDFS block size? For example, if the current
>> value is 128M, we could make it 512M or bigger.
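>>
>> For instance (a rough, untested sketch; the config key, split size, and
>> path here are assumptions), larger input splits can also be requested at
>> read time without rewriting the files:
>>
>> sc.hadoopConfiguration.setLong(
>>   "mapreduce.input.fileinputformat.split.minsize", 512L * 1024 * 1024)
>> val data = sc.textFile("hdfs:///path/to/training")  // hypothetical path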
>>
>>
>> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong...@gmail.com>
>> wrote:
>>>
>>> Hi all,
>>>
>>> We are trying to use Spark MLlib to train on super large data (100M
>>> features and 5B rows). The input data in HDFS has ~26K partitions. By
>>> default, MLlib creates a task for every partition at each iteration. But
>>> because our dimensionality is also very high, such a large number of
>>> tasks incurs significant network overhead to transfer the weight vector.
>>> So we want to reduce the number of tasks, and we tried the approaches
>>> below:
>>>
>>> 1. Coalesce partitions without shuffling, then cache.
>>>
>>> data.coalesce(numPartitions).cache()
>>>
>>> This works fine for relatively small data, but as the data grows and
>>> numPartitions stays fixed, each partition becomes large. This introduces
>>> two issues: first, larger partitions need larger objects and more memory
>>> at runtime, and trigger GC more frequently; second, we hit the 'Size
>>> exceeds Integer.MAX_VALUE' error, which seems to be caused by a single
>>> partition being larger than 2G
>>> (https://issues.apache.org/jira/browse/SPARK-1391).
>>>
>>> 2. Coalesce partitions with shuffling, then cache.
>>>
>>> data.coalesce(numPartitions, true).cache()
>>>
>>> It mitigates the second issue in #1 to some degree, but the first issue
>>> is still there, and it also introduces a large amount of shuffling.
>>>
>>> 3. Cache data first, and coalesce partitions.
>>>
>>> data.cache().coalesce(numPartitions)
>>>
>>> In this way, the number of cached partitions does not change, but each
>>> task reads data from multiple partitions. However, I find that tasks
>>> lose locality this way. I see a lot of 'ANY' tasks, which means those
>>> tasks read data from other nodes and are slower than tasks that read
>>> data from local memory.
>>>
>>> I think the best approach is something like #3, but leveraging locality
>>> as much as possible. Is there any way to do that? Any suggestions?
>>>
>>> Thanks!
>>>
>>> --
>>> ZHENG, Xu-dong
>>>
>>
>
>
>
> --
> 郑旭东
> ZHENG, Xu-dong
>

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
