Hi all,

We are trying to use Spark MLlib to train on very large data (100M features
and 5B rows). The input data in HDFS has ~26K partitions. By default, MLlib
creates one task per partition at each iteration. Because the dimensionality
is also very high, such a large number of tasks adds a lot of network
overhead for transferring the weight vector. So we want to reduce the number
of tasks; a rough back-of-envelope of the traffic is sketched below, followed
by the approaches we have tried:

1. Coalesce partitions without shuffling, then cache.

data.coalesce(numPartitions).cache()

This works fine for relatively small data, but as the data grows while
numPartitions stays fixed, each partition becomes very large. This introduces
two issues: first, a larger partition needs larger objects and more memory at
runtime, and triggers GC more frequently; second, we hit a 'Size exceeds
Integer.MAX_VALUE' error, which seems to be caused by a single partition
growing beyond 2GB (https://issues.apache.org/jira/browse/SPARK-1391).
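
For completeness, approach #1 end-to-end looks roughly like this (just a
sketch; the input path, the LIBSVM format, and the numPartitions value are
made up):

import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "hdfs:///path/to/training")  // ~26K HDFS partitions
val numPartitions = 1000                                           // hypothetical target
val coalesced = data.coalesce(numPartitions).cache()
coalesced.count()  // materialize the cache; each cached partition now holds ~26x more rows

With 5B rows, 1000 partitions would mean ~5M rows per partition, which is
where the per-partition 2GB limit starts to bite.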

2. Coalesce partitions with shuffling, then cache.

data.coalesce(numPartitions, true).cache()

This mitigates the second issue in #1 to some degree, but the first issue is
still there, and it also introduces a large amount of shuffling.
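
Note that coalesce with shuffle enabled is the same operation as repartition,
so this shuffles all ~5B rows before caching:

data.coalesce(numPartitions, shuffle = true).cache()
// equivalent to:
data.repartition(numPartitions).cache()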

3. Cache data first, then coalesce partitions.

data.cache().coalesce(numPartitions)

In this way, the number of cached partitions does not change, but each task
reads data from multiple cached partitions. However, tasks lose locality this
way: I see a lot of tasks at the 'ANY' locality level, which means they read
data from other nodes and are slower than tasks that read from local memory.
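
For reference, approach #3 in full, plus a driver-side way to look at the
preferred locations of the coalesced partitions (the inspection loop is only
an illustration, not from our actual job):

val cached = data.cache()
cached.count()                                // materialize the ~26K cached partitions
val grouped = cached.coalesce(numPartitions)  // no shuffle; each task reads ~26 cached partitions

// Each coalesced partition advertises at most one preferred location, while its
// ~26 parent cached partitions may live on several nodes, so many reads go remote:
grouped.partitions.take(5).foreach { p =>
  println(s"partition ${p.index}: preferred = ${grouped.preferredLocations(p)}")
}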

I think the best approach would be something like #3, but preserving locality
as much as possible. Is there any way to do that? Any suggestions?

Thanks!

-- 
ZHENG, Xu-dong
