Hi Xiangrui,

Thanks for your treeAggregate patch. It is very helpful. After applying your patch to my local repo, the new Spark can handle more partitions than before. But after some iterations (mapPartitions + reduceByKey), the reducers seem to become slower and slower and finally hang.
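For reference, the kind of aggregation treeAggregate is meant to help with looks roughly like the sketch below: a cluster-wide reduction whose partial results would otherwise be sent to the driver one per partition. This is only an illustration against the Java API, not my actual mapPartitions + reduceByKey job; the names gradients and dim and the depth of 2 are placeholders, and whether treeAggregate is available on JavaRDD directly or only through MLlib's RDDFunctions depends on the Spark version you have patched.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function2;

    public class TreeAggregateSketch {
      // Sum dense gradient vectors of length dim across the cluster.
      public static double[] sumGradients(JavaRDD<double[]> gradients, final int dim) {
        Function2<double[], double[], double[]> add =
            new Function2<double[], double[], double[]>() {
              @Override
              public double[] call(double[] a, double[] b) {
                for (int i = 0; i < dim; i++) {
                  a[i] += b[i];
                }
                return a;
              }
            };
        // depth = 2 adds a level of intermediate combines on the executors,
        // so the driver no longer merges one partial result per partition.
        return gradients.treeAggregate(new double[dim], add, add, 2);
      }
    }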
The logs show there is always 1 message pending in the outbox, and we are waiting for it. Are you aware of this kind of issue? How can I find out which message is pending and where it is supposed to go?

Log:

14/07/25 17:49:54 INFO storage.BlockManager: Found block rdd_2_158 locally
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO executor.Executor: Serialized size of result for 302 is 752
14/07/25 17:50:03 INFO executor.Executor: Sending result for 302 directly to driver
14/07/25 17:50:03 INFO executor.Executor: Finished task ID 302
14/07/25 17:50:34 INFO network.ConnectionManager: Accepted connection from [*********/**********]
14/07/25 17:50:34 INFO network.SendingConnection: Initiating connection to [********/************]
14/07/25 17:50:34 INFO network.SendingConnection: Connected to [********/********], 1 messages pending
14/07/25 17:51:28 INFO storage.ShuffleBlockManager: Deleted all files for shuffle 0
14/07/25 17:51:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 742
14/07/25 17:51:37 INFO executor.Executor: Running task ID 742
14/07/25 17:51:37 INFO storage.BlockManager: Found block broadcast_1 locally
14/07/25 17:51:38 INFO spark.MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
14/07/25 17:51:38 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:38 INFO storage.BlockManager: Found block rdd_2_158 locally
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO executor.Executor: Serialized size of result for 742 is 752
14/07/25 17:51:48 INFO executor.Executor: Sending result for 742 directly to driver
14/07/25 17:51:48 INFO executor.Executor: Finished task ID 742   <-- I have shut down the app here
14/07/25 18:16:36 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown

On Jul 2, 2014, at 0:08, Xiangrui Meng <men...@gmail.com> wrote:

> Try to reduce the number of partitions to match the number of cores. We
> will add treeAggregate to reduce the communication cost.
>
> PR: https://github.com/apache/spark/pull/1110
>
> -Xiangrui
>
> On Tue, Jul 1, 2014 at 12:55 AM, Charles Li <littlee1...@gmail.com> wrote:
>> Hi Spark,
>>
>> I am running LBFGS on our user data. The data size with Kryo serialisation
>> is about 210G. The weight size is around 1,300,000. I am quite confused that
>> the performance is nearly the same whether the data is cached or not.
>>
>> The program is simple:
>>
>> points = sc.hadoopFile(int, SequenceFileInputFormat.class …..)
>> points.persist(StorageLevel.MEMORY_AND_DISK_SER()) // comment this out if not cached
>> gradient = new LogisticGradient();
>> updater = new SquaredL2Updater();
>> initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
>> result = LBFGS.runLBFGS(points.rdd(), gradient, updater, numCorrections,
>>     convergeTol, maxIter, regParam, initWeight);
>>
>> I have 13 machines with 16 CPUs and 48G RAM each. Spark is running in
>> cluster mode.
>> Below are some of the arguments I am using:
>>
>> --executor-memory 10G
>> --num-executors 50
>> --executor-cores 2
>>
>> Storage usage when caching:
>>
>> Cached Partitions   951
>> Fraction Cached     100%
>> Size in Memory      215.7GB
>> Size in Tachyon     0.0B
>> Size on Disk        1029.7MB
>>
>> The time cost of every aggregate is around 5 minutes with caching enabled, and
>> lots of disk I/O can be seen on the Hadoop nodes. I get the same result with
>> caching disabled.
>>
>> Shouldn't caching the data points improve performance? Shouldn't caching
>> decrease the disk I/O?
>>
>> Thanks in advance.
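For anyone reading along, here is a minimal, self-contained sketch of the program quoted above, written against MLlib's optimization API. The class name, the way the points RDD is passed in, and the hyperparameter values (numCorrections = 10, tolerance = 1e-4, 100 iterations, regParam = 0.1) are illustrative placeholders, not the exact production job.

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.mllib.linalg.Vector;
    import org.apache.spark.mllib.linalg.Vectors;
    import org.apache.spark.mllib.optimization.LBFGS;
    import org.apache.spark.mllib.optimization.LogisticGradient;
    import org.apache.spark.mllib.optimization.SquaredL2Updater;
    import org.apache.spark.storage.StorageLevel;
    import scala.Tuple2;

    public class LBFGSCacheSketch {
      // points holds (label, features) pairs; numFeatures is the weight vector size.
      public static Vector train(JavaRDD<Tuple2<Object, Vector>> points, int numFeatures) {
        // Serialized caching: partitions that fit stay in memory (serialized),
        // the rest spill to local disk instead of being re-read from HDFS.
        points.persist(StorageLevel.MEMORY_AND_DISK_SER());

        Vector initWeights = Vectors.sparse(numFeatures, new int[]{}, new double[]{});
        Tuple2<Vector, double[]> result = LBFGS.runLBFGS(
            points.rdd(),              // MLlib expects the underlying RDD[(Double, Vector)]
            new LogisticGradient(),
            new SquaredL2Updater(),
            10,                        // numCorrections
            1e-4,                      // convergence tolerance
            100,                       // max iterations
            0.1,                       // regParam
            initWeights);
        return result._1();            // learned weights; result._2() is the loss history
      }
    }

With MEMORY_AND_DISK_SER the points are kept serialized, so the "Size in Memory" reported above is the serialized size; after the first pass, subsequent iterations should read cached blocks rather than going back to HDFS.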