Re: Questions about disk IOs

2014-07-25 Thread Charles Li
Hi Xiangrui,

I have 16 * 40 CPU cores in total, but I am only using 200 partitions on the 
200 executors. I use coalesce without shuffle to reduce the RDD's default 
number of partitions.

The shuffle size from the WebUI is nearly 100m.
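
To illustrate what coalescing without a shuffle amounts to, here is a minimal plain-Java sketch, not Spark's code. It assigns consecutive parent partitions to each target partition, so no records need to be repartitioned. The even-range rule and the class name CoalesceSketch are assumptions for illustration, data locality is ignored, and 951 is the partition count reported earlier in the thread.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative grouping of parent partitions into fewer target partitions.
// Assumption: locality is ignored and parents are split into even ranges.
class CoalesceSketch {
    // For each target partition, returns the parent partition indices it reads.
    static List<List<Integer>> group(int numParents, int numTargets) {
        List<List<Integer>> groups = new ArrayList<>();
        for (int i = 0; i < numTargets; i++) {
            // Use long arithmetic so the multiplication cannot overflow.
            int start = (int) ((long) i * numParents / numTargets);
            int end = (int) ((long) (i + 1) * numParents / numTargets);
            List<Integer> g = new ArrayList<>();
            for (int p = start; p < end; p++) {
                g.add(p);
            }
            groups.add(g);
        }
        return groups;
    }

    public static void main(String[] args) {
        // 951 parents (from the storage page quoted in this thread), 200 targets.
        List<List<Integer>> groups = group(951, 200);
        System.out.println("target partitions: " + groups.size());
        System.out.println("first group reads parents " + groups.get(0));
    }
}
```

Each target task then reads four or five parent partitions in sequence, which is why this form of coalesce creates no extra shuffle stage.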

On Jul 25, 2014, at 23:51, Xiangrui Meng  wrote:

> How many partitions did you use and how many CPU cores in total? The
> former shouldn't be much larger than the latter. Could you also check
> the shuffle size from the WebUI? -Xiangrui



Re: Questions about disk IOs

2014-07-25 Thread Xiangrui Meng
How many partitions did you use and how many CPU cores in total? The
former shouldn't be much larger than the latter. Could you also check
the shuffle size from the WebUI? -Xiangrui
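
As a rough illustration of why the ratio of partitions to cores matters, the figures from the first message in this thread (50 executors with 2 cores each, 951 cached partitions) can be plugged into a small calculation. This is only a sketch: it assumes each core runs one task at a time and that all tasks take about the same time. The class name TaskWaves is made up for the example.

```java
// Rough estimate of how many "waves" of tasks one stage needs.
// Assumption: one task per core at a time, tasks of similar duration.
class TaskWaves {
    // Number of task slots = executors * cores per executor.
    static int slots(int numExecutors, int coresPerExecutor) {
        return numExecutors * coresPerExecutor;
    }

    // Ceiling division: a partially filled wave still counts as a wave.
    static int waves(int numPartitions, int slots) {
        return (numPartitions + slots - 1) / slots;
    }

    public static void main(String[] args) {
        int slots = slots(50, 2);      // --num-executors 50, --executor-cores 2
        int waves = waves(951, slots); // 951 cached partitions
        System.out.println(slots + " slots, " + waves + " waves");
    }
}
```

With those settings every aggregate needs about ten waves of tasks, and each task sends its own partial result back.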



Re: Questions about disk IOs

2014-07-25 Thread Charles Li
Hi Xiangrui,

Thanks for your treeAggregate patch. It is very helpful.
After applying your patch to my local repo, the new Spark can handle more 
partitions than before.
But after some iterations (mapPartitions + reduceByKey), the reducer seems to 
become slower and slower and finally hangs.

The logs show there is always 1 message pending in the outbox, and we are 
waiting for it. Are you aware of this kind of issue?
How can I know which message is pending? Where is it supposed to go?

Log:

14/07/25 17:49:54 INFO storage.BlockManager: Found block rdd_2_158 locally
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:50:03 INFO executor.Executor: Serialized size of result for 302 is 752
14/07/25 17:50:03 INFO executor.Executor: Sending result for 302 directly to driver
14/07/25 17:50:03 INFO executor.Executor: Finished task ID 302
14/07/25 17:50:34 INFO network.ConnectionManager: Accepted connection from [*/**]
14/07/25 17:50:34 INFO network.SendingConnection: Initiating connection to [/]
14/07/25 17:50:34 INFO network.SendingConnection: Connected to [/], 1 messages pending
14/07/25 17:51:28 INFO storage.ShuffleBlockManager: Deleted all files for shuffle 0
14/07/25 17:51:37 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 742
14/07/25 17:51:37 INFO executor.Executor: Running task ID 742
14/07/25 17:51:37 INFO storage.BlockManager: Found block broadcast_1 locally
14/07/25 17:51:38 INFO spark.MapOutputTrackerWorker: Updating epoch to 1 and clearing cache
14/07/25 17:51:38 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:38 INFO storage.BlockManager: Found block rdd_2_158 locally
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO spark.SparkRegistry: Using kryo with register
14/07/25 17:51:48 INFO executor.Executor: Serialized size of result for 742 is 752
14/07/25 17:51:48 INFO executor.Executor: Sending result for 742 directly to driver
14/07/25 17:51:48 INFO executor.Executor: Finished task ID 742
<-- I shut down the app here
14/07/25 18:16:36 INFO executor.CoarseGrainedExecutorBackend: Driver commanded a shutdown




Re: Questions about disk IOs

2014-07-01 Thread Xiangrui Meng
Try reducing the number of partitions to match the number of cores. We
will add treeAggregate to reduce the communication cost.

PR: https://github.com/apache/spark/pull/1110

-Xiangrui
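
To make the communication cost concrete: if each task returns a dense gradient of 1,300,000 doubles (an assumption, about 10.4 MB per task), then 951 partitions send roughly 9.9 GB to a single combiner for every aggregate. The plain-Java sketch below is not Spark's implementation. It only illustrates how adding one intermediate level reduces the number of vectors the final combiner receives while producing the same sum. The names TreeAggregateSketch and fanIn are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of flat vs. tree-style aggregation of partial sums.
// Assumption: the input list is non-empty and all vectors have equal length.
class TreeAggregateSketch {
    // Element-wise sum of two equal-length vectors (inputs are not modified).
    static double[] add(double[] a, double[] b) {
        double[] out = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = a[i] + b[i];
        }
        return out;
    }

    // One tree level: sum each group of up to fanIn consecutive partials.
    static List<double[]> combineLevel(List<double[]> partials, int fanIn) {
        List<double[]> next = new ArrayList<>();
        for (int start = 0; start < partials.size(); start += fanIn) {
            int end = Math.min(start + fanIn, partials.size());
            double[] acc = partials.get(start);
            for (int i = start + 1; i < end; i++) {
                acc = add(acc, partials.get(i));
            }
            next.add(acc);
        }
        return next;
    }

    // Flat aggregation: the final combiner receives every partial.
    static double[] flat(List<double[]> partials) {
        return combineLevel(partials, partials.size()).get(0);
    }

    // Tree aggregation: one intermediate level, then the final combine.
    static double[] tree(List<double[]> partials, int fanIn) {
        return flat(combineLevel(partials, fanIn));
    }

    public static void main(String[] args) {
        List<double[]> partials = new ArrayList<>();
        for (int p = 0; p < 951; p++) {
            partials.add(new double[] {1.0, p}); // tiny stand-in for a gradient
        }
        int fanIn = 31; // roughly the square root of 951
        System.out.println("flat: final combiner receives "
                + partials.size() + " vectors");
        System.out.println("tree: final combiner receives "
                + combineLevel(partials, fanIn).size() + " vectors");
        System.out.println("sums agree: "
                + (flat(partials)[0] == tree(partials, fanIn)[0]));
    }
}
```

The intermediate sums run in parallel on different workers, so the single final step handles a few dozen vectors instead of several hundred.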



Questions about disk IOs

2014-07-01 Thread Charles Li
Hi Spark,

I am running LBFGS on our user data. The data size with Kryo serialisation is 
about 210G. The weight size is around 1,300,000. I am puzzled that the 
performance is almost the same whether the data is cached or not.

The program is simple:
points = sc.hadoopFile(in, SequenceFileInputFormat.class …..)
points.persist(StorageLevel.MEMORY_AND_DISK_SER()) // comment this out if not caching
gradient = new LogisticGradient();
updater = new SquaredL2Updater();
initWeight = Vectors.sparse(size, new int[]{}, new double[]{})
result = LBFGS.runLBFGS(points.rdd(), gradient, updater, numCorrections, 
convergeTol, maxIter, regParam, initWeight);

I have 13 machines with 16 CPUs and 48G RAM each. Spark is running in cluster 
mode. Below are some of the arguments I am using:
--executor-memory 10G
--num-executors 50
--executor-cores 2

Storage usage:
When caching:
Cached Partitions 951
Fraction Cached 100%
Size in Memory 215.7GB
Size in Tachyon 0.0B
Size on Disk 1029.7MB

Each aggregate takes around 5 minutes with caching enabled. Lots of disk IO 
can be seen on the Hadoop node. I get the same result with caching disabled.

Should caching the data points improve performance? Should caching decrease 
the disk IO?

Thanks in advance.