[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-10 Thread Apache Spark (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615379#comment-17615379 ]

Apache Spark commented on SPARK-40703:
--

User 'sunchao' has created a pull request for this issue:
https://github.com/apache/spark/pull/38196

> Performance regression for joins in Spark 3.3 vs Spark 3.2
> --
>
> Key: SPARK-40703
> URL: https://issues.apache.org/jira/browse/SPARK-40703
> Project: Spark
>  Issue Type: Bug
>  Components: SQL
>Affects Versions: 3.3.0
>Reporter: Bryan Keller
>Priority: Major
> Attachments: spark32-plan.txt, spark33-plan.txt, test.py
>
>
> When running the TPC-DS benchmarks using a DSv2 datasource in Spark 3.3, a
> performance regression vs Spark 3.2 was discovered. More specifically, it
> appears that EnsureRequirements.ensureDistributionAndOrdering() no longer
> enforces a minimum number of partitions for a join distribution in some
> cases. This impacts DSv2 datasources because, if a scan has only a single
> read partition, DataSourceV2ScanExecBase.outputPartitioning() returns a
> SinglePartition instance. The SinglePartition creates a
> SinglePartitionShuffleSpec, and
> SinglePartitionShuffleSpec.canCreatePartitioning() returns true.
> Because canCreatePartitioning() returns true in this case,
> EnsureRequirements.ensureDistributionAndOrdering() won't enforce minimum
> parallelism and will also favor the single partition when choosing the best
> distribution candidate. Ultimately, a single partition is selected for the
> join distribution, even if the other side of the join is a large table with
> many partitions. This can seriously degrade join performance.
> Spark 3.2 enforces minimum parallelism differently in
> ensureDistributionAndOrdering() and thus does not suffer from this issue:
> it shuffles both sides of the join to enforce parallelism.
> In the TPC-DS benchmark, affected queries include 14a and 14b. The issue can
> also be demonstrated with a simple query, for example:
> {{select ics.i_item_sk from catalog_sales cs join item ics on cs.cs_item_sk =
> ics.i_item_sk}}
> ...where _item_ is a small table that is read into one partition and
> _catalog_sales_ is a large table. These tables are part of TPC-DS, but you
> can create your own. To demonstrate the issue you may need to turn off
> broadcast joins, though that is not required for it to occur; it also
> happens when running TPC-DS with the broadcast setting at its default.
> Attached are the plans for this query in Spark 3.2 and Spark 3.3. They show
> how, in Spark 3.2, a join parallelism of 200 is reached by inserting an
> exchange after the item table scan, while in Spark 3.3 no such exchange is
> inserted and the join parallelism is 1.
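> A minimal sketch for reproducing the plan difference (assuming the two
> tables above are already registered; the catalog setup is omitted):
> {code:scala}
> // Disable broadcast joins so the sort-merge join path is exercised.
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
>
> val q = spark.sql(
>   """select ics.i_item_sk
>     |from catalog_sales cs
>     |join item ics on cs.cs_item_sk = ics.i_item_sk""".stripMargin)
>
> // Spark 3.2 shows an exchange after the item scan (parallelism 200);
> // Spark 3.3 inserts no such exchange and the join runs with 1 partition.
> q.explain()
> {code}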



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-10 Thread Bryan Keller (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615264#comment-17615264 ]

Bryan Keller commented on SPARK-40703:
--

Sounds good, thanks.



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-10 Thread Chao Sun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615255#comment-17615255 ]

Chao Sun commented on SPARK-40703:
--

Thanks [~bryanck]. Now I see where the issue is.

In your pyspark example, one side reports {{UnknownPartitioning}} while the
other side reports {{SinglePartition}}. Later on, Spark will insert a shuffle
for the {{UnknownPartitioning}} side, so it becomes {{HashPartitioning}}. In
this particular case, when Spark is deciding which side to shuffle, it picks
the {{HashPartitioning}} side again and converts it into the same
{{HashPartitioning}} but with {{numPartitions = 1}}.

Before:
{code}
 ShuffleExchange(HashPartition(200))  <-->  SinglePartition
{code}
(suppose {{spark.sql.shuffle.partitions}} is 200)

After:
{code}
 ShuffleExchange(HashPartition(1))  <-->  SinglePartition
{code}
 
The reason Spark chooses to do it this way is that there is a trade-off
between shuffle cost and parallelism. At the moment, when Spark sees that one
side of the join has a {{ShuffleExchange}} (meaning it needs to be shuffled
anyway) and the other side doesn't, it will try to avoid shuffling the other
side.

This makes more sense if we have:
{code}
ShuffleExchange(HashPartition(200)) <-> HashPartition(150)
{code}

as in this case, Spark will avoid shuffling the right-hand side and instead
just change the number of shuffle partitions on the left:
{code}
ShuffleExchange(HashPartition(150)) <-> HashPartition(150)
{code}

I feel we can treat {{SinglePartition}} as a special case here. Let me see if
I can come up with a PR.
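For illustration, a rough sketch of the shape such a special case could take
when picking the best candidate (the helper and its wiring are made up here;
see the PR linked at the top of this thread for the real change):
{code:scala}
import org.apache.spark.sql.catalyst.plans.physical.{ShuffleSpec, SinglePartitionShuffleSpec}

// Illustrative only: prefer a spec with real parallelism over SinglePartition,
// so a one-split scan can no longer drag the join down to numPartitions = 1.
def pickBestSpec(specs: Seq[ShuffleSpec]): Option[ShuffleSpec] = {
  val creatable = specs.filter(_.canCreatePartitioning)
  val parallel = creatable.filterNot(_ == SinglePartitionShuffleSpec)
  val pool = if (parallel.nonEmpty) parallel else creatable
  if (pool.isEmpty) None else Some(pool.maxBy(_.numPartitions))
}
{code}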



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-09 Thread Bryan Keller (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614738#comment-17614738 ]

Bryan Keller commented on SPARK-40703:
--

[~csun] I added a pyspark script that shows the difference between DSv1 and 
DSv2, in case that helps.



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Chao Sun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614303#comment-17614303 ]

Chao Sun commented on SPARK-40703:
--

Hmm, somehow in the unit test I was able to see that changing
{{spark.sql.requireAllClusterKeysForCoPartition}} to false fixed the issue:
{code:scala}
  test("EnsureRequirements should respect spark.sql.shuffle.partitions " +
      "with SinglePartition") {
    withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> 10.toString) {
      val plan1: SparkPlan = DummySparkPlan(
        outputPartitioning = HashPartitioning(exprA :: Nil, 15))
      val plan2 = DummySparkPlan(outputPartitioning = SinglePartition)
      val smjExec = SortMergeJoinExec(
        exprA :: exprB :: Nil, exprC :: exprD :: Nil, Inner, None, plan1, plan2)
      applyEnsureRequirementsWithSubsetKeys(smjExec) match {
        case SortMergeJoinExec(_, _, _, _,
            SortExec(_, _, DummySparkPlan(_, _, left: HashPartitioning, _, _), _),
            SortExec(_, _, ShuffleExchangeExec(right: HashPartitioning, _, _), _),
            _) =>
          assert(left.expressions === Seq(exprA))
          assert(right.expressions === Seq(exprC))
          assert(left.numPartitions == 15)
          assert(right.numPartitions == 15)
        case other => fail(other.toString)
      }
    }
  }
{code}

(This was added in {{EnsureRequirementsSuite}}.)
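(To run it locally, something like {{build/sbt "sql/testOnly *EnsureRequirementsSuite"}} should work.)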



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Bryan Keller (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614302#comment-17614302 ]

Bryan Keller commented on SPARK-40703:
--

Wouldn't that have the same problem if 
{{spark.sql.requireAllClusterKeysForCoPartition}} is set?



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Chao Sun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614298#comment-17614298 ]

Chao Sun commented on SPARK-40703:
--

(One idea is that {{SinglePartitionShuffleSpec#canCreatePartitioning}} should
also consider {{spark.sql.requireAllClusterKeysForCoPartition}} instead of
always returning true.)
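A sketch of that idea (not actual Spark code; the real spec would read the
conf through SQLConf internals, here it is written against the public key):
{code:scala}
// One possible reading of the idea above: instead of hard-coding true,
// consult spark.sql.requireAllClusterKeysForCoPartition.
def canCreatePartitioning(conf: org.apache.spark.sql.RuntimeConfig): Boolean =
  !conf.get("spark.sql.requireAllClusterKeysForCoPartition", "true").toBoolean
{code}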



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Chao Sun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614296#comment-17614296 ]

Chao Sun commented on SPARK-40703:
--

Hmm, interesting. Let me try to come up with a unit test and check what
happens underneath.



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Bryan Keller (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614291#comment-17614291 ]

Bryan Keller commented on SPARK-40703:
--

I gave that a try, but setting
{{spark.sql.requireAllClusterKeysForCoPartition=false}} doesn't seem to help;
I'm still seeing a single partition selected for the join.



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Chao Sun (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614289#comment-17614289 ]

Chao Sun commented on SPARK-40703:
--

I see. The reason {{HashPartitioning}} is not picked as the best candidate in
this case could be that {{spark.sql.requireAllClusterKeysForCoPartition}} is
turned on by default, so its {{canCreatePartitioning}} returns false. One idea
is to try setting {{spark.sql.requireAllClusterKeysForCoPartition}} to false.
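For example, in spark-shell:
{code:scala}
// Suggested experiment: relax the co-partition requirement, then re-run
// the join and compare the physical plans.
spark.conf.set("spark.sql.requireAllClusterKeysForCoPartition", "false")
{code}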



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Bryan Keller (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614210#comment-17614210 ]

Bryan Keller commented on SPARK-40703:
--

DSv1's DataSourceScanExec returns UnknownPartitioning(0) for non-bucketed
scans, and I believe this is why DSv1 isn't impacted.
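A simplified sketch of the contrast being described (the real logic lives in
the two scan exec nodes and handles more cases, e.g. bucketing and key-grouped
partitioning):
{code:scala}
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, SinglePartition, UnknownPartitioning}

// DSv1, non-bucketed: the scan reports unknown partitioning, so
// EnsureRequirements always shuffles this side to meet the join distribution.
def v1OutputPartitioning(numSplits: Int): Partitioning =
  UnknownPartitioning(0)

// DSv2: a scan with exactly one input split reports SinglePartition, which
// EnsureRequirements treats as a partitioning it can reuse for the join.
def v2OutputPartitioning(numSplits: Int): Partitioning =
  if (numSplits == 1) SinglePartition else UnknownPartitioning(numSplits)
{code}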



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Bryan Keller (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614207#comment-17614207 ]

Bryan Keller commented on SPARK-40703:
--

I tried a fix/hack that returns an UnknownPartitioning(0) instead of
SinglePartition, and that fixes this issue, but I'm not sure whether it has
adverse effects elsewhere.



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Bryan Keller (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614205#comment-17614205 ]

Bryan Keller commented on SPARK-40703:
--

[~sunchao], the other side is HashPartitioning with numPartitions of 200.
There is a check, shouldConsiderMinParallelism, which ends up being false
because SinglePartition returns true for canCreatePartitioning. That
canCreatePartitioning result also causes SinglePartition to be selected as
the best candidate.
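The effect on parallelism, condensed (simplified from the behavior described
above; not the exact Spark source):
{code:scala}
// The minimum-parallelism floor is only considered when the best candidate
// cannot create the required partitioning itself.
def effectiveJoinParallelism(
    bestSpecNumPartitions: Int,      // 1 for SinglePartitionShuffleSpec
    bestSpecCanCreate: Boolean,      // true for SinglePartitionShuffleSpec
    shufflePartitions: Int): Int = { // spark.sql.shuffle.partitions, e.g. 200
  val shouldConsiderMinParallelism = !bestSpecCanCreate
  if (shouldConsiderMinParallelism) {
    math.max(bestSpecNumPartitions, shufflePartitions)
  } else {
    bestSpecNumPartitions // => 1 when a one-split scan wins as best candidate
  }
}
{code}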



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Sun Chao (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614200#comment-17614200 ]

Sun Chao commented on SPARK-40703:
--

[~bryanck], in the above case, what is the partition spec for the other side
of the join? Normally Spark should pick the partition spec with the largest
number of partitions and use that to shuffle the other side.



[jira] [Commented] (SPARK-40703) Performance regression for joins in Spark 3.3 vs Spark 3.2

2022-10-07 Thread Sun Chao (Jira)


[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17614199#comment-17614199 ]

Sun Chao commented on SPARK-40703:
--

cc @yumwang
