Hi all,

We are trying to use Spark MLlib to train on very large data (100M features and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib creates a task for every partition at each iteration. Because our dimensionality is also very high, such a large number of tasks adds significant network overhead to transfer the weight vector. So we want to reduce the number of tasks, and we have tried the following approaches:
1. Coalesce partitions without shuffling, then cache: data.coalesce(numPartitions).cache()

This works fine for relatively small data, but when the data grows while numPartitions stays fixed, each partition becomes large. That introduces two issues: first, the larger partitions need larger objects and more memory at runtime, and trigger GC more frequently; second, we hit a 'Size exceeds Integer.MAX_VALUE' error, which seems to be caused by a single partition exceeding 2GB (https://issues.apache.org/jira/browse/SPARK-1391).

2. Coalesce partitions with shuffling, then cache: data.coalesce(numPartitions, true).cache()

This mitigates the second issue in #1 to some degree, but the first issue is still there, and it also introduces a large amount of shuffling.

3. Cache the data first, then coalesce partitions: data.cache().coalesce(numPartitions)

In this way, the number of cached partitions does not change, but each task reads data from multiple partitions. However, I find that tasks lose locality this way: I see a lot of tasks with 'ANY' locality, meaning they read data from other nodes and become slower than tasks reading from local memory.

I think the best approach would be something like #3, but preserving locality as much as possible. Is there any way to do that? Any suggestions?

Thanks!

-- ZHENG, Xu-dong
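P.S. For anyone skimming, here are the three variants side by side as a minimal Scala sketch. The RDD[LabeledPoint] element type, the `data` and `numPartitions` names, and the method body are illustrative assumptions on my part (this needs a live SparkContext and is not a tested recipe):

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.regression.LabeledPoint

// Sketch only: `data` is assumed to be an RDD loaded from HDFS with ~26K
// partitions; `numPartitions` is the reduced task count we are aiming for.
def coalesceVariants(data: RDD[LabeledPoint], numPartitions: Int): Unit = {
  // 1. Narrow coalesce, then cache: few but very large cached partitions.
  //    Risks single partitions over 2GB (SPARK-1391) and heavier GC.
  val v1 = data.coalesce(numPartitions).cache()

  // 2. Coalesce with shuffle = true, then cache: partitions are rebalanced,
  //    which eases the 2GB problem, but shuffling 5B rows is expensive and
  //    the large-partition memory pressure remains.
  val v2 = data.coalesce(numPartitions, shuffle = true).cache()

  // 3. Cache first, then narrow coalesce: the ~26K small partitions stay
  //    cached, and only numPartitions tasks run per iteration, but each task
  //    may read cached blocks from remote nodes ('ANY' locality).
  val v3 = data.cache().coalesce(numPartitions)
}
```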