[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user kai-zeng commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r36235723

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
--- End diff --

Yep. I agree with your first concern. It is really necessary to refactor the outer join, as now we have both shuffled and broadcast outer join.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user kai-zeng commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r36235248

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
+
+/**
+ * :: DeveloperApi ::
+ * Performs a hash based outer join for two child relations by shuffling the data using
+ * the join keys. This operator requires loading the associated partition in both side into memory.
+ */
+@DeveloperApi
+case class ShuffledHashOuterJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    joinType: JoinType,
+    condition: Option[Expression],
+    left: SparkPlan,
+    right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val joinedRow = new JoinedRow()
+    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
+      // TODO this probably can be replaced by external sort (sort merged join?)
+      joinType match {
+        case LeftOuter =>
--- End diff --

Here the `joinType` match is performed once per partition, not once per row, so I don't think it is a big issue.
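The following minimal plain-Scala sketch illustrates the point above under simplifying assumptions. The `JoinType` objects are hypothetical stand-ins for Catalyst's, and a `Seq` of iterators stands in for RDD partitions. The sketch counts how often the join-type dispatch runs in two layouts:

* When the `match` sits inside the per-partition closure, as in the quoted `zipPartitions` block, it runs once per partition.
* When the per-partition function is chosen up front, as suggested elsewhere in this thread, it runs once in total.

In both layouts, individual rows never re-enter the `match`.

```scala
object JoinTypeDispatchSketch {
  // Hypothetical stand-ins for Catalyst's LeftOuter/RightOuter join types.
  sealed trait JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  // Number of times a join-type `match` has been evaluated.
  var dispatches = 0

  // Same shape as the quoted zipPartitions closure: the match runs once when a
  // partition is processed, and the returned iterator then handles every row.
  def perPartition(joinType: JoinType, rows: Iterator[Int]): Iterator[String] = {
    dispatches += 1
    joinType match {
      case LeftOuter  => rows.map(r => s"left-$r")
      case RightOuter => rows.map(r => s"right-$r")
    }
  }

  // Hoisted variant: pick the per-partition function once, before any partition runs.
  def hoisted(joinType: JoinType): Iterator[Int] => Iterator[String] = {
    dispatches += 1
    joinType match {
      case LeftOuter  => rows => rows.map(r => s"left-$r")
      case RightOuter => rows => rows.map(r => s"right-$r")
    }
  }
}
```

The per-row work (`map` over the selected iterator) is identical in both variants. Only the number of dispatches differs, and in the per-partition layout that number grows with the partition count, not the row count.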
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r36234861

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
+
+/**
+ * :: DeveloperApi ::
+ * Performs a hash based outer join for two child relations by shuffling the data using
+ * the join keys. This operator requires loading the associated partition in both side into memory.
+ */
+@DeveloperApi
+case class ShuffledHashOuterJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    joinType: JoinType,
+    condition: Option[Expression],
+    left: SparkPlan,
+    right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val joinedRow = new JoinedRow()
+    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
+      // TODO this probably can be replaced by external sort (sort merged join?)
+      joinType match {
+        case LeftOuter =>
+          val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+          val keyGenerator = newProjection(leftKeys, left.output)
+          leftIter.flatMap( currentRow => {
+            val rowKey = keyGenerator(currentRow)
+            joinedRow.withLeft(currentRow)
+            leftOuterIterator(rowKey, joinedRow, rightHashTable.getOrElse(rowKey, EMPTY_LIST))
+          })
+
+        case RightOuter =>
+          val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
+          val keyGenerator = newProjection(rightKeys, right.output)
+          rightIter.flatMap ( currentRow => {
+            val rowKey = keyGenerator(currentRow)
+            joinedRow.withRight(currentRow)
+            rightOuterIterator(rowKey, leftHashTable.getOrElse(rowKey, EMPTY_LIST), joinedRow)
--- End diff --

It looks like Davies ended up cleaning up these instances of the MapWrapper issue as part of one of his Tungsten PRs, and I'll fix the remaining FullOuter case as part of my PR.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r36234637

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
+
+/**
+ * :: DeveloperApi ::
+ * Performs a hash based outer join for two child relations by shuffling the data using
+ * the join keys. This operator requires loading the associated partition in both side into memory.
+ */
+@DeveloperApi
+case class ShuffledHashOuterJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    joinType: JoinType,
+    condition: Option[Expression],
+    left: SparkPlan,
+    right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val joinedRow = new JoinedRow()
+    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
+      // TODO this probably can be replaced by external sort (sort merged join?)
+      joinType match {
+        case LeftOuter =>
--- End diff --

Second possible perf. concern: I think that the `joinType` check should be moved out of the `zipPartitions` call so that we only perform it once, not once per row.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r36234539

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
+
+/**
+ * :: DeveloperApi ::
+ * Performs a hash based outer join for two child relations by shuffling the data using
+ * the join keys. This operator requires loading the associated partition in both side into memory.
+ */
+@DeveloperApi
+case class ShuffledHashOuterJoin(
+    leftKeys: Seq[Expression],
+    rightKeys: Seq[Expression],
+    joinType: JoinType,
+    condition: Option[Expression],
+    left: SparkPlan,
+    right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    val joinedRow = new JoinedRow()
+    left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
+      // TODO this probably can be replaced by external sort (sort merged join?)
+      joinType match {
+        case LeftOuter =>
+          val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+          val keyGenerator = newProjection(leftKeys, left.output)
+          leftIter.flatMap( currentRow => {
+            val rowKey = keyGenerator(currentRow)
+            joinedRow.withLeft(currentRow)
+            leftOuterIterator(rowKey, joinedRow, rightHashTable.getOrElse(rowKey, EMPTY_LIST))
+          })
+
+        case RightOuter =>
+          val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
+          val keyGenerator = newProjection(rightKeys, right.output)
+          rightIter.flatMap ( currentRow => {
+            val rowKey = keyGenerator(currentRow)
+            joinedRow.withRight(currentRow)
+            rightOuterIterator(rowKey, leftHashTable.getOrElse(rowKey, EMPTY_LIST), joinedRow)
+          })
+
+        case FullOuter =>
+          val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
+          val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+          (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
+            fullOuterIterator(key,
+              leftHashTable.getOrElse(key, EMPTY_LIST),
--- End diff --

Here, doing an explicit Java-style null check on the Java HashMaps would avoid the cost of allocating a MapWrapper for each row.
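This sketch illustrates the explicit null-check style suggested above. It uses plain `java.util.HashMap`s and no implicit Java/Scala conversions. It is a simplified full outer join over in-memory rows, not Spark's `fullOuterIterator`: it ignores the join `condition` and does not treat null join keys specially. `FullOuterSketch` and `getOrEmpty` are hypothetical names, and `buildHashTable` here is a hypothetical stand-in for the trait's method of the same name. Each lookup calls `get` and substitutes an empty list on `null`, so no wrapper object is allocated per key.

```scala
object FullOuterSketch {
  // Hypothetical stand-in for HashOuterJoin.buildHashTable: groups rows by key in a
  // plain java.util.HashMap, preserving arrival order within each key.
  def buildHashTable[K, R](rows: Iterator[R], key: R => K): java.util.HashMap[K, List[R]] = {
    val table = new java.util.HashMap[K, List[R]]()
    rows.foreach { r =>
      val k = key(r)
      val existing = table.get(k) // null when the key has not been seen yet
      table.put(k, if (existing == null) List(r) else existing :+ r)
    }
    table
  }

  // Explicit Java-style lookup: a null check instead of an implicit Scala map wrapper.
  def getOrEmpty[K, R](table: java.util.HashMap[K, List[R]], k: K): List[R] = {
    val v = table.get(k)
    if (v == null) Nil else v
  }

  // Full outer join of two in-memory inputs (no extra join condition, no null-key handling).
  def fullOuterJoin[K, L, R](
      left: Iterator[L], leftKey: L => K,
      right: Iterator[R], rightKey: R => K): List[(Option[L], Option[R])] = {
    val leftTable = buildHashTable(left, leftKey)
    val rightTable = buildHashTable(right, rightKey)

    // Union of both key sets, built directly with Java collections.
    val keys = new java.util.LinkedHashSet[K](leftTable.keySet())
    keys.addAll(rightTable.keySet())

    val out = List.newBuilder[(Option[L], Option[R])]
    val it = keys.iterator()
    while (it.hasNext) {
      val k = it.next()
      val ls = getOrEmpty(leftTable, k)
      val rs = getOrEmpty(rightTable, k)
      if (rs.isEmpty) ls.foreach(l => out += ((Some(l), None)))
      else if (ls.isEmpty) rs.foreach(r => out += ((None, Some(r))))
      else for (l <- ls; r <- rs) out += ((Some(l), Some(r)))
    }
    out.result()
  }
}
```

Key order in the output follows `HashMap` iteration order, which is unspecified. Callers should not rely on it.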
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r36234484

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
--- End diff --

@kai-zeng I'm in the process of refactoring some parts of this file as part of another PR of mine, and I wanted to briefly call out two minor performance considerations that I noticed while looking at this code.

The first is the use of `JavaConversions` here, which allows for implicit conversion between Java and Scala classes. This leads to the creation of an unnecessary per-row MapWrapper in FullOuter.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r34002025

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala ---
@@ -92,27 +153,25 @@ object SparkPlanTest {
    * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
    */
   def checkAnswer(
-      input: DataFrame,
-      planFunction: SparkPlan => SparkPlan,
+      input: Seq[DataFrame],
+      planFunction: Seq[SparkPlan] => SparkPlan,
       expectedAnswer: Seq[Row]): Option[String] = {
-    val outputPlan = planFunction(input.queryExecution.sparkPlan)
+    val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
     // A very simple resolver to make writing tests easier. In contrast to the real resolver
     // this is always case sensitive and does not try to handle scoping or complex type resolution.
-    val resolvedPlan = outputPlan transform {
-      case plan: SparkPlan =>
-        val inputMap = plan.children.flatMap(_.output).zipWithIndex.map {
-          case (a, i) =>
-            (a.name, BoundReference(i, a.dataType, a.nullable))
--- End diff --

@kai-zeng, I ended up discussing this offline with Michael and reached the same conclusion. Thanks for your help!
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user kai-zeng commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r34001466

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala ---
@@ -92,27 +153,25 @@ object SparkPlanTest {
    * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
    */
   def checkAnswer(
-      input: DataFrame,
-      planFunction: SparkPlan => SparkPlan,
+      input: Seq[DataFrame],
+      planFunction: Seq[SparkPlan] => SparkPlan,
       expectedAnswer: Seq[Row]): Option[String] = {
-    val outputPlan = planFunction(input.queryExecution.sparkPlan)
+    val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
     // A very simple resolver to make writing tests easier. In contrast to the real resolver
     // this is always case sensitive and does not try to handle scoping or complex type resolution.
-    val resolvedPlan = outputPlan transform {
-      case plan: SparkPlan =>
-        val inputMap = plan.children.flatMap(_.output).zipWithIndex.map {
-          case (a, i) =>
-            (a.name, BoundReference(i, a.dataType, a.nullable))
--- End diff --

@JoshRosen Hi Josh. I still prefer my way of resolving attributes, for two reasons:

(1) References are bound inside each operator, and that's certainly something we should test. So in my opinion, we shouldn't bind the references manually in the test suite.

(2) Manually binding the references doesn't work well for operators with two or more inputs. For these operators, there could actually be different ways of binding references depending on which implementation is used. The old implementation of SparkPlanTest cannot handle operators with two or more inputs. We could certainly fix the old implementation by fixing the binding for binary operators, but that would become tedious later, e.g. whenever we change one of the implementations.
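Point (2) can be illustrated with a minimal sketch that uses hypothetical simplified types. `Attr` and `Bound` below are stand-ins, not Catalyst's `AttributeReference` and `BoundReference`. The key observation is that an ordinal is only meaningful relative to the schema it was bound against:

* Binding a right-side key against the concatenated output of both children yields ordinal 2.
* That ordinal is out of bounds for a row produced by the right child alone.

Which schema is correct depends on how a particular operator evaluates its expressions. This is one reading of why leaving the binding to the operator, rather than to the test harness, seems safer.

```scala
object BindingSketch {
  // Hypothetical, simplified stand-ins for an attribute and an ordinal-bound reference.
  final case class Attr(name: String)
  final case class Bound(ordinal: Int)

  // Binds an attribute to its position in the given input schema.
  def bind(a: Attr, input: Seq[Attr]): Bound = {
    val i = input.indexOf(a)
    require(i >= 0, s"${a.name} not found in [${input.map(_.name).mkString(", ")}]")
    Bound(i)
  }

  // Evaluates a bound reference against a row laid out according to some schema.
  def eval(b: Bound, row: IndexedSeq[Any]): Any = row(b.ordinal)
}
```

In the usage below, the same key binds to ordinal 2 or ordinal 0 depending on which schema it is bound against. Only the latter is valid for a right-child row.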
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r33995787

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala ---
@@ -92,27 +153,25 @@ object SparkPlanTest {
    * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
    */
   def checkAnswer(
-      input: DataFrame,
-      planFunction: SparkPlan => SparkPlan,
+      input: Seq[DataFrame],
+      planFunction: Seq[SparkPlan] => SparkPlan,
       expectedAnswer: Seq[Row]): Option[String] = {
-    val outputPlan = planFunction(input.queryExecution.sparkPlan)
+    val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
     // A very simple resolver to make writing tests easier. In contrast to the real resolver
     // this is always case sensitive and does not try to handle scoping or complex type resolution.
-    val resolvedPlan = outputPlan transform {
-      case plan: SparkPlan =>
-        val inputMap = plan.children.flatMap(_.output).zipWithIndex.map {
-          case (a, i) =>
-            (a.name, BoundReference(i, a.dataType, a.nullable))
--- End diff --

After changing this new code to continue to generate BoundReferences, the test in this patch fails:

```
[info]   == Exception ==
[info]   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1888.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1888.0 (TID 6596, localhost): java.lang.ArrayIndexOutOfBoundsException: 2
[info]   at org.apache.spark.sql.catalyst.expressions.ArrayBackedRow$class.apply(rows.scala:88)
[info]   at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.apply(rows.scala:144)
[info]   at org.apache.spark.sql.Row$class.isNullAt(Row.scala:182)
[info]   at org.apache.spark.sql.catalyst.InternalRow.isNullAt(InternalRow.scala:28)
[info]   at SC$SpecificProjection.apply(Unknown Source)
[info]   at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1$$anonfun$6$$anonfun$apply$3.apply(Exchange.scala:166)
[info]   at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1$$anonfun$6$$anonfun$apply$3.apply(Exchange.scala:166)
[info]   at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
[info]   at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:119)
[info]   at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
[info]   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
[info]   at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
[info]   at org.apache.spark.scheduler.Task.run(Task.scala:70)
[info]   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
[info]   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]   at java.lang.Thread.run(Thread.java:745)
```

/cc @marmbrus
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user JoshRosen commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r33995110

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala ---
@@ -92,27 +153,25 @@ object SparkPlanTest {
    * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
    */
   def checkAnswer(
-      input: DataFrame,
-      planFunction: SparkPlan => SparkPlan,
+      input: Seq[DataFrame],
+      planFunction: Seq[SparkPlan] => SparkPlan,
       expectedAnswer: Seq[Row]): Option[String] = {
-    val outputPlan = planFunction(input.queryExecution.sparkPlan)
+    val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
     // A very simple resolver to make writing tests easier. In contrast to the real resolver
     // this is always case sensitive and does not try to handle scoping or complex type resolution.
-    val resolvedPlan = outputPlan transform {
-      case plan: SparkPlan =>
-        val inputMap = plan.children.flatMap(_.output).zipWithIndex.map {
-          case (a, i) =>
-            (a.name, BoundReference(i, a.dataType, a.nullable))
--- End diff --

@kai-zeng, why did you remove this code, which creates `BoundReference`s? In the other half of the diff below, it looks like the new code only maps each attribute's name back to the attribute itself rather than binding the reference. This change caused a test case in my sorting patch to fail.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r33974944 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala --- @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter} +import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} +import org.apache.spark.util.ThreadUtils + +import scala.collection.JavaConversions._ +import scala.concurrent._ +import scala.concurrent.duration._ + +/** + * :: DeveloperApi :: + * Performs a outer hash join for two child relations. When the output RDD of this operator is + * being constructed, a Spark job is asynchronously started to calculate the values for the + * broadcasted relation. This data is then placed in a Spark broadcast variable. The streamed + * relation is not shuffled. 
+ */ +@DeveloperApi +case class BroadcastHashOuterJoin( +leftKeys: Seq[Expression], +rightKeys: Seq[Expression], +joinType: JoinType, +condition: Option[Expression], +left: SparkPlan, +right: SparkPlan) extends BinaryNode with HashOuterJoin { + + val timeout = { +val timeoutValue = sqlContext.conf.broadcastTimeout +if (timeoutValue < 0) { + Duration.Inf +} else { + timeoutValue.seconds +} + } + + override def requiredChildDistribution: Seq[Distribution] = +UnspecifiedDistribution :: UnspecifiedDistribution :: Nil + + private[this] lazy val (buildPlan, streamedPlan) = joinType match { +case RightOuter => (left, right) +case LeftOuter => (right, left) +case x => + throw new IllegalArgumentException( +s"BroadcastHashOuterJoin should not take $x as the JoinType") + } + + private[this] lazy val (buildKeys, streamedKeys) = joinType match { +case RightOuter => (leftKeys, rightKeys) +case LeftOuter => (rightKeys, leftKeys) +case x => + throw new IllegalArgumentException( +s"BroadcastHashOuterJoin should not take $x as the JoinType") + } + + @transient + private val broadcastFuture = future { +// Note that we use .execute().collect() because we don't want to convert data to Scala types +val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect() +// buildHashTable uses code-generated rows as keys, which are not serializable +val hashed = + buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output)) +sparkContext.broadcast(hashed) + }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext) + + override def doExecute(): RDD[InternalRow] = { +val broadcastRelation = Await.result(broadcastFuture, timeout) + +streamedPlan.execute().mapPartitions { streamedIter => + val joinedRow = new JoinedRow() + val hashTable = broadcastRelation.value + val keyGenerator = newProjection(streamedKeys, streamedPlan.output) + + joinType match { +case LeftOuter => + streamedIter.flatMap(currentRow => { +val rowKey = keyGenerator(currentRow) 
+joinedRow.withLeft(currentRow) +leftOuterIterator(rowKey, joinedRow, hashTable.getOrElse(rowKey, EMPTY_LIST)) + }) + +case RightOuter => + streamedIter.flatMap(currentRow => { +val rowKey = keyGenerator(currentRow) +joinedRow.withRight(currentRow) +rightOuterIterator(rowKey, hashTable.getOrE
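The build/stream split in the operator above can be illustrated without Spark. The following is a minimal sketch in plain Scala, not the PR's implementation. Rows are modelled as `Vector[Any]`, SQL NULL as `null`, and `BroadcastOuterJoinSketch`, `hashBuildSide`, `leftOuter` and `rightOuter` are hypothetical names. The build side is grouped into a hash table once, which is the role the broadcast variable plays. Each streamed row is then either concatenated with its matches or padded with nulls.

```scala
// Plain-Scala sketch of a hash outer join with a prebuilt (broadcast-style) build side.
// Hypothetical names; rows are Vector[Any] and SQL NULL is modelled as null.
object BroadcastOuterJoinSketch {
  type Row = Vector[Any]

  /** Groups the build side by join key. Null keys never match in SQL, so they are dropped. */
  def hashBuildSide(buildRows: Seq[Row], buildKey: Row => Any): Map[Any, Seq[Row]] =
    buildRows.filter(row => buildKey(row) != null).groupBy(buildKey)

  /** Matches for one streamed row: empty when its key is null or absent from the table. */
  private def matchesFor(row: Row, streamedKey: Row => Any, table: Map[Any, Seq[Row]]): Seq[Row] = {
    val key = streamedKey(row)
    if (key == null) Seq.empty else table.getOrElse(key, Seq.empty)
  }

  /** LeftOuter: the left side is streamed, so every left row is emitted at least once. */
  def leftOuter(streamed: Iterator[Row], streamedKey: Row => Any,
                table: Map[Any, Seq[Row]], buildWidth: Int): Iterator[Row] =
    streamed.flatMap { left =>
      val matches = matchesFor(left, streamedKey, table)
      if (matches.isEmpty) Iterator.single(left ++ Vector.fill[Any](buildWidth)(null))
      else matches.iterator.map(right => left ++ right)
    }

  /** RightOuter: the right side is streamed and the left side is built, so nulls pad the left. */
  def rightOuter(streamed: Iterator[Row], streamedKey: Row => Any,
                 table: Map[Any, Seq[Row]], buildWidth: Int): Iterator[Row] =
    streamed.flatMap { right =>
      val matches = matchesFor(right, streamedKey, table)
      if (matches.isEmpty) Iterator.single(Vector.fill[Any](buildWidth)(null) ++ right)
      else matches.iterator.map(left => left ++ right)
    }
}
```

The quoted operator rejects every join type other than `LeftOuter` and `RightOuter`. A full outer join would also have to report build rows that never matched in any partition, which a single per-partition streaming pass like this one cannot see on its own.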
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-118977442 Here are some performance results: ![screen shot 2015-07-06 at 12 46 32 pm](https://cloud.githubusercontent.com/assets/527/8531650/c31266f4-23dd-11e5-849b-e3d000122f87.png) The speedups are pretty significant when one of the tables is much larger than the other.
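To see why the gain grows with the size gap, here is a deliberately simplified cost model in Scala. It counts only bytes sent over the network and ignores the driver-side `collect()` of the build side, compression and spilling. The object and method names are hypothetical, and the inputs are illustrative, not taken from the benchmark above.

```scala
// Deliberately simplified network-traffic model for an equi-join (hypothetical names).
object JoinTrafficSketch {
  /** Shuffled join: roughly every byte of both inputs is repartitioned by key. */
  def shuffledBytes(largeBytes: Long, smallBytes: Long): Long = largeBytes + smallBytes

  /** Broadcast join: the small side is shipped once to each executor; the large side stays put. */
  def broadcastBytes(smallBytes: Long, executors: Int): Long = smallBytes * executors

  def main(args: Array[String]): Unit = {
    val gb = 1L << 30
    val mb = 1L << 20
    // Illustrative inputs: a 100 GB table joined with a 10 MB table on 50 executors.
    println(shuffledBytes(100 * gb, 10 * mb) / mb) // 102410 (MB)
    println(broadcastBytes(10 * mb, 50) / mb)      // 500 (MB)
  }
}
```

Under this model the broadcast cost depends only on the small side and the cluster size, so the larger the big table is relative to the small one, the bigger the saving.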
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/7162
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user marmbrus commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-119003802 I'm going to go ahead and merge this to unblock other work on the plan tests. We can take care of thread-pool consolidation in a follow-up. Thanks!
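The merged operator quoted at the start of this section creates its broadcast future on `BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext`, a pool specific to that operator. That is presumably what the thread-pool consolidation mentioned here refers to. The following is a minimal sketch of the alternative, assuming a single process-wide daemon pool. The `SharedBroadcastPool` name and the pool size of 4 are hypothetical, and the sketch does not use Spark's `ThreadUtils`. It also mirrors the operator's timeout rule: a negative `broadcastTimeout` means wait indefinitely.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

// Hypothetical single pool shared by all broadcast builds, instead of one pool per operator.
object SharedBroadcastPool {
  private val threadIds = new AtomicInteger(0)

  // Daemon threads, so an idle pool never keeps the JVM alive.
  private val daemonFactory = new ThreadFactory {
    override def newThread(task: Runnable): Thread = {
      val thread = new Thread(task, s"shared-broadcast-${threadIds.getAndIncrement()}")
      thread.setDaemon(true)
      thread
    }
  }

  val executionContext: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4, daemonFactory))

  /** Same rule as the quoted `timeout`: a negative number of seconds means wait forever. */
  def timeoutFor(timeoutSeconds: Long): Duration =
    if (timeoutSeconds < 0) Duration.Inf else timeoutSeconds.seconds

  /** Runs `build` asynchronously on the shared pool and blocks until it finishes or times out. */
  def buildAndAwait[T](timeoutSeconds: Long)(build: => T): T =
    Await.result(Future(build)(executionContext), timeoutFor(timeoutSeconds))
}
```

With one shared pool, the number of concurrent build-side jobs is bounded in a single place, rather than each join operator type carrying its own pool.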
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-118449132 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-118439069 [Test build #36517 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36517/consoleFull) for PR 7162 at commit [`3742359`](https://github.com/apache/spark/commit/3742359b674c355da233e4cbeef49b65faa792b6).
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-118438978 Merged build started.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-118438970 Merged build triggered.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user kai-zeng commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r33883065 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala --- @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution} +import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter} +import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} +import org.apache.spark.util.ThreadUtils + +import scala.concurrent._ +import scala.concurrent.duration._ + +/** + * :: DeveloperApi :: + * Performs a outer hash join for two child relations. When the output RDD of this operator is + * being constructed, a Spark job is asynchronously started to calculate the values for the + * broadcasted relation. This data is then placed in a Spark broadcast variable. The streamed + * relation is not shuffled. 
+ */ +@DeveloperApi +case class BroadcastHashOuterJoin( +leftKeys: Seq[Expression], +rightKeys: Seq[Expression], +joinType: JoinType, +condition: Option[Expression], +left: SparkPlan, +right: SparkPlan) extends BinaryNode with HashOuterJoin { + + val timeout = { +val timeoutValue = sqlContext.conf.broadcastTimeout +if (timeoutValue < 0) { + Duration.Inf +} else { + timeoutValue.seconds +} + } + + override def requiredChildDistribution: Seq[Distribution] = +UnspecifiedDistribution :: UnspecifiedDistribution :: Nil + + private[this] lazy val (buildPlan, streamedPlan) = joinType match { +case RightOuter => (left, right) +case LeftOuter => (right, left) +case x => + throw new IllegalArgumentException( +s"BroadcastHashOuterJoin should not take $x as the JoinType") + } + + private[this] lazy val (buildKeys, streamedKeys) = joinType match { +case RightOuter => (leftKeys, rightKeys) +case LeftOuter => (rightKeys, leftKeys) +case x => + throw new IllegalArgumentException( +s"BroadcastHashOuterJoin should not take $x as the JoinType") + } + + @transient + private val broadcastFuture = future { +// Note that we use .execute().collect() because we don't want to convert data to Scala types +val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect() +// buildHashTable uses code-generated rows as keys, which are not serializable +val hashed = new GeneralHashedRelation( + buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output))) --- End diff -- Sure
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r33883033 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala --- @@ -0,0 +1,120 @@ [...] + @transient + private val broadcastFuture = future { +// Note that we use .execute().collect() because we don't want to convert data to Scala types +val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect() +// buildHashTable uses code-generated rows as keys, which are not serializable +val hashed = new GeneralHashedRelation( + buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output))) --- End diff -- This still fails for me when I run it on a real cluster. I'd just change this to `buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output))`, or we might even just change `newProjection` to always use `InterpretedProjection`.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user kai-zeng commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r33882839 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala --- @@ -0,0 +1,120 @@ [...] + @transient + private val broadcastFuture = future { +// Note that we use .execute().collect() because we don't want to convert data to Scala types +val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect() +// buildHashTable uses code-generated rows as keys, which are not serializable +val hashed = new GeneralHashedRelation( + buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output))) --- End diff -- @marmbrus Yeah, that's why I used GeneralHashedRelation to wrap the hash table. Which way do you think is better?
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r33882776 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala --- @@ -0,0 +1,120 @@ [...] + @transient + private val broadcastFuture = future { +// Note that we use .execute().collect() because we don't want to convert data to Scala types +val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect() +// buildHashTable uses code-generated rows as keys, which are not serializable +val hashed = new GeneralHashedRelation( + buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output))) --- End diff -- I think you need to use `new InterpretedProjection` here; otherwise you try to broadcast code-generated `SpecificRow`s, which fails in non-local mode. See #7213. @davies / @rxin, I'm now officially in favor of removing `GenerateProjection`.
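The problem in this exchange comes down to serialization. A broadcast value that is shipped to executors has to be serialized, and the code comment in the diff notes that code-generated key rows are not serializable. Below is a minimal, Spark-free sketch of the simplest form of that failure, using plain Java serialization. `GeneratedKey` and `InterpretedKey` are hypothetical stand-ins for code-generated and interpreted key rows, not Spark classes, and the exact mechanism for Spark's generated rows may differ.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object BroadcastSerializationSketch {
  // Hypothetical stand-in for a code-generated key row: it does NOT extend Serializable.
  final class GeneratedKey(val id: Int) {
    override def equals(other: Any): Boolean = other match {
      case that: GeneratedKey => that.id == id
      case _ => false
    }
    override def hashCode: Int = id
  }

  // Hypothetical stand-in for an interpreted key row: case classes are Serializable.
  final case class InterpretedKey(values: Seq[Any])

  /** True if `value` (for example a hash table keyed by rows) survives Java serialization. */
  def javaSerializable(value: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(value)
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

A hash table keyed by `GeneratedKey` fails this check while one keyed by `InterpretedKey` passes, even though both work fine as in-memory lookups in a single JVM. The later revision quoted at the start of this section builds its keys with `new InterpretedProjection(buildKeys, buildPlan.output)`.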
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-117963252 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-117938040 [Test build #36360 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36360/consoleFull) for PR 7162 at commit [`14e4bf8`](https://github.com/apache/spark/commit/14e4bf8184f27a262e0ac0355be090fa0c8c5a3c).
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-117937885 Merged build triggered.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-117937901 Merged build started.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user rxin commented on a diff in the pull request: https://github.com/apache/spark/pull/7162#discussion_r33742969 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala --- @@ -118,8 +118,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) => -joins.HashOuterJoin( - leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil +joinType match { + case LeftOuter if sqlContext.conf.autoBroadcastJoinThreshold > 0 && --- End diff -- Can you use `CanBroadcast` here?
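The suggestion here is to replace the inline threshold test with an extractor. Below is a minimal sketch of that pattern. It uses simplified stand-in types: `Plan`, `BroadcastOuter`, `ShuffledOuter` and this `CanBroadcast` class are all hypothetical, not Spark's planner API. The broadcast variant is chosen only when the build side fits under a positive threshold. The build side is the right input for `LeftOuter` and the left input for `RightOuter`, matching the sides chosen in the `BroadcastHashOuterJoin` code quoted earlier in this thread.

```scala
// Simplified stand-ins for planner types (hypothetical, not Spark's API).
object OuterJoinPlanningSketch {
  sealed trait JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  final case class Plan(name: String, sizeInBytes: Long)

  sealed trait PhysicalJoin
  final case class BroadcastOuter(joinType: JoinType, build: Plan, streamed: Plan) extends PhysicalJoin
  final case class ShuffledOuter(joinType: JoinType, left: Plan, right: Plan) extends PhysicalJoin

  /** Extractor matching only plans small enough to broadcast; a non-positive threshold disables it. */
  final class CanBroadcast(threshold: Long) {
    def unapply(plan: Plan): Option[Plan] =
      if (threshold > 0 && plan.sizeInBytes <= threshold) Some(plan) else None
  }

  def planOuterJoin(joinType: JoinType, left: Plan, right: Plan, threshold: Long): PhysicalJoin = {
    val canBroadcast = new CanBroadcast(threshold)
    (joinType, left, right) match {
      // LeftOuter keeps every left row, so the right side is built and broadcast.
      case (LeftOuter, _, canBroadcast(small)) => BroadcastOuter(LeftOuter, build = small, streamed = left)
      // RightOuter keeps every right row, so the left side is built and broadcast.
      case (RightOuter, canBroadcast(small), _) => BroadcastOuter(RightOuter, build = small, streamed = right)
      // Everything else, including FullOuter, falls back to a shuffled hash outer join.
      case _ => ShuffledOuter(joinType, left, right)
    }
  }
}
```

The extractor keeps the size rule in one place, so each `case` in the strategy reads as "this join type, with a broadcastable build side".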
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user kai-zeng commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-117839182 cc @marmbrus
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-117799843 Merged build finished. Test PASSed.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-117764781 [Test build #36285 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36285/consoleFull) for PR 7162 at commit [`dc5127e`](https://github.com/apache/spark/commit/dc5127e4eb00cfeafbb3b887001532abcb4b5920).
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-117764343 Merged build triggered.
[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/7162#issuecomment-117764373 Merged build started.