Assuming that your data is very sparse, I would recommend RDD.repartition. If that is not the case and you don't want to shuffle the data, you can try a CombineInputFormat and then parse the lines into labeled points. Coalesce may cause locality problems if you don't use the right number of partitions.

-Xiangrui

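One rough way to choose the partition count for a repartition call is to bound the average partition size, keeping it well under the 2G limit described in the original question (SPARK-1391). The sketch below is plain Scala with no Spark dependency. The helper name `partitionsFor`, the parameters `totalBytes` and `targetBytes`, and the 20 TiB figure are all hypothetical and chosen only for illustration; the thread does not state the actual data size.

```scala
object PartitionSizing {
  // Upper bound on one partition, per the 2G limit discussed in the thread.
  val MaxPartitionBytes: Long = Int.MaxValue.toLong

  /** Smallest partition count that keeps the average partition at or below targetBytes. */
  def partitionsFor(totalBytes: Long, targetBytes: Long): Int = {
    require(totalBytes >= 0, "totalBytes must be non-negative")
    require(targetBytes > 0 && targetBytes <= MaxPartitionBytes,
      "targetBytes must be positive and within the 2G limit")
    // Ceiling division; use at least one partition even for empty input.
    val n = (totalBytes + targetBytes - 1) / targetBytes
    math.max(1L, n).toInt
  }

  def main(args: Array[String]): Unit = {
    val totalBytes = 20L * 1024 * 1024 * 1024 * 1024 // hypothetical: 20 TiB of cached data
    val targetBytes = 1L * 1024 * 1024 * 1024        // hypothetical: about 1 GiB per partition
    val numPartitions = partitionsFor(totalBytes, targetBytes)
    println(s"numPartitions = $numPartitions")
    // With a Spark RDD named `data` in scope, the result would be used as:
    //   data.repartition(numPartitions).cache()
  }
}
```

This bounds only the average size. Skewed partitions can still exceed the target, so a target well below 2G leaves some headroom.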
On Mon, Aug 11, 2014 at 10:39 PM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
> I think this has the same effect and the same issue as #1, right?
>
> On Tue, Aug 12, 2014 at 1:08 PM, Jiusheng Chen <chenjiush...@gmail.com> wrote:
>>
>> How about increasing the HDFS file extent size? For example, the current
>> value is 128M; we could make it 512M or bigger.
>>
>> On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:
>>>
>>> Hi all,
>>>
>>> We are trying to use Spark MLlib to train on a very large data set (100M
>>> features and 5B rows). The input data in HDFS has ~26K partitions. By
>>> default, MLlib creates a task for every partition at each iteration.
>>> Because our dimensionality is also very high, such a large number of tasks
>>> adds a lot of network overhead for transferring the weight vector. So we
>>> want to reduce the number of tasks, and we tried the following approaches:
>>>
>>> 1. Coalesce partitions without shuffling, then cache.
>>>
>>>     data.coalesce(numPartitions).cache()
>>>
>>> This works fine for relatively small data, but when the data grows and
>>> numPartitions is fixed, each partition becomes large. This introduces two
>>> issues: first, a larger partition needs larger objects and more memory at
>>> runtime, and triggers GC more frequently; second, we hit the
>>> 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by a
>>> single partition being larger than 2G
>>> (https://issues.apache.org/jira/browse/SPARK-1391).
>>>
>>> 2. Coalesce partitions with shuffling, then cache.
>>>
>>>     data.coalesce(numPartitions, true).cache()
>>>
>>> This mitigates the second issue in #1 to some degree, but the first issue
>>> is still there, and it also introduces a large amount of shuffling.
>>>
>>> 3. Cache the data first, then coalesce partitions.
>>>
>>>     data.cache().coalesce(numPartitions)
>>>
>>> This way, the number of cached partitions does not change, but each task
>>> reads data from multiple partitions. However, I find that tasks lose
>>> locality this way. I see a lot of 'ANY' tasks, which means that tasks read
>>> data from other nodes and are slower than tasks that read data from local
>>> memory.
>>>
>>> I think the best way would be like #3, but exploiting locality as much as
>>> possible. Is there any way to do that? Any suggestions?
>>>
>>> Thanks!
>>>
>>> --
>>> ZHENG, Xu-dong
>>>
>>
>
> --
> 郑旭东
> ZHENG, Xu-dong
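
For a sense of scale, the network overhead described in the original question can be estimated with simple arithmetic. The sketch below is plain Scala and rests on two labeled assumptions that the thread does not confirm: the weight vector is dense with 8-byte doubles, and every task receives one full copy per iteration. The object and method names are hypothetical, and the count of 1000 tasks is only an illustrative coalesced value.

```scala
object WeightTransferEstimate {
  /** Bytes for one dense vector of doubles (assumption: 8 bytes per feature). */
  def denseVectorBytes(numFeatures: Long): Long = numFeatures * 8L

  /** Bytes moved per iteration if every task receives one full copy of the weights. */
  def bytesPerIteration(numFeatures: Long, numTasks: Long): Long =
    denseVectorBytes(numFeatures) * numTasks

  def main(args: Array[String]): Unit = {
    val numFeatures = 100000000L                        // 100M features, from the question
    val before = bytesPerIteration(numFeatures, 26000L) // ~26K input partitions
    val after = bytesPerIteration(numFeatures, 1000L)   // hypothetical coalesced task count
    println(f"26000 tasks: ${before / 1e12}%.1f TB per iteration")
    println(f" 1000 tasks: ${after / 1e12}%.1f TB per iteration")
  }
}
```

Under these assumptions one copy of the weights is 800 MB, so 26K tasks move about 20.8 TB per iteration while 1000 tasks move about 0.8 TB. The real cost depends on how Spark ships the vector to executors, so treat these figures as an upper-bound estimate rather than a measurement.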