[jira] [Commented] (SPARK-3376) Memory-based shuffle strategy to reduce overhead of disk I/O

2015-04-07 Thread Apache Spark (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14484072#comment-14484072 ]

Apache Spark commented on SPARK-3376:
-

User 'kayousterhout' has created a pull request for this issue:
https://github.com/apache/spark/pull/5403

 Memory-based shuffle strategy to reduce overhead of disk I/O
 

 Key: SPARK-3376
 URL: https://issues.apache.org/jira/browse/SPARK-3376
 Project: Spark
  Issue Type: New Feature
  Components: Shuffle
Affects Versions: 1.1.0
Reporter: uncleGen
  Labels: performance

 I think a memory-based shuffle can reduce some of the overhead of disk I/O. I 
 just want to know whether there is any plan to do something about it, or any 
 suggestions. Based on the work in SPARK-2044, it is feasible to have several 
 implementations of shuffle.
 
 Currently, there are two implementations of the shuffle manager, i.e. SORT and 
 HASH. Both of them use the disk in some stages: on the map side, all the 
 intermediate data is written into temporary files, and on the reduce side, 
 Spark sometimes uses an external sort. In either case, disk I/O brings some 
 performance loss. Maybe we can provide a pure-memory shuffle manager, in which 
 intermediate data only goes through memory. In some scenarios, this can 
 improve performance. Experimentally, I implemented an in-memory shuffle 
 manager on top of SPARK-2044.
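 To make the pluggable interface concrete, the following is an abridged sketch 
 of the ShuffleManager trait that SPARK-2044 introduced; the signatures are 
 written from memory of the 1.1-era API and may differ in detail, so treat it 
 as an illustration rather than the exact source:
 {code:borderStyle=solid}
 import org.apache.spark.{ShuffleDependency, TaskContext}
 import org.apache.spark.shuffle.{ShuffleHandle, ShuffleReader, ShuffleWriter}
 
 // An in-memory shuffle manager plugs in by implementing this trait and
 // backing the writer/reader with the memory store instead of disk files.
 trait ShuffleManager {
   // Register a shuffle with the manager; the returned handle is passed
   // to tasks on the executors.
   def registerShuffle[K, V, C](
       shuffleId: Int,
       numMaps: Int,
       dependency: ShuffleDependency[K, V, C]): ShuffleHandle
 
   // Called on executors by map tasks to write one map output.
   def getWriter[K, V](
       handle: ShuffleHandle,
       mapId: Int,
       context: TaskContext): ShuffleWriter[K, V]
 
   // Called on executors by reduce tasks to fetch a range of partitions.
   def getReader[K, C](
       handle: ShuffleHandle,
       startPartition: Int,
       endPartition: Int,
       context: TaskContext): ShuffleReader[K, C]
 
   // Drop a shuffle's metadata, and shut the manager down.
   def unregisterShuffle(shuffleId: Int): Unit
   def stop(): Unit
 }
 {code}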
 1. Following are my test results (some heavy shuffle operations):
 | data size (bytes) | partitions | resources |
 | 5131859218 | 2000 | 50 executors / 4 cores / 4GB |
 
 | settings | operation 1: repartition + flatMap + groupByKey | operation 2: repartition + groupByKey |
 | shuffle spill, lz4: memory | 38s | 16s |
 | shuffle spill, lz4: sort | 45s | 28s |
 | shuffle spill, lz4: hash | 46s | 28s |
 | no shuffle spill, lz4: memory | 16s | 16s |
 | shuffle spill, lzf: memory | 28s | 27s |
 | shuffle spill, lzf: sort | 29s | 29s |
 | shuffle spill, lzf: hash | 41s | 30s |
 | no shuffle spill, lzf: memory | 15s | 16s |
 In my implementation, I simply reused the BlockManager on the map side and 
 set spark.shuffle.spill to false on the reduce side, so all the intermediate 
 data is cached in the memory store. Just as Reynold Xin has pointed out, our 
 disk-based shuffle managers already achieve good performance, and with 
 parameter tuning they can reach performance similar to the memory-based 
 shuffle manager. However, I will continue this work and improve it; as an 
 alternative tuning option, InMemory shuffle is a good choice (see the 
 configuration sketch after this list). Future work includes, but is not 
 limited to:
 - memory usage management in InMemory Shuffle mode
 - data management when intermediate data cannot fit in memory
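 A minimal configuration sketch for the setup described above (the in-memory 
 manager class name is hypothetical; spark.shuffle.manager and 
 spark.shuffle.spill are real Spark 1.x settings):
 {code:borderStyle=solid}
 import org.apache.spark.{SparkConf, SparkContext}
 
 val conf = new SparkConf()
   .setAppName("InMemoryShuffleTest")
   // "sort" and "hash" are the built-in managers in Spark 1.1; a fully
   // qualified class name selects a custom one (hypothetical class below).
   .set("spark.shuffle.manager", "org.example.shuffle.InMemoryShuffleManager")
   // Disable reduce-side spilling so aggregation stays in memory.
   .set("spark.shuffle.spill", "false")
 val sc = new SparkContext(conf)
 {code}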
 Test code:
 {code:borderStyle=solid}
 import org.apache.spark.{SparkConf, SparkContext}
 
 val conf = new SparkConf().setAppName("InMemoryShuffleTest")
 val sc = new SparkContext(conf)
 val dataPath = args(0)
 val partitions = args(1).toInt
 
 // Cache the input and force materialization so that the timed section
 // below measures the shuffle, not the initial read from storage.
 val rdd1 = sc.textFile(dataPath).cache()
 rdd1.count()
 
 val startTime = System.currentTimeMillis()
 // Two wide transformations back to back: a repartition followed by a
 // groupBy, both of which shuffle the whole dataset.
 val rdd2 = rdd1.repartition(partitions)
   .flatMap(_.split(","))
   .map(s => (s, s))
   .groupBy(e => e._1)
 rdd2.count()
 val endTime = System.currentTimeMillis()
 println("time: " + (endTime - startTime) / 1000)
 {code}
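 For reference, a run matching the resources in the first table (50 executors 
 with 4 cores and 4GB each) might be submitted like this; the jar name, input 
 path, and the YARN-style --num-executors flag are illustrative:
 {code:borderStyle=solid}
 spark-submit --class InMemoryShuffleTest \
   --num-executors 50 --executor-cores 4 --executor-memory 4g \
   inmemory-shuffle-test.jar hdfs:///data/shuffle-input 2000
 {code}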
 2. Following is a Spark Sort Benchmark result (on Spark 1.1.1). There was no 
 tuning for the disk-based shuffles.
 2.1. Test the influence of memory size per executor
 Precondition: 100GB input (Sort Benchmark), 100 executors with 15 cores each, 
 1491 partitions (input file blocks).
 | memory size per executor | in-memory shuffle (no shuffle spill) | sort shuffle | hash shuffle | improvement (vs. sort) | improvement (vs. hash) |
 | 9GB | 79.652849s | 60.102337s | failed | -32.7% | - |
 | 12GB | 54.821924s | 51.654897s | 109.167068s | -3.17% | +47.8% |
 | 15GB | 33.537199s | 40.140621s | 48.088158s | +16.47% | +30.26% |
 | 18GB | 30.930927s | 43.392401s | 49.830276s | +28.7% | +37.93% |
 2.2. Test the influence of partition number
 Precondition: 18GB / 15 cores per executor.
 | partitions | in-memory shuffle (no shuffle spill) | sort shuffle | hash shuffle | improvement (vs. sort) |

[jira] [Commented] (SPARK-3376) Memory-based shuffle strategy to reduce overhead of disk I/O

2014-12-10 Thread uncleGen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14240985#comment-14240985 ]

uncleGen commented on SPARK-3376:
-

[~rxin] Yeah, I agree with you. We can improve I/O performance (both disk I/O 
and network I/O) through hardware resources and through software. With limited 
hardware resources, we can provide a software way to achieve similar 
performance. Maybe it is a good choice to provide an alternative "memory-based" 
shuffle option.



[jira] [Commented] (SPARK-3376) Memory-based shuffle strategy to reduce overhead of disk I/O

2014-10-09 Thread Nicholas Chammas (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14165359#comment-14165359 ]

Nicholas Chammas commented on SPARK-3376:
-

[~matei], [~rxin], [~pwendell]: This is something to have on your radars, I 
believe.




[jira] [Commented] (SPARK-3376) Memory-based shuffle strategy to reduce overhead of disk I/O

2014-10-09 Thread Reynold Xin (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-3376?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14165702#comment-14165702 ]

Reynold Xin commented on SPARK-3376:


It is definitely possible. We should evaluate the benefit. What I have found 
recently is that with SSDs and zero-copy sends, disk-based shuffle can be 
pretty fast as well. That is, the network (assuming 10Gb Ethernet, i.e. at 
most about 1.25GB/s per node) is the new bottleneck.




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org