Hi Xiangrui,

Thanks for your treeAggregate patch. It is very helpful.
After applying your patch to my local repo, the new Spark build can handle more
partitions than before.
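For reference, here is roughly how I understand the new API is meant to be
used, together with your earlier suggestion of matching the number of
partitions to the number of cores. This is only a rough Scala sketch, not my
actual code; the import path and the default depth are my assumptions, and the
fold below just counts records as a stand-in for the real gradient update:

import org.apache.spark.mllib.rdd.RDDFunctions._  // assuming the implicit that adds treeAggregate lives here

// Roughly match the number of partitions to the total executor cores
// (50 executors * 2 cores each with my settings quoted below).
val numCores = 50 * 2
val coalesced = points.coalesce(numCores)

// Combine per-partition partial results through a multi-level tree instead of
// sending every partition's result straight to the driver.
val total = coalesced.treeAggregate(0.0)(
  (acc, p) => acc + 1.0,   // seqOp: fold each record into the partition's partial result
  (a, b) => a + b,         // combOp: merge partial results pairwise up the tree
  2                        // depth of the aggregation tree (2 is the default, I believe)
)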
But after some iterations (mapPartitions + reduceByKey), the reducers seem to
become slower and slower and finally hang.
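The loop that runs into trouble has roughly the shape below. The names and the
update logic are placeholders rather than my real code; the point is only that
every iteration pushes one partial result per partition through reduceByKey:

import org.apache.spark.SparkContext._   // pair-RDD operations (reduceByKey) in Spark 1.x

var weights = Array.fill(numFeatures)(0.0)               // numFeatures: placeholder
for (i <- 1 to numIterations) {                          // numIterations: placeholder
  val w = weights                                        // snapshot captured by the closures below
  val summed = points
    .mapPartitions { it =>
      // One partial result per partition, computed against the current weights
      // (the per-point accumulation into `partial` is omitted here).
      val partial = new Array[Double](w.length)
      Iterator((0, partial))                             // single key: every partial goes to one reducer
    }
    .reduceByKey { (a, b) => (a, b).zipped.map(_ + _) }  // shuffle + merge the partials
    .values
    .first()
  weights = (weights, summed).zipped.map(_ - 0.1 * _)    // stand-in driver-side update with a fixed step
}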

The logs show there is always 1 message pending in the outbox, and we are
waiting for it. Are you aware of this kind of issue?
How can I find out which message is pending, and where is it supposed to go?

Log:

14/07/25 17:49:54 INFO storage.BlockManager: Found block rdd_2_158 locally
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO executor.Executor: Serialized size of result for 302 is 
752
14/07/25 17:50:03 INFO executor.Executor: Sending result for 302 directly to 
driver
14/07/25 17:50:03 INFO executor.Executor: Finished task ID 302
14/07/25 17:50:34 INFO network.ConnectionManager: Accepted connection from 
[*********/**********]
14/07/25 17:50:34 INFO network.SendingConnection: Initiating connection to 
[********/************]
14/07/25 17:50:34 INFO network.SendingConnection: Connected to 
[********/********], 1 messages pending
14/07/25 17:51:28 INFO storage.ShuffleBlockManager: Deleted all files for 
shuffle 0
14/07/25 17:51:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 
742
14/07/25 17:51:37 INFO executor.Executor: Running task ID 742
14/07/25 17:51:37 INFO storage.BlockManager: Found block broadcast_1 locally
14/07/25 17:51:38 INFO spark.MapOutputTrackerWorker: Updating epoch to 1 and 
clearing cache
14/07/25 17:51:38 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:38 INFO storage.BlockManager: Found block rdd_2_158 locally
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO executor.Executor: Serialized size of result for 742 is 
752
14/07/25 17:51:48 INFO executor.Executor: Sending result for 742 directly to 
driver
14/07/25 17:51:48 INFO executor.Executor: Finished task ID 742
<-- I shut down the application at this point
14/07/25 18:16:36 INFO executor.CoarseGrainedExecutorBackend: Driver commanded 
a shutdown

On Jul 2, 2014, at 0:08, Xiangrui Meng <men...@gmail.com> wrote:

> Try to reduce the number of partitions to match the number of cores. We
> will add treeAggregate to reduce the communication cost.
> 
> PR: https://github.com/apache/spark/pull/1110
> 
> -Xiangrui
> 
> On Tue, Jul 1, 2014 at 12:55 AM, Charles Li <littlee1...@gmail.com> wrote:
>> Hi Spark,
>> 
>> I am running LBFGS on our user data. The data size with Kryo serialisation
>> is about 210G. The weight size is around 1,300,000. I am quite confused that
>> the performance is nearly the same whether the data is cached or not.
>> 
>> The program is simple:
>> points = sc.hadoopFile(int, SequenceFileInputFormat.class, …..);
>> points.persist(StorageLevel.MEMORY_AND_DISK_SER()); // comment this out if not cached
>> gradient = new LogisticGradient();
>> updater = new SquaredL2Updater();
>> initWeight = Vectors.sparse(size, new int[]{}, new double[]{});
>> result = LBFGS.runLBFGS(points.rdd(), gradient, updater, numCorrections,
>> convergeTol, maxIter, regParam, initWeight);
>> 
>> I have 13 machines with 16 CPUs and 48G RAM each. Spark is running in
>> cluster mode. Below are some of the arguments I am using:
>> --executor-memory 10G
>> --num-executors 50
>> --executor-cores 2
>> 
>> Storage usage:
>> When caching:
>> Cached Partitions: 951
>> Fraction Cached: 100%
>> Size in Memory: 215.7 GB
>> Size in Tachyon: 0.0 B
>> Size on Disk: 1029.7 MB
>> 
>> Each aggregate takes around 5 minutes with the cache enabled, and lots of
>> disk IO can be seen on the Hadoop node. I get the same result with the
>> cache disabled.
>> 
>> Should caching the data points improve the performance? Should caching
>> decrease the disk IO?
>> 
>> Thanks in advance.
