[jira] [Commented] (SPARK-29655) Prefer bucket join if adaptive execution is enabled and maxNumPostShufflePartitions != bucket number

2019-11-05 Thread Yuming Wang (Jira)


[ https://issues.apache.org/jira/browse/SPARK-29655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16968127#comment-16968127 ]

Yuming Wang commented on SPARK-29655:
-

I'm working on it.

> Prefer bucket join if adaptive execution is enabled and 
> maxNumPostShufflePartitions != bucket number
> 
>
> Key: SPARK-29655
> URL: https://issues.apache.org/jira/browse/SPARK-29655
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.0.0
>Reporter: Yuming Wang
>Priority: Major
>
> Prefer the bucket join when adaptive execution is enabled and
> maxNumPostShufflePartitions != the bucket number; otherwise an unnecessary
> shuffle (Exchange) is inserted on the already-bucketed side. How to reproduce:
> {code:scala}
> import org.apache.spark.sql.SaveMode
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
> spark.conf.set("spark.sql.shuffle.partitions", 4)
> val bucketedTableName = "bucketed_table"
> spark.range(10).write.bucketBy(4, "id").sortBy("id").mode(SaveMode.Overwrite).saveAsTable(bucketedTableName)
> val bucketedTable = spark.table(bucketedTableName)
> val df = spark.range(4)
> df.join(bucketedTable, "id").explain()
> spark.conf.set("spark.sql.adaptive.enabled", true)
> spark.conf.set("spark.sql.adaptive.shuffle.maxNumPostShufflePartitions", 5)
> df.join(bucketedTable, "id").explain()
> {code}
> Output:
> {noformat}
> == Physical Plan ==
> AdaptiveSparkPlan(isFinalPlan=false)
> +- Project [id#5L]
>    +- SortMergeJoin [id#5L], [id#3L], Inner
>       :- Sort [id#5L ASC NULLS FIRST], false, 0
>       :  +- Exchange hashpartitioning(id#5L, 5), true, [id=#92]
>       :     +- Range (0, 4, step=1, splits=16)
>       +- Sort [id#3L ASC NULLS FIRST], false, 0
>          +- Exchange hashpartitioning(id#3L, 5), true, [id=#93]
>             +- Project [id#3L]
>                +- Filter isnotnull(id#3L)
>                   +- FileScan parquet default.bucketed_table[id#3L] Batched: true, DataFilters: [isnotnull(id#3L)], Format: Parquet, Location: InMemoryFileIndex[file:/root/spark-3.0.0-preview-bin-hadoop3.2/spark-warehouse/bucketed_table], PartitionFilters: [], PushedFilters: [IsNotNull(id)], ReadSchema: struct<id:bigint>, SelectedBucketsCount: 4 out of 4
> {noformat}
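The plan above shows an Exchange on the already-bucketed side because the advisory post-shuffle partition number (5) differs from the bucket number (4). A minimal pure-Scala sketch of the partition-count preference this ticket proposes; the helper name and signature are hypothetical, not Spark's internal API:

```scala
// Hypothetical sketch: when one join side is bucketed, prefer its bucket
// number as the shuffle partition count so that side needs no Exchange.
def preferredShufflePartitions(maxNumPostShufflePartitions: Int,
                               bucketNumber: Option[Int]): Int =
  bucketNumber.getOrElse(maxNumPostShufflePartitions)

// 4-bucket table, maxNumPostShufflePartitions = 5: shuffle only the
// non-bucketed side, into 4 partitions matching the buckets.
println(preferredShufflePartitions(5, Some(4)))  // prints 4
// No bucketed side: fall back to the configured maximum.
println(preferredShufflePartitions(5, None))     // prints 5
```

With this choice the bucketed side keeps its existing layout and only the `spark.range(4)` side is repartitioned, which is what happens in the non-adaptive plan.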



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-29655) Prefer bucket join if adaptive execution is enabled and maxNumPostShufflePartitions != bucket number

2019-10-30 Thread Yuming Wang (Jira)


[ https://issues.apache.org/jira/browse/SPARK-29655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16962893#comment-16962893 ]

Yuming Wang commented on SPARK-29655:
-

cc [~Jk_Self]

> Prefer bucket join if adaptive execution is enabled and 
> maxNumPostShufflePartitions != bucket number
> 


