[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-05-26 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r430047108



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInEquiJoin.scala
##
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project, UnaryNode}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Wraps `LogicalRelation` to provide the number of buckets for coalescing.
+ */
+case class CoalesceBuckets(
+    numCoalescedBuckets: Int,
+    child: LogicalRelation) extends UnaryNode {
+  require(numCoalescedBuckets > 0,
+    s"Number of coalesced buckets ($numCoalescedBuckets) must be positive.")
+
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * This rule adds a `CoalesceBuckets` logical plan if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join is the equi-join.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - "spark.sql.bucketing.coalesceBucketsInJoin.enabled" is set to true.
+ *   - The difference in the number of buckets is less than the value set in
+ *     "spark.sql.bucketing.coalesceBucketsInJoin.maxNumBucketsDiff".
+ */
+object CoalesceBucketsInEquiJoin extends Rule[LogicalPlan]  {
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+    def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+      p(plan) && plan.children.forall(forall(_)(p))
+    }
+
+    forall(plan) {
+      case _: Filter | _: Project | _: LogicalRelation => true
+      case _ => false
+    }
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+    if (isPlanEligible(plan)) {
+      plan.collectFirst {
+        case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty =>
+          r.bucketSpec.get
+      }
+    } else {
+      None
+    }
+  }
+
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): Option[Int] = {
+    assert(numBuckets1 != numBuckets2)
+    val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+    // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+    // number of buckets because bucket id is calculated by modding the total number of buckets.
+    if ((large % small == 0) && ((large - small) <= conf.coalesceBucketsInJoinMaxNumBucketsDiff)) {
+      Some(small)
+    } else {
+      None
+    }
+  }
+
+  private def addCoalesceBuckets(plan: LogicalPlan, numCoalescedBuckets: Int): LogicalPlan = {
+    plan.transformUp {
+      case l @ LogicalRelation(_: HadoopFsRelation, _, _, _) =>
+        CoalesceBuckets(numCoalescedBuckets, l)
+    }
+  }
+
+  object ExtractJoinWithBuckets {
+    def unapply(plan: LogicalPlan): Option[(Join, Int, Int)] = {
+      plan match {
+        case join @ ExtractEquiJoinKeys(_, _, _, _, left, right, _) =>

Review comment:
   Updated to use a physical rule.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-05-20 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r428435196



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInEquiJoin.scala
##
@@ -0,0 +1,127 @@
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project, UnaryNode}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Wraps `LogicalRelation` to provide the number of buckets for coalescing.
+ */
+case class CoalesceBuckets(
+    numCoalescedBuckets: Int,
+    child: LogicalRelation) extends UnaryNode {
+  require(numCoalescedBuckets > 0,
+    s"Number of coalesced buckets ($numCoalescedBuckets) must be positive.")
+
+  override def output: Seq[Attribute] = child.output
+}
+
+/**
+ * This rule adds a `CoalesceBuckets` logical plan if the following conditions are met:
+ *   - Two bucketed tables are joined.
+ *   - Join is the equi-join.
+ *   - The larger bucket number is divisible by the smaller bucket number.
+ *   - "spark.sql.bucketing.coalesceBucketsInJoin.enabled" is set to true.
+ *   - The difference in the number of buckets is less than the value set in
+ *     "spark.sql.bucketing.coalesceBucketsInJoin.maxNumBucketsDiff".
+ */
+object CoalesceBucketsInEquiJoin extends Rule[LogicalPlan]  {
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+    def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+      p(plan) && plan.children.forall(forall(_)(p))
+    }
+
+    forall(plan) {
+      case _: Filter | _: Project | _: LogicalRelation => true
+      case _ => false
+    }
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+    if (isPlanEligible(plan)) {
+      plan.collectFirst {
+        case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty =>
+          r.bucketSpec.get
+      }
+    } else {
+      None
+    }
+  }
+
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int, conf: SQLConf): Option[Int] = {
+    assert(numBuckets1 != numBuckets2)
+    val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+    // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+    // number of buckets because bucket id is calculated by modding the total number of buckets.
+    if ((large % small == 0) && ((large - small) <= conf.coalesceBucketsInJoinMaxNumBucketsDiff)) {
+      Some(small)
+    } else {
+      None
+    }
+  }
+
+  private def addCoalesceBuckets(plan: LogicalPlan, numCoalescedBuckets: Int): LogicalPlan = {
+    plan.transformUp {
+      case l @ LogicalRelation(_: HadoopFsRelation, _, _, _) =>
+        CoalesceBuckets(numCoalescedBuckets, l)
+    }
+  }
+
+  object ExtractJoinWithBuckets {
+    def unapply(plan: LogicalPlan): Option[(Join, Int, Int)] = {
+      plan match {
+        case join @ ExtractEquiJoinKeys(_, _, _, _, left, right, _) =>

Review comment:
   Sure, I guess the concern here is the unnecessarily reduced parallelism
   if a broadcast join is picked.
   
   Is `QueryExecution.preparations` a reasonable place to insert the rule?
   (That seems to be the only place where you can plug in a rule that takes in a
   `SparkPlan` and returns a `SparkPlan`.)
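
To make the parallelism concern concrete, here is a minimal, self-contained sketch in plain Scala with no Spark dependency. The names `CoalesceParallelismSketch`, `coalescedGroups`, and `bucketIdsAgree` are hypothetical and not part of the PR. It assumes, as the code comment in the diff states, that a bucket id is a hash taken modulo the number of buckets, and it uses small non-negative integers as stand-in hash values.

```scala
object CoalesceParallelismSketch {
  // Groups the bucket ids of the larger table by the coalesced bucket they
  // would be read into (bucket id modulo the smaller bucket count).
  def coalescedGroups(numBuckets: Int, numCoalescedBuckets: Int): Map[Int, Seq[Int]] = {
    require(numCoalescedBuckets > 0 && numBuckets % numCoalescedBuckets == 0,
      "the larger bucket count must be divisible by the smaller one")
    (0 until numBuckets).groupBy(_ % numCoalescedBuckets)
  }

  // Checks, for stand-in hash values 0 until 1000, that reducing a bucket id
  // computed modulo `large` by `small` matches the id computed modulo `small`.
  def bucketIdsAgree(large: Int, small: Int): Boolean =
    (0 until 1000).forall(h => (h % large) % small == h % small)

  def main(args: Array[String]): Unit = {
    // Eight original buckets are read as four groups of two.
    coalescedGroups(8, 4).toSeq.sortBy(_._1).foreach { case (id, members) =>
      println(s"coalesced bucket $id <- original buckets ${members.mkString(", ")}")
    }
    println(s"8 -> 4 keeps ids aligned: ${bucketIdsAgree(8, 4)}")
    println(s"8 -> 3 keeps ids aligned: ${bucketIdsAgree(8, 3)}")
  }
}
```

With 8 buckets coalesced to 4, the larger side is read as 4 groups instead of 8. That loss of parallelism is presumably only worth paying when it actually removes a shuffle, which would not be the case if a broadcast join is picked.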






[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-05-04 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r419830462



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##
@@ -0,0 +1,109 @@
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule adds a `CoalesceBuckets` logical plan if one side of two bucketed tables can be
+ * coalesced when the two bucketed tables are joined and they differ in the number of buckets.
+ */
+object CoalesceBucketsInJoin extends Rule[LogicalPlan]  {

Review comment:
   done








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-05-04 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r419830393



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##
@@ -0,0 +1,109 @@
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule adds a `CoalesceBuckets` logical plan if one side of two bucketed tables can be
+ * coalesced when the two bucketed tables are joined and they differ in the number of buckets.

Review comment:
   Added more comments








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-05-04 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r419830249



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
##
@@ -221,3 +223,22 @@ object FileSourceStrategy extends Strategy with Logging {
     case _ => Nil
   }
 }
+
+/**
+ * Extractor that handles `CoalesceBuckets` in the child plan extracted from `ScanOperation`.
+ */
+object ScanOperationWithCoalescedBuckets {

Review comment:
   I added it in CoalesceBucketsInEquiJoinSuite.scala. (Please let me know 
if it makes more sense to have it in FileSourceStrategySuite.scala)








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-29 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r417724687



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##
@@ -0,0 +1,116 @@
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule injects a hint if one side of two bucketed tables can be coalesced
+ * when the two bucketed tables are inner-joined and they differ in the number of buckets.
+ */
+object CoalesceBucketsInJoin extends Rule[LogicalPlan]  {
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+    def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+      p(plan) && plan.children.forall(forall(_)(p))
+    }
+
+    forall(plan) {
+      case _: Filter | _: Project | _: LogicalRelation => true
+      case _ => false
+    }
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+    if (isPlanEligible(plan)) {
+      plan.collectFirst {
+        case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty =>
+          r.bucketSpec.get
+      }
+    } else {
+      None
+    }
+  }
+
+  private def mayCoalesce(
+      numBuckets1: Int,
+      numBuckets2: Int,
+      maxNumBucketsDiff: Int): Option[Int] = {
+    assert(numBuckets1 != numBuckets2)
+    val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+    // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+    // number of buckets because bucket id is calculated by modding the total number of buckets.
+    if ((large % small == 0) &&
+      ((large - small) <= maxNumBucketsDiff)) {
+      Some(small)
+    } else {
+      None
+    }
+  }
+
+  private def addCoalesceBuckets(plan: LogicalPlan, numCoalescedBuckets: Int): LogicalPlan = {
+    plan.transformUp {
+      case l @ LogicalRelation(_: HadoopFsRelation, _, _, _) =>
+        CoalesceBuckets(numCoalescedBuckets, l)
+    }
+  }
+
+  object ExtractJoinWithBuckets {
+    def unapply(plan: LogicalPlan): Option[(Join, Int, Int)] = {
+      plan match {
+        case join: Join =>

Review comment:
   Yes, good idea. I changed it to apply coalescing only for equi-joins 
using `ExtractEquiJoinKeys`. Please let me know what you think.








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-28 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r417052767



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##
@@ -0,0 +1,116 @@
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule injects a hint if one side of two bucketed tables can be coalesced
+ * when the two bucketed tables are inner-joined and they differ in the number of buckets.
+ */
+object CoalesceBucketsInJoin extends Rule[LogicalPlan]  {
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+    def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+      p(plan) && plan.children.forall(forall(_)(p))
+    }
+
+    forall(plan) {
+      case _: Filter | _: Project | _: LogicalRelation => true
+      case _ => false
+    }
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+    if (isPlanEligible(plan)) {
+      plan.collectFirst {
+        case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty =>
+          r.bucketSpec.get
+      }
+    } else {
+      None
+    }
+  }
+
+  private def mayCoalesce(
+      numBuckets1: Int,
+      numBuckets2: Int,
+      maxNumBucketsDiff: Int): Option[Int] = {
+    assert(numBuckets1 != numBuckets2)
+    val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+    // A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+    // number of buckets because bucket id is calculated by modding the total number of buckets.
+    if ((large % small == 0) &&
+      ((large - small) <= maxNumBucketsDiff)) {
+      Some(small)
+    } else {
+      None
+    }
+  }
+
+  private def addCoalesceBuckets(plan: LogicalPlan, numCoalescedBuckets: Int): LogicalPlan = {
+    plan.transformUp {
+      case l @ LogicalRelation(_: HadoopFsRelation, _, _, _) =>
+        CoalesceBuckets(numCoalescedBuckets, l)
+    }
+  }
+
+  object ExtractJoinWithBuckets {
+    def unapply(plan: LogicalPlan): Option[(Join, Int, Int)] = {
+      plan match {
+        case join: Join =>

Review comment:
   It will remove shuffle for full-outer join as well:
   With the feature off:
   ```
   scala> t1.join(t2, t1("i") === t2("i"), "full_outer").explain
   == Physical Plan ==
   SortMergeJoin [i#67], [i#73], FullOuter
   :- *(2) Sort [i#67 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(i#67, 200), true, [id=#418]
   :     +- *(1) ColumnarToRow
   :        +- FileScan parquet default.t1[i#67,j#68,k#69] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 8 out of 8
   +- *(4) Sort [i#73 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(i#73, 200), true, [id=#425]
         +- *(3) ColumnarToRow
            +- FileScan parquet default.t2[i#73,j#74,k#75] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 4 out of 4
   ```
   With the feature on:
   ```
   scala> t1.join(t2, t1("i") === t2("i"), "full_outer").explain
   == Physical Plan ==
   SortMergeJoin [i#67], [i#73], FullOuter
   :- *(1) Sort [i#67 ASC NULLS FIRST], false, 0
   :  +- *(1) ColumnarToRow
   :     +- FileScan parquet default.t1[i#67,j#68,k#69] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[], PartitionFilters: [], PushedFilters: [], ReadSchema: struct, SelectedBucketsCount: 8 out of 8
   +- *(2) Sort [i#73 ASC NULLS FIRST], false, 0
      +- *(2) ColumnarToRow
         +- FileScan parquet 
   ```

[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-28 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r417052075



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketsInJoin.scala
##
@@ -0,0 +1,116 @@
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule injects a hint if one side of two bucketed tables can be coalesced
+ * when the two bucketed tables are inner-joined and they differ in the number of buckets.
+ */
+object CoalesceBucketsInJoin extends Rule[LogicalPlan]  {
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+    def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+      p(plan) && plan.children.forall(forall(_)(p))
+    }
+
+    forall(plan) {
+      case _: Filter | _: Project | _: LogicalRelation => true
+      case _ => false
+    }
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+    if (isPlanEligible(plan)) {
+      plan.collectFirst {
+        case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _) if r.bucketSpec.nonEmpty =>
+          r.bucketSpec.get
+      }
+    } else {
+      None
+    }
+  }
+
+  private def mayCoalesce(
+      numBuckets1: Int,
+      numBuckets2: Int,
+      maxNumBucketsDiff: Int): Option[Int] = {

Review comment:
   I changed it to pass sqlConf (I guess that's what you were leaning 
toward?)








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-23 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r414267327



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
##
@@ -855,4 +874,30 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("bucket coalescing eliminates shuffle") {

Review comment:
   Added more tests.








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-20 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r411825255



##
File path: 
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
##
@@ -855,4 +874,30 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
       }
     }
   }
+
+  test("bucket coalescing eliminates shuffle") {

Review comment:
   Still working on this, will update tests in the next iteration.








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-20 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r411825015



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2574,6 +2574,27 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val COALESCE_BUCKET_IN_JOIN_ENABLED =
+    buildConf("spark.sql.bucketing.coalesceBucketInJoin.enabled")
+      .internal()
+      .doc("When true, if two bucketed tables with a different number of buckets are joined, " +
+        "the side with a bigger number of buckets will be coalesced to have the same number " +
+        "of buckets as the other side. This bucket coalescing can happen only when the bigger " +
+        "number of buckets is divisible by the smaller number of buckets.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF =
+    buildConf("spark.sql.bucketing.coalesceBucketInJoin.maxNumBucketsDiff")

Review comment:
   The reason I picked a diff is that we could have a ratio of 2 while the actual
   difference is large (e.g., 1000 / 500 => ratio of 2, but the diff is 500).
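
As a rough illustration of the diff-versus-ratio point, the sketch below re-implements the check as a standalone function with no Spark dependency. `BucketDiffSketch` and the `limit` value of 256 are hypothetical, chosen only for the example; the condition itself mirrors `mayCoalesce` in this PR (divisibility plus a cap on the absolute difference).

```scala
object BucketDiffSketch {
  // Returns the coalesced bucket count if the larger count is divisible by the
  // smaller one and their absolute difference does not exceed the cap.
  def mayCoalesce(numBuckets1: Int, numBuckets2: Int, maxNumBucketsDiff: Int): Option[Int] = {
    require(numBuckets1 > 0 && numBuckets2 > 0 && numBuckets1 != numBuckets2)
    val small = math.min(numBuckets1, numBuckets2)
    val large = math.max(numBuckets1, numBuckets2)
    if (large % small == 0 && large - small <= maxNumBucketsDiff) Some(small) else None
  }

  def main(args: Array[String]): Unit = {
    val limit = 256 // hypothetical cap, not the default from the PR
    println(mayCoalesce(8, 4, limit))      // ratio 2, diff 4
    println(mayCoalesce(1000, 500, limit)) // ratio 2, diff 500
    println(mayCoalesce(8, 3, limit))      // not divisible
  }
}
```

Both `8 / 4` and `1000 / 500` have a ratio of 2, but only the first passes a cap on the absolute difference, so a ratio-based limit would treat them the same while a diff-based limit does not.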








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-20 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r411824674



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala
##
@@ -0,0 +1,110 @@
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule injects a hint if one side of two bucketed tables can be coalesced
+ * when the two bucketed tables are inner-joined and they differ in the number of buckets.
+ */
+object CoalesceBucketInJoin extends Rule[LogicalPlan]  {
+  val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets"
+
+  private val sqlConf = SQLConf.get
+
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+  p(plan) && plan.children.forall(forall(_)(p))
+}
+
+forall(plan) {
+  case _: Filter | _: Project | _: LogicalRelation => true
+  case _ => false
+}
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+if (isPlanEligible(plan)) {
+  plan.collectFirst {
+case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _)
+  if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) =>
+  r.bucketSpec.get
+  }
+} else {
+  None
+}
+  }
+
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total number of buckets.
+if ((large % small == 0) &&
+  (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = {
+plan.transformUp {
+  case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession))
+}
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_ENABLED)) {
+  return plan
+}
+
+plan transform {
+  case join: Join if join.joinType == Inner =>

Review comment:
   I am not sure how this would negatively impact full outer joins. Could you
   explain?








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-20 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r411823635



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2574,6 +2574,27 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val COALESCE_BUCKET_IN_JOIN_ENABLED =
+buildConf("spark.sql.bucketing.coalesceBucketInJoin.enabled")
+  .internal()

Review comment:
   Removed `internal`








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-20 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r411823364



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2574,6 +2574,17 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val BUCKETING_COALESCE_ENABLED =
+buildConf("spark.sql.bucketing.coalesce")

Review comment:
   Changed to `coalesceBucketsInJoin`








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-20 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r411823192



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
##
@@ -159,6 +159,14 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
 children.foldLeft(Option.empty[BaseType]) { (l, r) => l.orElse(r.find(f)) }
   }
 
+  /**
+   * Tests whether a predicate holds for all nodes.
+   * @param p the predicate function to be applied to each node in the tree.
+   */
+  def forall(p: BaseType => Boolean): Boolean = {

Review comment:
   Moved to the caller side.








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-20 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r411823045



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
##
@@ -309,7 +328,8 @@ case class FileSourceScanExec(
   files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
 val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
 
-if (singleFilePartitions) {
+// TODO Sort order is currently ignored if buckets are coalesced.

Review comment:
   Added.








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-20 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r411809034



##
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##
@@ -2574,6 +2574,27 @@ object SQLConf {
   .booleanConf
   .createWithDefault(false)
 
+  val COALESCE_BUCKET_IN_JOIN_ENABLED =
+buildConf("spark.sql.bucketing.coalesceBucketInJoin.enabled")
+  .internal()
+  .doc("When true, if two bucketed tables with a different number of buckets are joined, " +
+"the side with a bigger number of buckets will be coalesced to have the same number " +
+"of buckets as the other side. This bucket coalescing can happen only when the bigger " +
+"number of buckets is divisible by the smaller number of buckets.")
+  .version("3.1.0")
+  .booleanConf
+  .createWithDefault(false)
+
+  val COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF =
+buildConf("spark.sql.bucketing.coalesceBucketInJoin.maxNumBucketsDiff")
+  .doc("The difference in count of two buckets being coalesced should be less than or " +
+"equal to this value for bucket coalescing to be applied. This configuration only " +
+s"has an effect when '${COALESCE_BUCKET_IN_JOIN_ENABLED.key}' is set to true.")
+  .version("3.1.0")
+  .intConf
+  .checkValue(_ > 0, "The minimum number of partitions must be positive.")
+  .createWithDefault(256)

Review comment:
   I picked 256 because it is the power of 2 closest to the default number of
   shuffle partitions (200), and it matches some of the benchmarks I ran
   (512 -> 256). Let me know if you have a different default in mind :).
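
   A minimal sketch of the check that this default feeds into, written as
   standalone Scala rather than Spark code. `MayCoalesceSketch`, the
   `maxNumBucketsDiff` parameter and `coalescedBucketId` are illustrative names
   (the rule in the diff reads the limit from `SQLConf`), and the modulo
   identity is shown for non-negative hash values only.

```scala
object MayCoalesceSketch {
  // Mirrors the eligibility test in the diff, with the limit passed in as a
  // parameter (an assumption made here to keep the sketch self-contained).
  def mayCoalesce(
      numBuckets1: Int,
      numBuckets2: Int,
      maxNumBucketsDiff: Int = 256): Option[Int] = {
    require(numBuckets1 > 0 && numBuckets2 > 0 && numBuckets1 != numBuckets2)
    val (small, large) =
      (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
    // The larger count must be a multiple of the smaller one, and the two
    // counts must not be further apart than the configured limit.
    if (large % small == 0 && large - small <= maxNumBucketsDiff) Some(small) else None
  }

  // Maps a bucket id computed against the larger bucket count onto the smaller
  // one. When small divides large, (h % large) % small == h % small for h >= 0,
  // which is why rows land in the bucket the smaller table would have used.
  def coalescedBucketId(bucketId: Int, small: Int): Int = bucketId % small
}
```

   Under these assumptions, 512 vs. 256 is eligible with the default limit,
   while 1024 vs. 256 (diff 768) and 300 vs. 200 (not divisible) are not.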








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-19 Thread GitBox


imback82 commented on a change in pull request #28123:
URL: https://github.com/apache/spark/pull/28123#discussion_r411038403



##
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala
##
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule injects a hint if one side of two bucketed tables can be coalesced
+ * when the two bucketed tables are inner-joined and they differ in the number of buckets.
+ */
+object CoalesceBucketInJoin extends Rule[LogicalPlan]  {
+  val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets"
+
+  private val sqlConf = SQLConf.get
+
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+  p(plan) && plan.children.forall(forall(_)(p))
+}
+
+forall(plan) {
+  case _: Filter | _: Project | _: LogicalRelation => true
+  case _ => false
+}
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+if (isPlanEligible(plan)) {
+  plan.collectFirst {
+case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _)
+  if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) =>
+  r.bucketSpec.get
+  }
+} else {
+  None
+}
+  }
+
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total number of buckets.
+if ((large % small == 0) &&
+  (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = {
+plan.transformUp {
+  case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession))

Review comment:
   Thanks, that should work!








[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-14 Thread GitBox
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] 
Coalesce bucketed tables for join if applicable
URL: https://github.com/apache/spark/pull/28123#discussion_r408368607
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule injects a hint if one side of two bucketed tables can be coalesced
+ * when the two bucketed tables are inner-joined and they differ in the number of buckets.
+ */
+object CoalesceBucketInJoin extends Rule[LogicalPlan]  {
+  val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets"
+
+  private val sqlConf = SQLConf.get
+
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+  p(plan) && plan.children.forall(forall(_)(p))
+}
+
+forall(plan) {
+  case _: Filter | _: Project | _: LogicalRelation => true
+  case _ => false
+}
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+if (isPlanEligible(plan)) {
+  plan.collectFirst {
+case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _)
+  if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) =>
+  r.bucketSpec.get
+  }
+} else {
+  None
+}
+  }
+
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total number of buckets.
+if ((large % small == 0) &&
+  (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = {
+plan.transformUp {
+  case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession))
 
 Review comment:
   These are the options I can think of:
   1. Handle if `JOIN_HINT_COALESCED_NUM_BUCKETS` is incorrectly set.
   2. Update `HadoopFsRelation` to have an extra field.
   3. Update `LogicalRelation` to have a `hint` similar to `Join`.
   What do you think?





[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-13 Thread GitBox
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] 
Coalesce bucketed tables for join if applicable
URL: https://github.com/apache/spark/pull/28123#discussion_r407866696
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule injects a hint if one side of two bucketed tables can be coalesced
+ * when the two bucketed tables are inner-joined and they differ in the number of buckets.
+ */
+object CoalesceBucketInJoin extends Rule[LogicalPlan]  {
+  val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets"
+
+  private val sqlConf = SQLConf.get
+
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+  p(plan) && plan.children.forall(forall(_)(p))
+}
+
+forall(plan) {
+  case _: Filter | _: Project | _: LogicalRelation => true
+  case _ => false
+}
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+if (isPlanEligible(plan)) {
+  plan.collectFirst {
+case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _)
+  if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) =>
+  r.bucketSpec.get
+  }
+} else {
+  None
+}
+  }
+
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total number of buckets.
+if ((large % small == 0) &&
+  (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = {
+plan.transformUp {
+  case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession))
 
 Review comment:
   > You cannot extract a join plan pattern, e.g., 
`Join(LogicalRelation(HadoopFsRelation), LogicalRelation(HadoopFsRelation))`, 
just like `ScanOperation`?
   
   If I match a `Join` in `FileSourceStrategy`, I would need to return a
   `SparkPlan` for the `Join`, no?





[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-13 Thread GitBox
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] 
Coalesce bucketed tables for join if applicable
URL: https://github.com/apache/spark/pull/28123#discussion_r407864011
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule injects a hint if one side of two bucketed tables can be coalesced
+ * when the two bucketed tables are inner-joined and they differ in the number of buckets.
+ */
+object CoalesceBucketInJoin extends Rule[LogicalPlan]  {
+  val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets"
+
+  private val sqlConf = SQLConf.get
+
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+  p(plan) && plan.children.forall(forall(_)(p))
+}
+
+forall(plan) {
+  case _: Filter | _: Project | _: LogicalRelation => true
+  case _ => false
+}
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+if (isPlanEligible(plan)) {
+  plan.collectFirst {
+case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _)
+  if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) =>
+  r.bucketSpec.get
+  }
+} else {
+  None
+}
+  }
+
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total number of buckets.
+if ((large % small == 0) &&
+  (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = {
+plan.transformUp {
+  case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession))
+}
+  }
+
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (!sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_ENABLED)) {
+  return plan
+}
+
+plan transform {
+  case join: Join if join.joinType == Inner =>
 
 Review comment:
   Thanks for pointing this out. Actually, this should apply to any join type;
   we don't check the join type when buckets are used. This optimization would
   help in equi-join cases, and I don't think it will negatively impact
   non-equi-join cases. (Please correct me if I am wrong here.)





[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-13 Thread GitBox
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] 
Coalesce bucketed tables for join if applicable
URL: https://github.com/apache/spark/pull/28123#discussion_r407836362
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/CoalesceBucketInJoin.scala
 ##
 @@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.bucketing
+
+import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.plans.Inner
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * This rule injects a hint if one side of two bucketed tables can be coalesced
+ * when the two bucketed tables are inner-joined and they differ in the number of buckets.
+ */
+object CoalesceBucketInJoin extends Rule[LogicalPlan]  {
+  val JOIN_HINT_COALESCED_NUM_BUCKETS: String = "JoinHintCoalescedNumBuckets"
+
+  private val sqlConf = SQLConf.get
+
+  private def isPlanEligible(plan: LogicalPlan): Boolean = {
+def forall(plan: LogicalPlan)(p: LogicalPlan => Boolean): Boolean = {
+  p(plan) && plan.children.forall(forall(_)(p))
+}
+
+forall(plan) {
+  case _: Filter | _: Project | _: LogicalRelation => true
+  case _ => false
+}
+  }
+
+  private def getBucketSpec(plan: LogicalPlan): Option[BucketSpec] = {
+if (isPlanEligible(plan)) {
+  plan.collectFirst {
+case _ @ LogicalRelation(r: HadoopFsRelation, _, _, _)
+  if r.bucketSpec.nonEmpty && !r.options.contains(JOIN_HINT_COALESCED_NUM_BUCKETS) =>
+  r.bucketSpec.get
+  }
+} else {
+  None
+}
+  }
+
+  private def mayCoalesce(numBuckets1: Int, numBuckets2: Int): Option[Int] = {
+assert(numBuckets1 != numBuckets2)
+val (small, large) = (math.min(numBuckets1, numBuckets2), math.max(numBuckets1, numBuckets2))
+// A bucket can be coalesced only if the bigger number of buckets is divisible by the smaller
+// number of buckets because bucket id is calculated by modding the total number of buckets.
+if ((large % small == 0) &&
+  (large - small) <= sqlConf.getConf(SQLConf.COALESCE_BUCKET_IN_JOIN_MAX_NUM_BUCKETS_DIFF)) {
+  Some(small)
+} else {
+  None
+}
+  }
+
+  private def addBucketHint(plan: LogicalPlan, hint: (String, String)): LogicalPlan = {
+plan.transformUp {
+  case l @ LogicalRelation(r: HadoopFsRelation, _, _, _) =>
+l.copy(relation = r.copy(options = r.options + hint)(r.sparkSession))
 
 Review comment:
   Ah, good point. But I am not sure if I can do something similar in a 
strategy; for example, if I see a `Join` in a strategy, I need to act on it, 
and if I see a scan node, I cannot traverse up the `TreeNode`. Modifying 
`BucketSpec` is not desirable either, right?





[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-09 Thread GitBox
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] 
Coalesce bucketed tables for join if applicable
URL: https://github.com/apache/spark/pull/28123#discussion_r406579248
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 ##
 @@ -309,7 +328,8 @@ case class FileSourceScanExec(
   files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
 val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
 
-if (singleFilePartitions) {
+// TODO Sort order is currently ignored if buckets are coalesced.
 
 Review comment:
   Yes. This will eliminate the sort when there are multiple files per bucket
   (provided each file is sorted).
   
   It also seems that the work will be orthogonal to this PR anyway, so I will
   give it a shot and create a PR if I see improvements in the benchmarks.
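
   One way sort order could be kept when several individually sorted files feed
   a single bucket is a k-way merge. The sketch below only illustrates that
   idea and is not what this PR or Spark implements; `SortedMergeSketch` and
   `mergeSorted` are hypothetical names, and in-memory iterators stand in for
   sorted bucket files.

```scala
import scala.collection.mutable

object SortedMergeSketch {
  /** Merges already-sorted iterators into a single sorted iterator. */
  def mergeSorted[T](inputs: Seq[Iterator[T]])(implicit ord: Ordering[T]): Iterator[T] = {
    // PriorityQueue is a max-heap, so reverse the ordering to pop the iterator
    // whose next element is the smallest.
    implicit val byHead: Ordering[BufferedIterator[T]] =
      Ordering.by[BufferedIterator[T], T](_.head).reverse
    val heap = mutable.PriorityQueue.empty[BufferedIterator[T]]
    inputs.map(_.buffered).filter(_.hasNext).foreach(it => heap.enqueue(it))

    new Iterator[T] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): T = {
        val it = heap.dequeue()
        val value = it.next()
        // Re-insert the iterator so it is ordered by its new head element.
        if (it.hasNext) heap.enqueue(it)
        value
      }
    }
  }
}
```

   Calling `SortedMergeSketch.mergeSorted(Seq(Iterator(1, 4, 7), Iterator(2, 5)))`
   yields the elements in ascending order without a full re-sort; if any input
   is not sorted, the output is not sorted either.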





[GitHub] [spark] imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] Coalesce bucketed tables for join if applicable

2020-04-04 Thread GitBox
imback82 commented on a change in pull request #28123: [SPARK-31350][SQL] 
Coalesce bucketed tables for join if applicable
URL: https://github.com/apache/spark/pull/28123#discussion_r403584484
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 ##
 @@ -309,7 +328,8 @@ case class FileSourceScanExec(
   files.map(_.getPath.getName).groupBy(file => BucketingUtils.getBucketId(file))
 val singleFilePartitions = bucketToFilesGrouping.forall(p => p._2.length <= 1)
 
-if (singleFilePartitions) {
+// TODO Sort order is currently ignored if buckets are coalesced.
 
 Review comment:
   I can work on preserving the sort order in a separate PR if this PR makes
   sense.

