[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-08-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16985


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-08-11 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r132715845
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -543,6 +551,68 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 )
   }
 
+  test("SPARK-19122 Re-order join predicates if they match with the child's output partitioning") {
+    val bucketedTableTestSpec = BucketedTableTestSpec(
+      Some(BucketSpec(8, Seq("i", "j", "k"), Seq("i", "j", "k"))),
+      numPartitions = 1,
+      expectedShuffle = false,
+      expectedSort = false)
+
+    def testBucketingWithPredicate(
+        joinCondition: (DataFrame, DataFrame) => Column,
+        expectedResult: Option[Array[Row]]): Array[Row] = {
+      testBucketing(
+        bucketedTableTestSpecLeft = bucketedTableTestSpec,
+        bucketedTableTestSpecRight = bucketedTableTestSpec,
+        joinCondition = joinCondition,
+        expectedResult = expectedResult
+      )
+    }
+
+    // Irrespective of the ordering of keys in the join predicate, the query plan and
+    // query results should always be the same
--- End diff --

Removed the validation of the query result.





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-08-10 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r132624167
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -543,6 +551,68 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 )
   }
 
+  test("SPARK-19122 Re-order join predicates if they match with the child's output partitioning") {
+    val bucketedTableTestSpec = BucketedTableTestSpec(
+      Some(BucketSpec(8, Seq("i", "j", "k"), Seq("i", "j", "k"))),
+      numPartitions = 1,
+      expectedShuffle = false,
+      expectedSort = false)
+
+    def testBucketingWithPredicate(
+        joinCondition: (DataFrame, DataFrame) => Column,
+        expectedResult: Option[Array[Row]]): Array[Row] = {
+      testBucketing(
+        bucketedTableTestSpecLeft = bucketedTableTestSpec,
+        bucketedTableTestSpecRight = bucketedTableTestSpec,
+        joinCondition = joinCondition,
+        expectedResult = expectedResult
+      )
+    }
+
+    // Irrespective of the ordering of keys in the join predicate, the query plan and
+    // query results should always be the same
--- End diff --

I don't think we need to test this here: it is an existing property of the join implementation in Spark SQL and should already be well tested. Then we don't need to change `testBucketing`.
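To make the point concrete: an equi-join matches rows on the set of key pairs, so listing the pairs as `(i, j)` or `(j, i)` only changes the order in which the key tuple is built, not which rows match. The sketch below is a hypothetical toy hash join over Scala collections, not Spark code; `JoinKeyOrderDemo` and `hashJoin` are illustrative names, and rows are assumed to be modeled as `Map[String, Int]`.

```scala
// Hypothetical illustration, not Spark code: a toy hash equi-join over Scala collections.
object JoinKeyOrderDemo {
  type Row = Map[String, Int]

  // Joins `left` and `right` on equality of each (leftColumn, rightColumn) pair in `keys`.
  def hashJoin(left: Seq[Row], right: Seq[Row], keys: Seq[(String, String)]): Set[(Row, Row)] = {
    // Build side: group right rows by their key tuple, in the listed key order.
    val built = right.groupBy(r => keys.map { case (_, rc) => r(rc) })
    // Probe side: build each left row's key tuple in the same order and look it up.
    left.flatMap { l =>
      val probeKey = keys.map { case (lc, _) => l(lc) }
      built.getOrElse(probeKey, Seq.empty).map(r => (l, r))
    }.toSet
  }

  def main(args: Array[String]): Unit = {
    val left = Seq(Map("i" -> 1, "j" -> 2), Map("i" -> 2, "j" -> 1), Map("i" -> 3, "j" -> 3))
    val right = Seq(Map("i" -> 1, "j" -> 2), Map("i" -> 2, "j" -> 2), Map("i" -> 3, "j" -> 3))
    val ij = hashJoin(left, right, Seq("i" -> "i", "j" -> "j"))
    val ji = hashJoin(left, right, Seq("j" -> "j", "i" -> "i"))
    println(ij == ji) // true
    println(ij.size)  // 2
  }
}
```

Key order matters for how data must be laid out (partitioning and sorting), which is what this PR addresses, but not for the join result itself.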







[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-08-10 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r132620278
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala
 ---
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * When the physical operators are created for JOIN, the ordering of join keys is based on order
+ * in which the join keys appear in the user query. That might not match with the output
+ * partitioning of the join node's children (thus leading to extra sort / shuffle being
+ * introduced). This rule will change the ordering of the join keys to match with the
+ * partitioning of the join nodes' children.
+ */
+class ReorderJoinPredicates extends Rule[SparkPlan] {
+  private def reorderJoinKeys(
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      leftPartitioning: Partitioning,
+      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
+
+    def reorder(expectedOrderOfKeys: Seq[Expression],
+        currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+      val leftKeysBuffer = ArrayBuffer[Expression]()
+      val rightKeysBuffer = ArrayBuffer[Expression]()
+
+      expectedOrderOfKeys.foreach(expression => {
+        val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
+        leftKeysBuffer.append(leftKeys(index))
+        rightKeysBuffer.append(rightKeys(index))
+      })
+      (leftKeysBuffer, rightKeysBuffer)
+    }
+
+    if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
+      leftPartitioning match {
+        case HashPartitioning(leftExpressions, _)
+          if leftExpressions.length == leftKeys.length &&
--- End diff --

The contract for reordering is that the set of join keys must be equal to the set of the child's partitioning columns (implemented at L58-L59 in this file). Thus there won't be any reordering for the case you pointed out. I have added a test case for it.
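The reordering contract described above can be sketched in a few lines. This is a simplified, hypothetical version (`ReorderSketch` and `reorderToMatch` are illustrative names, not the PR's code): join keys are plain column names, string equality stands in for Catalyst's `semanticEquals`, and duplicate keys are not handled. It reorders only when the key set equals the partitioning set, and it moves each left/right key pair together so the pairs stay aligned.

```scala
// Hypothetical sketch of the reordering contract; not the PR's actual code.
// Keys are column names and string equality stands in for Catalyst's semanticEquals.
// Duplicate keys are not handled.
object ReorderSketch {
  def reorderToMatch(
      partitionKeys: Seq[String],
      currentKeys: Seq[String],
      leftKeys: Seq[String],
      rightKeys: Seq[String]): Option[(Seq[String], Seq[String])] = {
    // Contract: the join keys must be exactly the partitioning columns, as a set.
    val sameSet = partitionKeys.length == currentKeys.length &&
      partitionKeys.toSet == currentKeys.toSet
    if (!sameSet) {
      None
    } else {
      // Position of each partitioning column in the current key order.
      val indexes = partitionKeys.map(k => currentKeys.indexOf(k))
      // Pick left/right keys at the same positions so each pair stays aligned.
      Some((indexes.map(i => leftKeys(i)), indexes.map(i => rightKeys(i))))
    }
  }
}
```

For example, with partitioning `i, j, k`, left keys `j, i, k` and right keys `y, x, z`, this returns `Some((Seq(i, j, k), Seq(x, y, z)))`; with partitioning `a, b` and join keys `b, a, c, d` it returns `None`.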





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-05-12 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r116345617
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala
 ---
@@ -0,0 +1,93 @@
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * When the physical operators are created for JOIN, the ordering of join keys is based on order
+ * in which the join keys appear in the user query. That might not match with the output
+ * partitioning of the join node's children (thus leading to extra sort / shuffle being
+ * introduced). This rule will change the ordering of the join keys to match with the
+ * partitioning of the join nodes' children.
+ */
+class ReorderJoinPredicates extends Rule[SparkPlan] {
+  private def reorderJoinKeys(
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      leftPartitioning: Partitioning,
+      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
+
+    def reorder(expectedOrderOfKeys: Seq[Expression],
+        currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+      val leftKeysBuffer = ArrayBuffer[Expression]()
+      val rightKeysBuffer = ArrayBuffer[Expression]()
+
+      expectedOrderOfKeys.foreach(expression => {
+        val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
+        leftKeysBuffer.append(leftKeys(index))
+        rightKeysBuffer.append(rightKeys(index))
+      })
+      (leftKeysBuffer, rightKeysBuffer)
+    }
+
+    if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
+      leftPartitioning match {
+        case HashPartitioning(leftExpressions, _)
+          if leftExpressions.length == leftKeys.length &&
--- End diff --

`EnsureRequirements` would still add a shuffle in either case, even if we reorder.

JOIN would expect data to be distributed over `b, a, c, d` (or `a, b, c, d` if you reorder), which maps to HashPartitioning(`a, b, c, d`):
https://github.com/apache/spark/blob/e9c91badce64731ffd3e53cbcd9f044a7593e6b8/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala#L185

But the child nodes won't have a matching partitioning, i.e. they will have HashPartitioning(`a, b`).
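A simplified model of the check described above, assuming that the join requires each child to be hash-partitioned on the join keys in their listed order, and that a child's existing partitioning is reused only on an exact, order-sensitive match. `HashPart` and `ShuffleCheck` are hypothetical names, not Spark classes.

```scala
// Hypothetical, simplified model; HashPart and ShuffleCheck are not Spark classes.
// A hash partitioning is modeled as an ordered list of column names plus a partition count.
final case class HashPart(exprs: Seq[String], numPartitions: Int) {
  // Order-sensitive: HashPart(Seq("a", "b"), n) does not guarantee HashPart(Seq("b", "a"), n).
  def guarantees(other: HashPart): Boolean =
    exprs == other.exprs && numPartitions == other.numPartitions
}

object ShuffleCheck {
  // The join requires the child to be hash-partitioned on the join keys, in their
  // listed order; anything else is assumed to trigger a shuffle.
  def needsShuffle(child: HashPart, joinKeys: Seq[String]): Boolean =
    !child.guarantees(HashPart(joinKeys, child.numPartitions))

  def main(args: Array[String]): Unit = {
    val onAB = HashPart(Seq("a", "b"), 8)
    val onIJK = HashPart(Seq("i", "j", "k"), 8)
    println(needsShuffle(onAB, Seq("b", "a", "c", "d")))  // true
    println(needsShuffle(onAB, Seq("a", "b", "c", "d")))  // true
    println(needsShuffle(onIJK, Seq("j", "i", "k")))      // true
    println(needsShuffle(onIJK, Seq("i", "j", "k")))      // false
  }
}
```

In this model, reordering `b, a, c, d` to `a, b, c, d` does not help a child partitioned on `a, b`, while reordering `j, i, k` to `i, j, k` avoids the shuffle for a child partitioned on `i, j, k`.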







[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-05-12 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r116176419
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala
 ---
@@ -0,0 +1,93 @@
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * When the physical operators are created for JOIN, the ordering of join keys is based on order
+ * in which the join keys appear in the user query. That might not match with the output
+ * partitioning of the join node's children (thus leading to extra sort / shuffle being
+ * introduced). This rule will change the ordering of the join keys to match with the
+ * partitioning of the join nodes' children.
+ */
+class ReorderJoinPredicates extends Rule[SparkPlan] {
+  private def reorderJoinKeys(
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      leftPartitioning: Partitioning,
+      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
+
+    def reorder(expectedOrderOfKeys: Seq[Expression],
+        currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+      val leftKeysBuffer = ArrayBuffer[Expression]()
+      val rightKeysBuffer = ArrayBuffer[Expression]()
+
+      expectedOrderOfKeys.foreach(expression => {
+        val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
+        leftKeysBuffer.append(leftKeys(index))
+        rightKeysBuffer.append(rightKeys(index))
+      })
+      (leftKeysBuffer, rightKeysBuffer)
+    }
+
+    if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
+      leftPartitioning match {
+        case HashPartitioning(leftExpressions, _)
+          if leftExpressions.length == leftKeys.length &&
--- End diff --

Oh, sorry, I made a mistake.

If the child partitioning is `a, b` and the join key is `b, a, c, d`, does it make sense to reorder it as `a, b, c, d`?





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-05-12 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r116163888
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala
 ---
@@ -0,0 +1,93 @@
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * When the physical operators are created for JOIN, the ordering of join keys is based on order
+ * in which the join keys appear in the user query. That might not match with the output
+ * partitioning of the join node's children (thus leading to extra sort / shuffle being
+ * introduced). This rule will change the ordering of the join keys to match with the
+ * partitioning of the join nodes' children.
+ */
+class ReorderJoinPredicates extends Rule[SparkPlan] {
+  private def reorderJoinKeys(
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      leftPartitioning: Partitioning,
+      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
+
+    def reorder(expectedOrderOfKeys: Seq[Expression],
--- End diff --

Fixed this.

PS: I always fix this manually. If there were a predefined checkstyle configuration I could import into IntelliJ, that would save this trouble for me and the reviewers.





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-05-12 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r116163611
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala
 ---
@@ -0,0 +1,93 @@
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * When the physical operators are created for JOIN, the ordering of join keys is based on order
+ * in which the join keys appear in the user query. That might not match with the output
+ * partitioning of the join node's children (thus leading to extra sort / shuffle being
+ * introduced). This rule will change the ordering of the join keys to match with the
+ * partitioning of the join nodes' children.
+ */
+class ReorderJoinPredicates extends Rule[SparkPlan] {
+  private def reorderJoinKeys(
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      leftPartitioning: Partitioning,
+      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
+
+    def reorder(expectedOrderOfKeys: Seq[Expression],
+        currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+      val leftKeysBuffer = ArrayBuffer[Expression]()
+      val rightKeysBuffer = ArrayBuffer[Expression]()
+
+      expectedOrderOfKeys.foreach(expression => {
+        val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
+        leftKeysBuffer.append(leftKeys(index))
+        rightKeysBuffer.append(rightKeys(index))
+      })
+      (leftKeysBuffer, rightKeysBuffer)
+    }
+
+    if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
+      leftPartitioning match {
+        case HashPartitioning(leftExpressions, _)
+          if leftExpressions.length == leftKeys.length &&
--- End diff --

I don't think that would be the right thing to do. If the child is partitioned on `a, b, c, d`, it basically means rows are distributed over the hash of `a, b, c, d`. Let's say we have two rows with values of `a, b, c, d` as:
- row1 : 1,1,1,1 ==> hash(1,1,1,1) = x
- row2 : 1,1,2,2 ==> hash(1,1,2,2) = y

If the join key `b, a` is reordered as `a, b` and we want to avoid a shuffle, that would mean we expect the child to have rows with the same values of `a, b` in the same partition. But looking at row1 and row2 above, even though their values of `a` and `b` are the same, there is no guarantee that they belong to the same partition, because the partition is based on the hash of all of `a, b, c, d`.

If the join keys are a subset of the partitioning columns, then a shuffle is needed. There is only one exception to this (more of a corner case): https://issues.apache.org/jira/browse/SPARK-18067
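The row1/row2 argument can be checked numerically. This illustrative sketch uses `scala.util.hashing.MurmurHash3.seqHash` as a stand-in for Spark's own Murmur3-based row hash and an arbitrary `numPartitions = 8` (both assumptions; `SubsetKeyDemo` is a hypothetical name). The 100 rows below all share `a = 1, b = 1` but differ in `c, d`, and they should land in more than one partition, so a join on `a, b` alone cannot rely on that partitioning.

```scala
import scala.util.hashing.MurmurHash3

// Illustrative only: MurmurHash3.seqHash stands in for Spark's Murmur3-based row hash,
// and numPartitions = 8 is an arbitrary assumption. Rows are (a, b, c, d) as Seq[Int].
object SubsetKeyDemo {
  val numPartitions = 8

  // Partition id when data is hash-partitioned on all of `a, b, c, d`;
  // floorMod keeps the result non-negative even for negative hash values.
  def partitionOf(row: Seq[Int]): Int =
    Math.floorMod(MurmurHash3.seqHash(row), numPartitions)

  // 100 rows that agree on `a = 1, b = 1` but differ in `c` and `d`.
  val rowsWithSameAB: Seq[Seq[Int]] =
    for (c <- 0 until 10; d <- 0 until 10) yield Seq(1, 1, c, d)

  // Distinct partitions that those rows land in.
  val partitionsHit: Set[Int] = rowsWithSameAB.map(partitionOf).toSet

  def main(args: Array[String]): Unit =
    println(s"rows with a = 1, b = 1 land in ${partitionsHit.size} partitions")
}
```

Any reasonable hash over all four columns behaves the same way; the specific partition ids depend on the hash function and are not meaningful here.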





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-05-11 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r116162386
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -315,8 +317,14 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 assert(
   joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
   s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
+
+if (expectedResult.isDefined) {
+  checkAnswer(joined, expectedResult.get)
--- End diff --

This lets us validate against a predefined expected output. L296 recomputes the result with the exact same `joinCondition`, but for my test case I don't want to reuse the same `joinCondition`.





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-05-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r116158389
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala
 ---
@@ -0,0 +1,93 @@
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * When the physical operators are created for JOIN, the ordering of join keys is based on order
+ * in which the join keys appear in the user query. That might not match with the output
+ * partitioning of the join node's children (thus leading to extra sort / shuffle being
+ * introduced). This rule will change the ordering of the join keys to match with the
+ * partitioning of the join nodes' children.
+ */
+class ReorderJoinPredicates extends Rule[SparkPlan] {
+  private def reorderJoinKeys(
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      leftPartitioning: Partitioning,
+      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
+
+    def reorder(expectedOrderOfKeys: Seq[Expression],
+        currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+      val leftKeysBuffer = ArrayBuffer[Expression]()
+      val rightKeysBuffer = ArrayBuffer[Expression]()
+
+      expectedOrderOfKeys.foreach(expression => {
+        val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
+        leftKeysBuffer.append(leftKeys(index))
+        rightKeysBuffer.append(rightKeys(index))
+      })
+      (leftKeysBuffer, rightKeysBuffer)
+    }
+
+    if (leftKeys.forall(_.deterministic) && rightKeys.forall(_.deterministic)) {
+      leftPartitioning match {
+        case HashPartitioning(leftExpressions, _)
+          if leftExpressions.length == leftKeys.length &&
--- End diff --

Why do we need the same length? Let's say the child partitioning is `a, b, c, d` and the join key is `b, a`; we can reorder the join key to avoid a shuffle, right?





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-05-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r116158180
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ReorderJoinPredicates.scala
 ---
@@ -0,0 +1,93 @@
+
+package org.apache.spark.sql.execution.joins
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.SparkPlan
+
+/**
+ * When the physical operators are created for JOIN, the ordering of join keys is based on order
+ * in which the join keys appear in the user query. That might not match with the output
+ * partitioning of the join node's children (thus leading to extra sort / shuffle being
+ * introduced). This rule will change the ordering of the join keys to match with the
+ * partitioning of the join nodes' children.
+ */
+class ReorderJoinPredicates extends Rule[SparkPlan] {
+  private def reorderJoinKeys(
+      leftKeys: Seq[Expression],
+      rightKeys: Seq[Expression],
+      leftPartitioning: Partitioning,
+      rightPartitioning: Partitioning): (Seq[Expression], Seq[Expression]) = {
+
+    def reorder(expectedOrderOfKeys: Seq[Expression],
--- End diff --

nit:
```
def reorder(
    expectedOrderOfKeys: Seq[Expression],
    currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression])
```





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-05-11 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r116158076
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -315,8 +317,14 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
 assert(
   joinOperator.right.find(_.isInstanceOf[SortExec]).isDefined == sortRight,
   s"expected sort in the right child to be $sortRight but found\n${joinOperator.right}")
+
+if (expectedResult.isDefined) {
+  checkAnswer(joined, expectedResult.get)
--- End diff --

In L296 we already have an `assert` to check the result; do we really need this?





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-05-08 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r115356786
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala
 ---
@@ -41,6 +41,42 @@ case class SortMergeJoinExec(
 left: SparkPlan,
 right: SparkPlan) extends BinaryExecNode with CodegenSupport {
 
+  lazy val (reorderedLeftKeys, reorderedRightKeys) = {
--- End diff --

I looked at #17339, and it's doing something orthogonal to what's done here.

#17339 ensures that the sort ordering of the join output has attributes from both relations.
This PR ensures that the order of the join keys (in both the distribution and the sort order) is not blindly taken from their order of occurrence in the query string.
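To illustrate why the key order matters, here is a minimal sketch under a simplified model: a required sort order counts as satisfied only if it matches the child's existing sort order position by position (an approximation of the planner's check, not its real code). The helper `orderingSatisfied` and the column names are hypothetical.

```scala
// Simplified model (not Spark's API): a sort key is just a column name, and a
// child's existing sort order can be reused only if the required ordering is a
// prefix of it, compared position by position.
def orderingSatisfied(required: Seq[String], childOrdering: Seq[String]): Boolean =
  required.length <= childOrdering.length &&
    required.zip(childOrdering).forall { case (r, c) => r == c }

// Hypothetical bucketed tables: both sides are bucketed and sorted by (i, j, k).
val bucketSortOrder = Seq("i", "j", "k")

// Join keys in query-text order, e.g. ON a.j = b.j AND a.i = b.i AND a.k = b.k
val keysFromQuery = Seq("j", "i", "k")
// The same keys re-ordered to follow the child's bucketing/sort columns.
val reorderedKeys = bucketSortOrder.filter(keysFromQuery.contains)

println(orderingSatisfied(keysFromQuery, bucketSortOrder)) // false: an extra sort is needed
println(orderingSatisfied(reorderedKeys, bucketSortOrder)) // true: existing order is reused
```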





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-04-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r109434210
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -48,6 +49,10 @@ abstract class Exchange extends UnaryExecNode {
 case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchange)
   extends LeafExecNode {
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

good catch!





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-04-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r109434138
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala ---
@@ -253,12 +253,15 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   bucketedTableTestSpecLeft: BucketedTableTestSpec,
   bucketedTableTestSpecRight: BucketedTableTestSpec,
   joinType: String = "inner",
-  joinCondition: (DataFrame, DataFrame) => Column): Unit = {
+  joinCondition: (DataFrame, DataFrame) => Column,
+  returnJoinResult: Boolean = false,
+  expectedResult: Option[Array[Row]] = None): Option[Array[Row]] = {
--- End diff --

Since this is a test, I think it's OK to always return the result.





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-04-03 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r109433666
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -41,6 +41,42 @@ case class SortMergeJoinExec(
 left: SparkPlan,
 right: SparkPlan) extends BinaryExecNode with CodegenSupport {
 
+  lazy val (reorderedLeftKeys, reorderedRightKeys) = {
--- End diff --

is it fixed by https://github.com/apache/spark/pull/17339 ?





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-03-14 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r106055916
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/Exchange.scala ---
@@ -48,6 +49,10 @@ abstract class Exchange extends UnaryExecNode {
 case class ReusedExchangeExec(override val output: Seq[Attribute], child: Exchange)
   extends LeafExecNode {
 
+  override def outputPartitioning: Partitioning = child.outputPartitioning
--- End diff --

@hvanhovell : continuing our discussion at 
https://github.com/apache/spark/pull/16985#discussion_r101905768 : 

I found that `ReusedExchangeExec` did not use its child's partitioning, which was causing the tests to fail. This version of the PR incorporates your suggestion and works in all cases (all unit tests pass).
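A minimal sketch of the problem and the fix, using a simplified, hypothetical model rather than Spark's real classes. In Spark, a `SparkPlan` that does not override `outputPartitioning` reports `UnknownPartitioning(0)`, so a reused exchange that does not forward its child's partitioning loses the hash-partitioning guarantee. The class names below are illustrative stand-ins.

```scala
// Simplified model of physical plan nodes (names are hypothetical, not Spark classes).
sealed trait Partitioning { def numPartitions: Int }
case class UnknownPartitioning(numPartitions: Int) extends Partitioning
case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

trait PlanNode {
  // Default for nodes that do not override it: no partitioning guarantee.
  def outputPartitioning: Partitioning = UnknownPartitioning(0)
}

case class ShuffleExchange(keys: Seq[String], numPartitions: Int) extends PlanNode {
  override def outputPartitioning: Partitioning = HashPartitioning(keys, numPartitions)
}

// Before the fix: the reused exchange falls back to the default.
case class ReusedExchangeBefore(child: ShuffleExchange) extends PlanNode

// After the fix: it forwards the partitioning of the exchange it reuses.
case class ReusedExchangeAfter(child: ShuffleExchange) extends PlanNode {
  override def outputPartitioning: Partitioning = child.outputPartitioning
}

val exchange = ShuffleExchange(Seq("i"), 200)
println(ReusedExchangeBefore(exchange).outputPartitioning) // UnknownPartitioning(0)
println(ReusedExchangeAfter(exchange).outputPartitioning)  // HashPartitioning(List(i),200)
```

The sketch forwards the child's partitioning unchanged, as the diff above does.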





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-02-24 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r102992758
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -33,8 +33,8 @@ import org.apache.spark.util.collection.BitSet
  * Performs a sort merge join of two child relations.
  */
 case class SortMergeJoinExec(
-leftKeys: Seq[Expression],
-rightKeys: Seq[Expression],
+var leftKeys: Seq[Expression],
--- End diff --

@tejasapatil I see your point. Let me think of another way.





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-02-24 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r102974682
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -33,8 +33,8 @@ import org.apache.spark.util.collection.BitSet
  * Performs a sort merge join of two child relations.
  */
 case class SortMergeJoinExec(
-leftKeys: Seq[Expression],
-rightKeys: Seq[Expression],
+var leftKeys: Seq[Expression],
--- End diff --

@hvanhovell: ping! As expected, that doesn't work in all cases, and a few unit tests are failing. I will go back to my original version if you don't have any other ideas.





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-02-21 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r102245647
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -33,8 +33,8 @@ import org.apache.spark.util.collection.BitSet
  * Performs a sort merge join of two child relations.
  */
 case class SortMergeJoinExec(
-leftKeys: Seq[Expression],
-rightKeys: Seq[Expression],
+var leftKeys: Seq[Expression],
--- End diff --

@hvanhovell: I had tried that, but it didn't work for some class of queries. When I try to get the `outputPartitioning` of a `SortMergeJoinExec` node, in the [inner-join case it is](https://github.com/apache/spark/blob/02f203107b8eda1f1576e36c4f12b0e3bc5e910e/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala#L68) a `PartitioningCollection`. One of its children can be a `ReusedExchange` that is yet to be resolved, and if the other child is already resolved, [this check](https://github.com/apache/spark/blob/39e2bad6a866d27c3ca594d15e574a1da3ee84cc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala#L325) fails.

Example query:
```
SELECT a.i, b.i, c.i
FROM mytable a, mytable b, mytable c
where a.i = b.i and a.i = c.i
```

Example query plan:
```
:- *SortMergeJoin [i#8], [i#9], Inner
:  :- *Sort [i#8 ASC NULLS FIRST], false, 0
:  :  +- Exchange hashpartitioning(i#8, 200)
:  :     +- *Project [i#8]
:  :        +- *Filter isnotnull(i#8)
:  :           +- *FileScan orc default.one_column[i#8] Batched: false, Format: ORC, Location: InMemoryFileIndex[file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/one_column], PartitionFilters: [], PushedFilters: [IsNotNull(i)], ReadSchema: struct
:  +- *Sort [i#9 ASC NULLS FIRST], false, 0
:     +- ReusedExchange [i#9], Exchange hashpartitioning(i#8, 200)
```
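One plausible reading of the failure, sketched with a simplified model: the classes below are hypothetical stand-ins, loosely modeled on Spark's `PartitioningCollection`, which requires all of its member partitionings to agree on `numPartitions`. If the unresolved `ReusedExchange` side still reports `UnknownPartitioning(0)` while the other side reports `hashpartitioning(i#8, 200)`, that requirement fails.

```scala
// Simplified, hypothetical model of the numPartitions consistency check that a
// collection of partitionings performs on construction.
sealed trait Partitioning { def numPartitions: Int }
case class UnknownPartitioning(numPartitions: Int) extends Partitioning
case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning

case class PartitioningCollection(partitionings: Seq[Partitioning]) extends Partitioning {
  // All member partitionings must agree on the number of partitions.
  require(partitionings.map(_.numPartitions).distinct.length == 1,
    "all partitionings must have the same numPartitions")
  override def numPartitions: Int = partitionings.head.numPartitions
}

val leftSide  = HashPartitioning(Seq("i#8"), 200) // resolved Exchange
val rightSide = UnknownPartitioning(0)            // ReusedExchange reporting the default

val failed = scala.util.Try(PartitioningCollection(Seq(leftSide, rightSide))).isFailure
println(failed) // true
```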
To make sure we are on the same page, here is what I had tried (I will update the PR with this change anyway, though I know some unit tests will fail):
```
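import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning

case class SortMergeJoinExec(
    leftKeys: Seq[Expression],
    rightKeys: Seq[Expression]
    /* ... remaining constructor parameters and parent classes elided ... */) {

  // Join keys re-ordered to follow a child's hash partitioning, when one matches.
  lazy val (reorderedLeftKeys, reorderedRightKeys) = {
    // Re-orders the (leftKeys(n), rightKeys(n)) pairs so that the keys on the
    // matched side appear in `expectedOrderOfKeys` order. Moving both keys of a
    // pair together keeps the join condition unchanged.
    def reorder(
        expectedOrderOfKeys: Seq[Expression],
        currentOrderOfKeys: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
      val leftKeysBuffer = ArrayBuffer[Expression]()
      val rightKeysBuffer = ArrayBuffer[Expression]()

      expectedOrderOfKeys.foreach { expression =>
        val index = currentOrderOfKeys.indexWhere(e => e.semanticEquals(expression))
        leftKeysBuffer.append(leftKeys(index))
        rightKeysBuffer.append(rightKeys(index))
      }

      (leftKeysBuffer, rightKeysBuffer)
    }

    left.outputPartitioning match {
      // Prefer the left child's partitioning if it hashes on exactly the join keys.
      case HashPartitioning(leftExpressions, _)
          if leftExpressions.length == leftKeys.length &&
            leftKeys.forall(x => leftExpressions.exists(_.semanticEquals(x))) =>
        reorder(leftExpressions, leftKeys)

      // Otherwise try the right child; if neither matches, keep the original order.
      case _ => right.outputPartitioning match {
        case HashPartitioning(rightExpressions, _)
            if rightExpressions.length == rightKeys.length &&
              rightKeys.forall(x => rightExpressions.exists(_.semanticEquals(x))) =>
          reorder(rightExpressions, rightKeys)

        case _ => (leftKeys, rightKeys)
      }
    }
  }
}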
```
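The same re-ordering logic can be tried without Spark. This is a minimal sketch, assuming join keys are modeled as plain strings compared with `==` in place of `semanticEquals`; the `reorder` signature here is hypothetical, and it adds an explicit check that every expected key is found, which the snippet above leaves implicit.

```scala
// Dependency-free sketch of pairwise join-key re-ordering. Keys are plain strings
// compared with ==, standing in for Spark's semanticEquals on expressions.
def reorder(
    leftKeys: Seq[String],
    rightKeys: Seq[String],
    expectedOrderOfKeys: Seq[String],
    currentOrderOfKeys: Seq[String]): (Seq[String], Seq[String]) = {
  // For each key in the child's partitioning order, find its position in the
  // join condition and take the left/right pair at that position together.
  val indices = expectedOrderOfKeys.map(k => currentOrderOfKeys.indexWhere(_ == k))
  require(indices.forall(_ >= 0), "every expected key must appear among the join keys")
  (indices.map(leftKeys), indices.map(rightKeys))
}

// Join written as ON a.j = b.y AND a.i = b.x; the left child is hash-partitioned by (i, j).
val leftKeys  = Seq("a.j", "a.i")
val rightKeys = Seq("b.y", "b.x")
val (newLeft, newRight) = reorder(leftKeys, rightKeys, Seq("a.i", "a.j"), leftKeys)
println(newLeft)  // List(a.i, a.j)
println(newRight) // List(b.x, b.y)
```

Because each left key moves together with its right-hand partner, the pairs `a.i = b.x` and `a.j = b.y` are preserved; only their order changes.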





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-02-20 Thread hvanhovell
Github user hvanhovell commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r101973786
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -33,8 +33,8 @@ import org.apache.spark.util.collection.BitSet
  * Performs a sort merge join of two child relations.
  */
 case class SortMergeJoinExec(
-leftKeys: Seq[Expression],
-rightKeys: Seq[Expression],
+var leftKeys: Seq[Expression],
--- End diff --

What information are you missing? The `SortMergeJoinExec` is replaced after each planning iteration.

I would prefer that we use a `lazy val` here instead.
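A minimal sketch of this suggestion, using a hypothetical case class rather than `SortMergeJoinExec` itself. Because the planner builds a new node instead of updating an existing one, a `lazy val` derived from immutable fields is recomputed for each new instance without needing a `var`. The re-ordering rule here is deliberately simplified.

```scala
// Hypothetical, simplified stand-in for a physical join node. Instead of mutating a
// `var`, the re-ordered keys are derived lazily from immutable constructor fields.
case class JoinNode(keys: Seq[String], childPartitioningKeys: Seq[String]) {
  // Computed on first access, then cached for this instance only.
  lazy val reorderedKeys: Seq[String] =
    if (childPartitioningKeys.toSet == keys.toSet) childPartitioningKeys else keys
}

val planned = JoinNode(Seq("j", "i"), childPartitioningKeys = Nil)
// A later planning step produces a new node (here via copy) once the child's
// partitioning is known; the new instance gets its own, freshly computed lazy val.
val replanned = planned.copy(childPartitioningKeys = Seq("i", "j"))

println(planned.reorderedKeys)   // List(j, i)
println(replanned.reorderedKeys) // List(i, j)
```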





[GitHub] spark pull request #16985: [SPARK-19122][SQL] Unnecessary shuffle+sort added...

2017-02-18 Thread tejasapatil
Github user tejasapatil commented on a diff in the pull request:

https://github.com/apache/spark/pull/16985#discussion_r101905768
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala ---
@@ -33,8 +33,8 @@ import org.apache.spark.util.collection.BitSet
  * Performs a sort merge join of two child relations.
  */
 case class SortMergeJoinExec(
-leftKeys: Seq[Expression],
-rightKeys: Seq[Expression],
+var leftKeys: Seq[Expression],
--- End diff --

This seems ugly, but I can't think of a better way. The problem is that I want to mutate this ordering at some point during query planning. I cannot do that when the `SortMergeJoinExec` object is created, because enough information isn't available at that time.

I tried adding separate class attributes that would be altered instead of mutating this field. When doing that, I saw that tasks on the executors do not see the updated values of those class attributes.

