[jira] [Commented] (SPARK-9850) Adaptive execution in Spark
[ https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16561326#comment-16561326 ]

Carson Wang commented on SPARK-9850:
------------------------------------

We have a new proposal and implementation for Spark SQL adaptive execution, discussed in SPARK-23128. Optimizing the join strategy at run time and handling skewed joins are also supported. The full code is available at https://github.com/Intel-bigdata/spark-adaptive.

> Adaptive execution in Spark
> ---------------------------
>
>                 Key: SPARK-9850
>                 URL: https://issues.apache.org/jira/browse/SPARK-9850
>             Project: Spark
>          Issue Type: Epic
>          Components: Spark Core, SQL
>            Reporter: Matei Zaharia
>            Assignee: Yin Huai
>            Priority: Major
>         Attachments: AdaptiveExecutionInSpark.pdf
>
> Query planning is one of the main factors in high performance, but the current Spark engine requires the execution DAG for a job to be set in advance. Even with cost-based optimization, it is hard to know the behavior of data and user-defined functions well enough to always get great execution plans. This JIRA proposes to add adaptive query execution, so that the engine can change the plan for each query as it sees what data earlier stages produced.
> We propose adding this to Spark SQL / DataFrames first, using a new API in the Spark engine that lets libraries run DAGs adaptively. In future JIRAs, the functionality could be extended to other libraries or the RDD API, but that is more difficult than adding it in SQL.
> I've attached a design doc by Yin Huai and myself explaining how it would work in more detail.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
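The run-time join optimization mentioned above can be sketched roughly as follows. This is a minimal illustration, not the SPARK-23128 implementation: once the map stage has run, the planner sees actual output sizes and can re-decide the strategy. The threshold below is a hypothetical stand-in for a setting like Spark's autoBroadcastJoinThreshold.

```python
BROADCAST_THRESHOLD = 10 * 1024 * 1024  # hypothetical 10 MB cutoff

def choose_join(left_bytes, right_bytes):
    """Pick a join strategy from the actual shuffle-output sizes."""
    if min(left_bytes, right_bytes) <= BROADCAST_THRESHOLD:
        return "broadcast-hash join"
    return "sort-merge join"

# Pre-run estimates said both sides were large, but the map stage shows the
# right side is only 4 MB, so an adaptive planner can broadcast it instead.
print(choose_join(2 * 1024**3, 4 * 1024**2))  # broadcast-hash join
```

The point of doing this at run time rather than at plan time is that the decision uses observed sizes instead of pre-run cardinality estimates.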
[jira] [Commented] (SPARK-9850) Adaptive execution in Spark
[ https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16546840#comment-16546840 ]

Michail Giannakopoulos commented on SPARK-9850:
-----------------------------------------------

Hello [~yhuai]! Are people currently working on this Epic? In other words, is this work in progress, or have you determined that it should be stalled? I am asking because I recently logged an issue related to adaptive execution (SPARK-24826). It would be nice to know whether this is being actively worked on, since it greatly reduces the number of partitions during shuffles when executing SQL queries, one of the main bottlenecks for Spark. Thanks a lot!
[jira] [Commented] (SPARK-9850) Adaptive execution in Spark
[ https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667665#comment-15667665 ]

Imran Rashid commented on SPARK-9850:
-------------------------------------

[~assaf.mendelson] Reducers already have to wait for the last mapper to finish; Spark has always behaved this way. (I think you might find discussions referring to this as the "stage barrier".) I don't see that changing anytime soon -- while it's not ideal, doing away with it would add a lot of complexity.
[jira] [Commented] (SPARK-9850) Adaptive execution in Spark
[ https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15666806#comment-15666806 ]

Assaf Mendelson commented on SPARK-9850:
----------------------------------------

I like the overall idea. What I am trying to figure out is the part where the map portion is performed first and then the reduce. If DAGScheduler.submitMapStage() waits for all map processing to finish and only then lets the reducers start, this can really slow things down, since the reduce stage begins only when the last map finishes. Wouldn't it be better to start the reducers once the first few mappers have finished (or at least when there are idle executors)? Assuming the first few mappers are representative of all the maps, this shouldn't affect the estimated statistics too much.
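The sampling idea in the comment above could be sketched as a simple extrapolation. This is purely hypothetical (nothing in Spark does this); it scales the per-reduce-partition sizes reported by the finished map tasks up to the full map count:

```python
def extrapolate_reduce_sizes(finished_statuses, total_map_tasks):
    """Estimate final per-reduce-partition sizes from a sample of map tasks.

    finished_statuses: one list per finished map task, giving the bytes that
    task wrote for each reduce partition.
    """
    num_reducers = len(finished_statuses[0])
    observed = [sum(status[r] for status in finished_statuses)
                for r in range(num_reducers)]
    scale = total_map_tasks / len(finished_statuses)
    return [int(s * scale) for s in observed]

# Two of ten map tasks are done; each wrote 10 and 30 bytes to reducers 0 and 1.
print(extrapolate_reduce_sizes([[10, 30], [10, 30]], total_map_tasks=10))
# [100, 300]
```

As the comment notes, this only works if the finished mappers are representative; a skewed input order would bias the estimate.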
[jira] [Commented] (SPARK-9850) Adaptive execution in Spark
[ https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15237056#comment-15237056 ]

Justin Uang commented on SPARK-9850:
------------------------------------

I like this idea a lot. One thing we encounter in our use cases is that we accidentally join on a field that is 50% nulls, or on a string that represents null, like "N/A". It then becomes quite cumbersome to constantly have a Spark expert dig in and find out why there is one task that will never finish. Would it be possible to add a threshold so that if a join key ever gets too big, the job just fails with an error message?
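The fail-fast check requested above could look roughly like this. A hypothetical sketch: the 50% cutoff and the function name are invented for illustration, not taken from Spark.

```python
from collections import Counter

MAX_KEY_FRACTION = 0.5  # hypothetical cutoff: fail if one key dominates

def fail_on_skewed_key(join_keys):
    """Raise early instead of letting one straggler task run forever."""
    key, count = Counter(join_keys).most_common(1)[0]
    if count / len(join_keys) > MAX_KEY_FRACTION:
        raise ValueError(
            f"join key {key!r} appears in {count} of {len(join_keys)} rows; "
            "a single task would receive most of the data")

fail_on_skewed_key(["a", "b", "c", "d"])          # balanced keys pass
try:
    fail_on_skewed_key([None, None, None, "x"])   # 75% nulls: fails fast
except ValueError as e:
    print(e)
```

With run-time statistics available, such a check could fire as soon as the map stage finishes, rather than hours into a hung reduce task.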
[jira] [Commented] (SPARK-9850) Adaptive execution in Spark
[ https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15096985#comment-15096985 ]

Maciej Bryński commented on SPARK-9850:
---------------------------------------

[~matei] Hi, I'm not sure if my issue is related to this JIRA. In 1.6.0, when using a SQL limit, Spark does the following:
- executes the limit on every partition
- then takes the result

Is it possible to stop scanning partitions once enough rows have been collected for the limit?
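The early-termination behavior being asked for can be sketched as an incremental scan. This is a simplified, hypothetical illustration of the idea, not how Spark's limit operator is implemented:

```python
def incremental_limit(partitions, n):
    """Collect at most n rows, touching as few partitions as possible."""
    rows, scanned = [], 0
    for part in partitions:
        scanned += 1
        for row in part:
            rows.append(row)
            if len(rows) == n:
                return rows, scanned  # stop early; later partitions never read
    return rows, scanned

rows, scanned = incremental_limit([[1, 2, 3], [4, 5], [6]], n=2)
print(rows, scanned)  # [1, 2] 1
```

The trade-off is that scanning partitions one at a time sacrifices parallelism, which is why a real implementation would want to grow the number of partitions scanned per round.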
[jira] [Commented] (SPARK-9850) Adaptive execution in Spark
[ https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14907518#comment-14907518 ]

Matei Zaharia commented on SPARK-9850:
--------------------------------------

Hey Imran, this could make sense, but note that the problem will only happen if you have 2000 map *output* partitions, which would've been 2000 reduce tasks normally. Otherwise, you can have as many map *tasks* as needed with fewer partitions. In most jobs, I'd expect data to get significantly smaller after the maps, so we'd catch that. In particular, for choosing between broadcast and shuffle joins this should be fine. We can do something different if we suspect that there is going to be tons of map output *and* we think there's nontrivial planning to be done once we see it.
[jira] [Commented] (SPARK-9850) Adaptive execution in Spark
[ https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14876347#comment-14876347 ]

Imran Rashid commented on SPARK-9850:
-------------------------------------

Just to continue brainstorming on what to do with large data -- I realize that my earlier suggestion about sending uncompressed (or at least not {{HighlyCompressed}}) map statuses back to the driver may not work, since part of the point is to avoid OOM on the driver, not just to reduce communication from the driver back to the executors. But here are two other ideas:

1. Create a variant of {{HighlyCompressedMapStatus}} which stores all block sizes that are more than some factor above the median, let's say 5x. This would let you deal with really extreme skew without increasing the size too much.

2. Since you only need the summed size of the map output per reduce partition, you could first perform a tree-reduce of those sizes on the executors before sending them back to the driver. This avoids trying to guess some arbitrary cutoff factor, but is also far more complicated.
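Idea 1 above could be sketched like this. It is a toy model, not Spark's actual MapStatus encoding: block sizes above a multiple of the median are kept exactly, and everything else collapses to a single average.

```python
import statistics

def compress_with_outliers(block_sizes, factor=5):
    """Store exact sizes only for blocks more than factor x the median."""
    med = statistics.median(block_sizes)
    outliers = {i: s for i, s in enumerate(block_sizes) if s > factor * med}
    regular = [s for i, s in enumerate(block_sizes) if i not in outliers]
    avg = sum(regular) // len(regular) if regular else 0
    return avg, outliers  # tiny for uniform data, exact for the skewed blocks

def block_size(avg, outliers, reduce_id):
    return outliers.get(reduce_id, avg)

avg, outliers = compress_with_outliers([10, 12, 9, 11, 500])
print(avg, outliers)  # 10 {4: 500}
```

The attraction is that the status stays small when data is uniform, yet the one skewed block (exactly the case adaptive planning cares about) survives compression.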
[jira] [Commented] (SPARK-9850) Adaptive execution in Spark
[ https://issues.apache.org/jira/browse/SPARK-9850?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14709624#comment-14709624 ]

Imran Rashid commented on SPARK-9850:
-------------------------------------

I know the 1000 partitions used in the design doc is just an example, but I wanted to point out that the number probably needs to be much larger. With the 2 GB limit per partition, that is already only 2 TB max. My rule of thumb has been to keep partitions around 100 MB, which is roughly in line with the 64 MB you mention in the doc, and which brings you down to 100 GB. And given that you want to deal with skewed data etc., you probably actually want to leave quite a bit of headroom, which limits you to relatively small datasets.

The key point is that once you go over 2000 partitions, you are in {{HighlyCompressedMapStatus}} territory, which will be relatively useless for this. Perhaps each shuffle map task could always send an uncompressed map status back to the driver? Maybe you could use the {{HighlyCompressedMapStatus}} only on the shuffle-read side? I'm not sure about the performance implications, just throwing out an idea.
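To illustrate why the over-2000-partitions path is "relatively useless" here: the compressed form keeps, roughly speaking, only an average block size, so per-block detail is gone. The sketch below is a toy stand-in, not the real class:

```python
# Toy model of an average-only map status: once sizes are averaged away,
# downstream planning cannot see which reduce partition is skewed.
def average_only_status(block_sizes):
    avg = sum(block_sizes) // len(block_sizes)
    return lambda reduce_id: avg  # every block reports the same size

sizes = [1, 1, 1, 1, 996]            # reduce partition 4 is heavily skewed
status = average_only_status(sizes)
print(status(4), sizes[4])           # 200 996 -- the skew is invisible
```

This is precisely the information an adaptive planner would need to split or specially handle the skewed partition, which motivates the ideas in the later comments on this thread.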