How about increasing the HDFS block size? For example, if the current value is 128 MB, we could make it 512 MB or larger.
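
As a rough back-of-the-envelope sketch of what that would buy us (not a measurement): assuming Spark creates one input partition per HDFS block and the files are much larger than a block, the partition count scales inversely with the block size. The object and function names below are made up for illustration, and the 26,000 figure is just the ~26K partitions from the original post.

```scala
// Rough estimate only. Assumptions (not from the original post):
// one Spark input partition per HDFS block, and files that are
// large relative to the block size.
object BlockSizeEstimate {
  val MB: Long = 1024L * 1024L

  // Total input size implied by a partition count at a given block size.
  def totalBytes(numPartitions: Long, blockSizeBytes: Long): Long =
    numPartitions * blockSizeBytes

  // Number of blocks (partitions) needed to hold `total` bytes, rounded up.
  def partitionsFor(total: Long, blockSizeBytes: Long): Long =
    (total + blockSizeBytes - 1) / blockSizeBytes

  def main(args: Array[String]): Unit = {
    val currentPartitions = 26000L // ~26K partitions at 128 MB blocks
    val total = totalBytes(currentPartitions, 128 * MB)
    for (sizeMb <- Seq(128L, 256L, 512L, 1024L)) {
      val n = partitionsFor(total, sizeMb * MB)
      println(s"$sizeMb MB blocks -> about $n partitions")
    }
  }
}
```

Under those assumptions, 512 MB blocks would cut the task count to about a quarter while keeping each partition well under the 2G limit mentioned below. Note that the block size is fixed when a file is written, so the existing input would have to be rewritten for this to take effect.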

On Tue, Aug 12, 2014 at 11:46 AM, ZHENG, Xu-dong <dong...@gmail.com> wrote:

> Hi all,
>
> We are trying to use Spark MLlib to train on a very large data set (100M
> features and 5B rows). The input data in HDFS has ~26K partitions. By
> default, MLlib creates a task for every partition at each iteration. But
> because our dimensionality is also very high, such a large number of tasks
> adds significant network overhead for transferring the weight vector. So we
> want to reduce the number of tasks, and we tried the following approaches:
>
> 1. Coalesce partitions without shuffling, then cache.
>
>     data.coalesce(numPartitions).cache()
>
> This works fine for relatively small data, but when the data grows and
> numPartitions is fixed, each partition becomes large. This introduces two
> issues: first, a larger partition needs larger objects and more memory at
> runtime, and triggers GC more frequently; second, we hit the 'Size exceeds
> Integer.MAX_VALUE' error, which seems to be caused by a single partition
> being larger than 2G (https://issues.apache.org/jira/browse/SPARK-1391).
>
> 2. Coalesce partitions with shuffling, then cache.
>
>     data.coalesce(numPartitions, true).cache()
>
> This mitigates the second issue in #1 to some degree, but the first issue
> is still there, and it also introduces a large amount of shuffling.
>
> 3. Cache data first, then coalesce partitions.
>
>     data.cache().coalesce(numPartitions)
>
> This way, the number of cached partitions does not change, but each task
> reads data from multiple partitions. However, I find that the tasks lose
> locality this way. I see a lot of 'ANY' tasks, which means the tasks read
> data from other nodes and are slower than tasks that read from local
> memory.
>
> I think the best way should be like #3, but leveraging locality as much as
> possible. Is there any way to do that? Any suggestions?
>
> Thanks!
>
> --
> ZHENG, Xu-dong