mingmwang opened a new issue #1805:
URL: https://github.com/apache/arrow-datafusion/issues/1805


   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   
   A new feature enhancement
   
   **Describe the solution you'd like**
   
   Ballista’s current shuffle implementation is very similar to that of early 
Spark versions. It is a hash-based shuffle: shuffle data is materialized to 
disk, and each map task produces a separate file for each reduce task. A 
shuffle that involves M map tasks and N reduce tasks therefore generates M*N 
files, and that many tiny files cause performance, memory and scalability 
issues. Later Spark versions introduced a sort-based shuffle, which became the 
default shuffle implementation. The sort-based shuffle does not generate M*N 
files: each map task sorts its records by partition id + key and produces a 
pair of files, one data file that consolidates all of its records and one 
index file that records the data range of each partition within that data 
file. In Spark 2.0, the hash-based shuffle code was removed. Spark also 
introduced an external shuffle service that serves materialized intermediate 
shuffle data, for better fault tolerance and performance isolation. The recent 
Spark 3.2 release introduced a push-based shuffle (SPARK-30602) to further 
improve shuffle stability and IO performance. With Spark’s push-based shuffle, 
shuffle is performed at the end of the map tasks, and shuffle blocks are 
pre-merged and pushed to selected reducer nodes or uploaded to Spark external 
shuffle servers.
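   To make the file-count difference concrete, here is a minimal Rust sketch 
of the sort-based layout described above. It is not Ballista or Spark code: 
`Record`, `sort_based_shuffle_write` and the other names are hypothetical, the 
"data file" is modeled as an in-memory `Vec`, and the index stores record 
offsets rather than byte offsets.

```rust
/// Hypothetical shuffle record (not a Ballista type).
#[derive(Debug)]
struct Record {
    partition: usize,
    key: u64,
    value: String,
}

/// One map task's output: a single "data file" plus an index.
struct SortShuffleOutput {
    data: Vec<Record>,
    /// Partition p occupies data[index[p]..index[p + 1]].
    index: Vec<usize>,
}

/// Assumes every record's partition is < num_partitions (panics otherwise).
fn sort_based_shuffle_write(mut records: Vec<Record>, num_partitions: usize) -> SortShuffleOutput {
    // Sort by (partition id, key) so each partition becomes contiguous.
    records.sort_by_key(|r| (r.partition, r.key));
    // Count records per partition, then prefix-sum the counts into offsets.
    let mut index = vec![0usize; num_partitions + 1];
    for r in &records {
        index[r.partition + 1] += 1;
    }
    for p in 0..num_partitions {
        index[p + 1] += index[p];
    }
    SortShuffleOutput { data: records, index }
}

impl SortShuffleOutput {
    /// The slice a reduce task for partition `p` would read.
    fn partition(&self, p: usize) -> &[Record] {
        &self.data[self.index[p]..self.index[p + 1]]
    }
}

/// Hash-based: one file per (map task, reduce task) pair.
fn hash_shuffle_files(m: usize, n: usize) -> usize {
    m * n
}

/// Sort-based: one data file plus one index file per map task.
fn sort_shuffle_files(m: usize) -> usize {
    2 * m
}

fn main() {
    let records = vec![
        Record { partition: 2, key: 7, value: "a".into() },
        Record { partition: 0, key: 3, value: "b".into() },
        Record { partition: 2, key: 1, value: "c".into() },
        Record { partition: 1, key: 5, value: "d".into() },
    ];
    let out = sort_based_shuffle_write(records, 3);
    println!("index = {:?}", out.index); // index = [0, 1, 2, 4]
    for p in 0..3 {
        let values: Vec<&str> = out.partition(p).iter().map(|r| r.value.as_str()).collect();
        println!("partition {p}: {values:?}");
    }
    println!("hash: {}, sort: {}", hash_shuffle_files(1000, 1000), sort_shuffle_files(1000));
}
```

   With M = 1000 map tasks and N = 1000 reduce tasks, the hash-based layout 
produces 1,000,000 files while the sort-based layout produces 2,000.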
   
   Other distributed compute engines, such as Flink and Presto, also support 
the shuffle operation, but they do not materialize shuffle data to disk. 
Instead, shuffle data is streamed into an in-memory buffer, and the reduce 
tasks poll the shuffle data from the map tasks’ in-memory buffers to minimize 
end-to-end latency.
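   A rough model of the in-memory streaming exchange described above, using 
only the Rust standard library. A bounded `std::sync::mpsc::sync_channel` 
stands in for the map task's in-memory buffer and `Vec<i64>` stands in for a 
record batch; these are illustrative assumptions, not how Flink or Presto 
actually implement their exchanges, and `stream_through_memory` is a 
hypothetical name. In this sketch the reducer blocks on the channel rather 
than busy-polling.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Hypothetical stand-in for an Arrow RecordBatch.
type Batch = Vec<i64>;

/// One map task streams batches into a bounded in-memory buffer while one
/// reduce task pulls from it. Returns the sum the reducer computed.
fn stream_through_memory(batches: Vec<Batch>, buffer_capacity: usize) -> i64 {
    // Bounded buffer: the producer blocks once `buffer_capacity` batches are
    // waiting, which gives backpressure instead of spilling to disk.
    let (tx, rx) = sync_channel::<Batch>(buffer_capacity);
    let producer = thread::spawn(move || {
        for b in batches {
            // Each batch is handed off as soon as it is produced.
            tx.send(b).expect("reducer hung up");
        }
        // `tx` is dropped here, which signals end-of-stream to the reducer.
    });
    // The reducer consumes batches as they arrive; the loop ends once the
    // channel is closed and drained.
    let mut total = 0;
    for b in rx {
        total += b.iter().sum::<i64>();
    }
    producer.join().unwrap();
    total
}

fn main() {
    let batches = vec![vec![1, 2, 3], vec![4, 5], vec![6]];
    println!("{}", stream_through_memory(batches, 2)); // 21
}
```

   The small buffer bound is the design point: latency stays low because 
nothing touches disk, and a slow reducer throttles the map task instead of 
letting memory grow without limit.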
   
   Here, we propose a new streaming-style, push-based shuffle solution for 
Ballista, in which shuffle is performed at the end of the map tasks. Instead of 
materializing the intermediate shuffle data to disk and generating M*N files, 
the map tasks push shuffle data directly to the reduce tasks via Arrow Flight 
gRPC calls to achieve very low latency, which is important for low-latency 
queries. Stage scheduling will be enhanced accordingly to support All-at-Once 
scheduling. With all-at-once scheduling, all the stages of a SQL query/job are 
scheduled at almost the same time. The distributed DAG of the query is fixed at 
the beginning, so the map tasks can stream the shuffle data to the downstream 
reduce tasks.
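   The sketch below illustrates the proposed push model under stated 
assumptions: each reduce task's Arrow Flight endpoint is modeled as a 
`std::sync::mpsc` channel, rows are `(String, i64)` pairs, and 
`partition_for`, `run_map_task` and `push_shuffle` are hypothetical names, not 
part of Ballista's API. Because all stages are scheduled up front, every 
reducer is running before any map task starts, so map tasks can route each row 
to its reducer as soon as it is produced.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

type Row = (String, i64);

/// Hash-partition a key into one of `n` reduce partitions.
fn partition_for(key: &str, n: usize) -> usize {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    (h.finish() % n as u64) as usize
}

/// Map task: push every row straight to its reducer; nothing is written to disk.
fn run_map_task(rows: Vec<Row>, reducers: &[Sender<Row>]) {
    for (k, v) in rows {
        let p = partition_for(&k, reducers.len());
        reducers[p].send((k, v)).expect("reducer gone");
    }
}

/// Returns the rows each reducer received.
fn push_shuffle(map_inputs: Vec<Vec<Row>>, num_reducers: usize) -> Vec<Vec<Row>> {
    // All-at-once: every reducer endpoint exists before any map task starts,
    // so the routing table (here, `senders`) is fixed from the beginning.
    let (senders, receivers): (Vec<Sender<Row>>, Vec<Receiver<Row>>) =
        (0..num_reducers).map(|_| channel()).unzip();
    // Start the reduce tasks first; each consumes rows as they are pushed.
    let reducer_handles: Vec<_> = receivers
        .into_iter()
        .map(|rx| thread::spawn(move || rx.into_iter().collect::<Vec<Row>>()))
        .collect();
    // Start the map tasks, each with its own copy of the routing table.
    let map_handles: Vec<_> = map_inputs
        .into_iter()
        .map(|rows| {
            let txs = senders.clone();
            thread::spawn(move || run_map_task(rows, &txs))
        })
        .collect();
    // Drop the scheduler's copies so reducers see end-of-stream once all
    // map tasks have finished.
    drop(senders);
    for h in map_handles {
        h.join().unwrap();
    }
    reducer_handles.into_iter().map(|h| h.join().unwrap()).collect()
}

fn main() {
    let map_inputs = vec![
        vec![("a".to_string(), 1), ("b".to_string(), 2)],
        vec![("a".to_string(), 3), ("c".to_string(), 4)],
    ];
    for (i, rows) in push_shuffle(map_inputs, 2).iter().enumerate() {
        println!("reducer {i}: {rows:?}");
    }
}
```

   Because the map threads interleave, the order of rows within each reducer 
is not deterministic; only the routing is (every row with the same key lands 
in the same reducer).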
   
   I will draft a detailed design doc to cover the proposed API changes later.
   
   



