[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r430047108 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInEquiJoin.scala ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project, UnaryNode} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * Wraps `LogicalRelation` to provide the number of buckets for coalescing. 
+ */ +case class CoalesceBuckets( +numCoalescedBuckets: Int, +child: LogicalRelation) extends UnaryNode { + require(numCoalescedBuckets > 0, +s"Number of coalesced buckets ($numCoalescedBuckets) must be positive.") + + override def output: Seq[Attribute] = child.output +} + +/** + * This rule adds a `CoalesceBuckets` logical plan if the following conditions are met: + * - Two bucketed tables are joined. + * - Join is the equi-join. + * - The larger bucket number is divisible by the smaller bucket number. + * - "spark.sql.bucketing.coalesceBucketsInJoin.enabled" is set to true. + * - The difference in the number of buckets is less than the value set in + * "spark.sql.bucketing.coalesceBucketsInJoin.maxNumBucketsDiff". + */ +object CoalesceBucketsInEquiJoin extends Rule[LogicalPlan] { + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. 
+if ((large % small == 0) && ((large - small) <= conf.coalesceBucketsInJoinMaxNumBucketsDiff)) { + Some(small) +} else { + None +} + } + + private def addCoalesceBuckets(plan: LogicalPlan, numCoalescedBuckets: Int): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(_: HadoopFsRelation, _, _, _) => +CoalesceBuckets(numCoalescedBuckets, l) +} + } + + object ExtractJoinWithBuckets { +def unapply(plan: LogicalPlan): Option[(Join, Int, Int)] = { + plan match { +case join @ ExtractEquiJoinKeys(_, _, _, _, left, right, _) => Review comment: Updated to use a physical rule. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r428435196 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInEquiJoin.scala ## @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project, UnaryNode} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * Wraps `LogicalRelation` to provide the number of buckets for coalescing. 
+ */ +case class CoalesceBuckets( +numCoalescedBuckets: Int, +child: LogicalRelation) extends UnaryNode { + require(numCoalescedBuckets > 0, +s"Number of coalesced buckets ($numCoalescedBuckets) must be positive.") + + override def output: Seq[Attribute] = child.output +} + +/** + * This rule adds a `CoalesceBuckets` logical plan if the following conditions are met: + * - Two bucketed tables are joined. + * - Join is the equi-join. + * - The larger bucket number is divisible by the smaller bucket number. + * - "spark.sql.bucketing.coalesceBucketsInJoin.enabled" is set to true. + * - The difference in the number of buckets is less than the value set in + * "spark.sql.bucketing.coalesceBucketsInJoin.maxNumBucketsDiff". + */ +object CoalesceBucketsInEquiJoin extends Rule[LogicalPlan] { + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. 
+if ((large % small == 0) && ((large - small) <= conf.coalesceBucketsInJoinMaxNumBucketsDiff)) { + Some(small) +} else { + None +} + } + + private def addCoalesceBuckets(plan: LogicalPlan, numCoalescedBuckets: Int): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(_: HadoopFsRelation, _, _, _) => +CoalesceBuckets(numCoalescedBuckets, l) +} + } + + object ExtractJoinWithBuckets { +def unapply(plan: LogicalPlan): Option[(Join, Int, Int)] = { + plan match { +case join @ ExtractEquiJoinKeys(_, _, _, _, left, right, _) => Review comment: Sure, I guess the concern here is the unnecessarily reduced parallelism if broadcast join is picked. Is `QueryExecution.preparations` a reasonable place to insert the rule? (That seems to be the only place where you can plug in a rule that takes in a `SparkPlan` and returns a `SparkPlan`.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail:
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r419830462 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule adds a `CoalesceBuckets` logical plan if one side of two bucketed tables can be + * coalesced when the two bucketed tables are joined and they differ in the number of buckets. + */ +object CoalesceBucketsInJoin extends Rule[LogicalPlan] { Review comment: done This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r419830393 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala ## @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule adds a `CoalesceBuckets` logical plan if one side of two bucketed tables can be + * coalesced when the two bucketed tables are joined and they differ in the number of buckets. Review comment: Added more comments This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r419830249 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala ## @@ -221,3 +223,22 @@ object FileSourceStrategy extends Strategy with Logging { case _ => Nil } } + +/** + * Extractor that handles `CoalesceBuckets` in the child plan extracted from `ScanOperation`. + */ +object ScanOperationWithCoalescedBuckets { Review comment: I added it to `CoalesceBucketsInEquiJoinSuite.scala`. (Please let me know if it makes more sense to have it in `FileSourceStrategySuite.scala`.) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r417724687 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. 
+ */ +object CoalesceBucketsInJoin extends Rule[LogicalPlan] { + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce( + numBuckets1: Int, + numBuckets2: Int, + maxNumBucketsDiff: Int): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. +if ((large % small == 0) && + ((large - small) <= maxNumBucketsDiff)) { + Some(small) +} else { + None +} + } + + private def addCoalesceBuckets(plan: LogicalPlan, numCoalescedBuckets: Int): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(_: HadoopFsRelation, _, _, _) => +CoalesceBuckets(numCoalescedBuckets, l) +} + } + + object ExtractJoinWithBuckets { +def unapply(plan: LogicalPlan): Option[(Join, Int, Int)] = { + plan match { +case join: Join => Review comment: Yes, good idea. I changed it to apply coalescing only for equi-joins using `ExtractEquiJoinKeys`. Please let me know what you think. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r417052767 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. 
+ */ +object CoalesceBucketsInJoin extends Rule[LogicalPlan] { + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce( + numBuckets1: Int, + numBuckets2: Int, + maxNumBucketsDiff: Int): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. 
+if ((large % small == 0) && + ((large - small) <= maxNumBucketsDiff)) { + Some(small) +} else { + None +} + } + + private def addCoalesceBuckets(plan: LogicalPlan, numCoalescedBuckets: Int): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(_: HadoopFsRelation, _, _, _) => +CoalesceBuckets(numCoalescedBuckets, l) +} + } + + object ExtractJoinWithBuckets { +def unapply(plan: LogicalPlan): Option[(Join, Int, Int)] = { + plan match { +case join: Join => Review comment: It will remove shuffle for full-outer join as well: With the feature off: ``` scala> t1.join(t2, t1("i") === t2("i"), "full_outer").explain == Physical Plan == SortMergeJoin [i#67], [i#73], FullOuter :- *(2) Sort [i#67 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(i#67, 200), true, [id=#418] : +- *(1) ColumnarToRow :+- FileScan parquet default.t1[i#67,j#68,k#69] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 8 out of 8 +- *(4) Sort [i#73 ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(i#73, 200), true, [id=#425] +- *(3) ColumnarToRow +- FileScan parquet default.t2[i#73,j#74,k#75] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 4 out of 4 ``` With the feature on: ``` scala> t1.join(t2, t1("i") === t2("i"), "full_outer").explain == Physical Plan == SortMergeJoin [i#67], [i#73], FullOuter :- *(1) Sort [i#67 ASC NULLS FIRST], false, 0 : +- *(1) ColumnarToRow : +- FileScan parquet default.t1[i#67,j#68,k#69] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 8 out of 8 +- *(2) Sort [i#73 ASC NULLS FIRST], false, 0 +- *(2) ColumnarToRow +- FileScan parquet
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r417052075 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala ## @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. 
+ */ +object CoalesceBucketsInJoin extends Rule[LogicalPlan] { + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce( + numBuckets1: Int, + numBuckets2: Int, + maxNumBucketsDiff: Int): Option[Int] = { Review comment: I changed it to pass sqlConf (I guess that's what you were leaning toward?) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r414267327 ## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ## @@ -855,4 +874,30 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } } + + test("bucket coalescing eliminates shuffle") { Review comment: Added more tests. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r411825255 ## File path: sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ## @@ -855,4 +874,30 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } } + + test("bucket coalescing eliminates shuffle") { Review comment: Still working on this; I will update the tests in the next iteration. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r411825015 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2574,6 +2574,27 @@ object SQLConf { .booleanConf .createWithDefault(false) + val COALESCE_BUCKET_IN_JOIN_ENABLED = +buildConf("spark.sql.bucketing.coalesceBucketInJoin.enabled") + .internal() + .doc("When true, if two bucketed tables with a different number of buckets are joined, " + +"the side with a bigger number of buckets will be coalesced to have the same number " + +"of buckets as the other side. This bucket coalescing can happen only when the bigger " + +"number of buckets is divisible by the smaller number of buckets.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + val COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF = +buildConf("spark.sql.bucketing.coalesceBucketInJoin.maxNumBucketsDiff") Review comment: The reason I picked a difference rather than a ratio is that the ratio could be as small as 2 while the actual difference is still large (e.g., 1000 / 500 gives a ratio of 2, but the difference is 500). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r411824674 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. 
+ */ +object CoalesceBucketInJoin extends Rule[LogicalPlan] { + val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets" + + private val sqlConf = SQLConf.get + + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) + if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. +if ((large % small == 0) && + (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) { + Some(small) +} else { + None +} + } + + private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) => +l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession)) +} + } + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_ENABLED)) { + return plan +} + +plan transform { + case join: Join if join.joinType == Inner => Review comment: I am not quite sure if this would have a negative impact on full outer joins? Could you explain? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r411823635 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2574,6 +2574,27 @@ object SQLConf { .booleanConf .createWithDefault(false) + val COALESCE_BUCKET_IN_JOIN_ENABLED = +buildConf("spark.sql.bucketing.coalesceBucketInJoin.enabled") + .internal() Review comment: Removed `internal` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r411823364 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2574,6 +2574,17 @@ object SQLConf { .booleanConf .createWithDefault(false) + val BUCKETING_COALESCE_ENABLED = +buildConf("spark.sql.bucketing.coalesce") Review comment: Changed to `coalesceBucketsInJoin` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r411823192 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala ## @@ -159,6 +159,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { children.foldLeft(Option.empty[BaseType]) { (l, r) => l.orElse(r.find(f)) } } + /** + * Tests whether a predicate holds for all nodes. + * @param p the predicate function to be applied to each node in the tree. + */ + def forall(p: BaseType => Boolean): Boolean = { Review comment: Moved to the caller side. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r411823045 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ## @@ -309,7 +328,8 @@ case class FileSourceScanExec( files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file)) val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1) -if (singleFilePartitions) { +// TODO Sort order is currently ignored if buckets are coalesced. Review comment: Added. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r411809034 ## File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ## @@ -2574,6 +2574,27 @@ object SQLConf { .booleanConf .createWithDefault(false) + val COALESCE_BUCKET_IN_JOIN_ENABLED = +buildConf("spark.sql.bucketing.coalesceBucketInJoin.enabled") + .internal() + .doc("When true, if two bucketed tables with a different number of buckets are joined, " + +"the side with a bigger number of buckets will be coalesced to have the same number " + +"of buckets as the other side. This bucket coalescing can happen only when the bigger " + +"number of buckets is divisible by the smaller number of buckets.") + .version("3.1.0") + .booleanConf + .createWithDefault(false) + + val COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF = +buildConf("spark.sql.bucketing.coalesceBucketInJoin.maxNumBucketsDiff") + .doc("The difference in count of two buckets being coalesced should be less than or " + +"equal to this value for bucket coalescing to be applied. This configuration only " + +s"has an effect when '${COALESCE_BUCKET_IN_JOIN_ENABLED.key}' is set to true.") + .version("3.1.0") + .intConf + .checkValue(_ > 0, "The minimum number of partitions must be positive.") + .createWithDefault(256) Review comment: I chose 256 because it is the power of 2 closest to the default number of shuffle partitions (200), and it was used in some of the benchmarks I ran (512 -> 256). Let me know if you have a different default in mind :). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: URL: https://github.com/apache/spark/pull/28123#discussion_r411038403 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. 
+ */ +object CoalesceBucketInJoin extends Rule[LogicalPlan] { + val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets" + + private val sqlConf = SQLConf.get + + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) + if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. +if ((large % small == 0) && + (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) { + Some(small) +} else { + None +} + } + + private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) => +l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession)) Review comment: Thanks, that should work! This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable URL: https://github.com/apache/spark/pull/28123#discussion_r408368607 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. 
+ */ +object CoalesceBucketInJoin extends Rule[LogicalPlan] { + val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets" + + private val sqlConf = SQLConf.get + + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) + if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. +if ((large % small == 0) && + (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) { + Some(small) +} else { + None +} + } + + private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) => +l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession)) Review comment: These are the options I can think of: 1. Handle the case where `JOIN_HINT_COALESCED_NUM_BUCKETS` is set incorrectly. 2. Update `HadoopFsRelation` to have an extra field. 3. Update `LogicalRelation` to have a `hint` similar to `Join`. What do you think? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable URL: https://github.com/apache/spark/pull/28123#discussion_r407866696 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. 
+ */ +object CoalesceBucketInJoin extends Rule[LogicalPlan] { + val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets" + + private val sqlConf = SQLConf.get + + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) + if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. +if ((large % small == 0) && + (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) { + Some(small) +} else { + None +} + } + + private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) => +l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession)) Review comment: > You cannot extract a join plan pattern, e.g., `Join(LogicalRelation(HadoopFsRelation), LogicalRelation(HadoopFsRelation))`, just like `ScanOperation`? If I match a `Join` in `FileSourceStrategy`, I need to return `SparkPlan` for `Join`, no? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable URL: https://github.com/apache/spark/pull/28123#discussion_r407864011 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. 
+ */ +object CoalesceBucketInJoin extends Rule[LogicalPlan] { + val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets" + + private val sqlConf = SQLConf.get + + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) + if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. +if ((large % small == 0) && + (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) { + Some(small) +} else { + None +} + } + + private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) => +l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession)) +} + } + + def apply(plan: LogicalPlan): LogicalPlan = { +if (!sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_ENABLED)) { + return plan +} + +plan transform { + case join: Join if join.joinType == Inner => Review comment: Thanks for pointing this out. Actually, this should apply for any join types; we don't check the join types when buckets are used. 
This optimization would help in equi-join cases, but I don't think it will negatively impact non-equi-join cases (please correct me if I am wrong here). This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable URL: https://github.com/apache/spark/pull/28123#discussion_r407836362 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala ## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.bucketing + +import org.apache.spark.sql.catalyst.catalog.BucketSpec +import org.apache.spark.sql.catalyst.plans.Inner +import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf + +/** + * This rule injects a hint if one side of two bucketed tables can be coalesced + * when the two bucketed tables are inner-joined and they differ in the number of buckets. 
+ */ +object CoalesceBucketInJoin extends Rule[LogicalPlan] { + val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets" + + private val sqlConf = SQLConf.get + + private def isPlanEligible(plan: LogicalPlan): Boolean = { +def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = { + p(plan) && plan.children.forall(forall(_)(p)) +} + +forall(plan) { + case _: Filter | _: Project | _: LogicalRelation => true + case _ => false +} + } + + private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = { +if (isPlanEligible(plan)) { + plan.collectFirst { +case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) + if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) => + r.bucketSpec.get + } +} else { + None +} + } + + private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = { +assert(numBuckets1 != numBuckets2) +val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2)) +// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller +// number of buckets because bucket id is calculated by modding the total number of buckets. +if ((large % small == 0) && + (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) { + Some(small) +} else { + None +} + } + + private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = { +plan.transformUp { + case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) => +l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession)) Review comment: Ah, good point. But I am not sure if I can do something similar in a strategy; for example, if I see a `Join` in a strategy, I need to act on it, and if I see a scan node, I cannot traverse up the `TreeNode`. Modifying `BucketSpec` is not desirable either, right? This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable URL: https://github.com/apache/spark/pull/28123#discussion_r406579248 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ## @@ -309,7 +328,8 @@ case class FileSourceScanExec( files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file)) val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1) -if (singleFilePartitions) { +// TODO Sort order is currently ignored if buckets are coalesced. Review comment: Yes. This will eliminate the sort when there are multiple files per bucket (and if each file is sorted). And it seems that the work will be orthogonal to this PR anyway, so I will give it a shot and create a PR if I see some improvements in benchmark. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable URL: https://github.com/apache/spark/pull/28123#discussion_r403584484 ## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ## @@ -309,7 +328,8 @@ case class FileSourceScanExec( files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file)) val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1) -if (singleFilePartitions) { +// TODO Sort order is currently ignored if buckets are coalesced. Review comment: I can work on preserving the sort order as a separate PR if this PR makes sense. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org