[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-08-04 Thread kai-zeng
Github user kai-zeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r36235723
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
--- End diff --

Yep, I agree with your first concern. Refactoring the outer joins is really necessary now that we have both shuffled and broadcast outer joins.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-08-04 Thread kai-zeng
Github user kai-zeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r36235248
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
+
+/**
+ * :: DeveloperApi ::
+ * Performs a hash-based outer join for two child relations by shuffling the data using
+ * the join keys. This operator requires loading the associated partitions of both sides into memory.
+ */
+@DeveloperApi
+case class ShuffledHashOuterJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+joinType: JoinType,
+condition: Option[Expression],
+left: SparkPlan,
+right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val joinedRow = new JoinedRow()
+left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
+  // TODO this probably can be replaced by external sort (sort merged join?)
+  joinType match {
+case LeftOuter =>
--- End diff --

Here joinType matching is performed once per partition, not once per row. 
So I don't think it is a big issue.
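
To make that concrete, here is a minimal plain-Scala sketch (no Spark dependency; this `zipPartitions` is only a stand-in for the RDD method, and all names are illustrative). The match sits inside the per-partition closure but outside the per-row iterator, so it is evaluated once per partition:

```scala
object MatchPerPartition {
  sealed trait JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType

  // Stand-in for RDD.zipPartitions: invokes `f` once per partition pair.
  def zipPartitions[A](parts: Seq[(Iterator[Int], Iterator[Int])])
      (f: (Iterator[Int], Iterator[Int]) => Iterator[A]): Seq[Iterator[A]] =
    parts.map { case (l, r) => f(l, r) }

  def main(args: Array[String]): Unit = {
    val joinType: JoinType = LeftOuter
    var matchEvaluations = 0
    val partitions = Seq.fill(4)((Iterator(1, 2, 3), Iterator(4, 5, 6)))
    val results = zipPartitions(partitions) { (leftIter, rightIter) =>
      matchEvaluations += 1            // counts evaluations of the match below
      joinType match {                 // runs here, once per partition...
        case LeftOuter  => leftIter.map(_ + 1)   // ...not once per row
        case RightOuter => rightIter.map(_ + 1)
      }
    }
    results.foreach(_.size)            // drain the per-partition iterators
    println(s"4 partitions, 12 rows, $matchEvaluations match evaluations") // 4
  }
}
```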





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-08-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r36234861
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
+
+/**
+ * :: DeveloperApi ::
+ * Performs a hash-based outer join for two child relations by shuffling the data using
+ * the join keys. This operator requires loading the associated partitions of both sides into memory.
+ */
+@DeveloperApi
+case class ShuffledHashOuterJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+joinType: JoinType,
+condition: Option[Expression],
+left: SparkPlan,
+right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val joinedRow = new JoinedRow()
+left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
+  // TODO this probably can be replaced by external sort (sort merged join?)
+  joinType match {
+case LeftOuter =>
+  val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+  val keyGenerator = newProjection(leftKeys, left.output)
+  leftIter.flatMap( currentRow => {
+val rowKey = keyGenerator(currentRow)
+joinedRow.withLeft(currentRow)
+leftOuterIterator(rowKey, joinedRow, rightHashTable.getOrElse(rowKey, EMPTY_LIST))
+  })
+
+case RightOuter =>
+  val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
+  val keyGenerator = newProjection(rightKeys, right.output)
+  rightIter.flatMap ( currentRow => {
+val rowKey = keyGenerator(currentRow)
+joinedRow.withRight(currentRow)
+rightOuterIterator(rowKey, leftHashTable.getOrElse(rowKey, EMPTY_LIST), joinedRow)
--- End diff --

It looks like Davies ended up cleaning up these instances of the MapWrapper issue as part of one of his Tungsten PRs, and I'll fix the remaining FullOuter case as part of my PR.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-08-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r36234637
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
+
+/**
+ * :: DeveloperApi ::
+ * Performs a hash-based outer join for two child relations by shuffling the data using
+ * the join keys. This operator requires loading the associated partitions of both sides into memory.
+ */
+@DeveloperApi
+case class ShuffledHashOuterJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+joinType: JoinType,
+condition: Option[Expression],
+left: SparkPlan,
+right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val joinedRow = new JoinedRow()
+left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
+  // TODO this probably can be replaced by external sort (sort merged join?)
+  joinType match {
+case LeftOuter =>
--- End diff --

Second possible perf. concern: I think that the `joinType` check should be 
hoisted out of the `zipPartitions` call so that we only perform it once, not 
once per row.
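
For what it's worth, a hedged sketch of one way to hoist it: resolve `joinType` to a per-partition function once, when the RDD is constructed, and hand that function to `zipPartitions`. Names and helpers mirror the quoted `doExecute`; this is a restructuring sketch, not the merged patch:

```scala
protected override def doExecute(): RDD[InternalRow] = {
  // The match runs once, driver-side; only the selected closure is shipped.
  val processPartition: (Iterator[InternalRow], Iterator[InternalRow]) => Iterator[InternalRow] =
    joinType match {
      case LeftOuter => (leftIter, rightIter) => {
        val joinedRow = new JoinedRow()   // created per partition in this variant
        val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
        val keyGenerator = newProjection(leftKeys, left.output)
        leftIter.flatMap { currentRow =>
          val rowKey = keyGenerator(currentRow)
          joinedRow.withLeft(currentRow)
          leftOuterIterator(rowKey, joinedRow, rightHashTable.getOrElse(rowKey, EMPTY_LIST))
        }
      }
      // RightOuter and FullOuter elided; they follow the same shape.
      case x =>
        throw new IllegalArgumentException(s"ShuffledHashOuterJoin should not take $x as the JoinType")
    }
  left.execute().zipPartitions(right.execute())(processPartition)
}
```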





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-08-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r36234539
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
+
+/**
+ * :: DeveloperApi ::
+ * Performs a hash-based outer join for two child relations by shuffling the data using
+ * the join keys. This operator requires loading the associated partitions of both sides into memory.
+ */
+@DeveloperApi
+case class ShuffledHashOuterJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+joinType: JoinType,
+condition: Option[Expression],
+left: SparkPlan,
+right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  override def requiredChildDistribution: Seq[Distribution] =
+ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
+
+  protected override def doExecute(): RDD[InternalRow] = {
+val joinedRow = new JoinedRow()
+left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
+  // TODO this probably can be replaced by external sort (sort merged join?)
+  joinType match {
+case LeftOuter =>
+  val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+  val keyGenerator = newProjection(leftKeys, left.output)
+  leftIter.flatMap( currentRow => {
+val rowKey = keyGenerator(currentRow)
+joinedRow.withLeft(currentRow)
+leftOuterIterator(rowKey, joinedRow, rightHashTable.getOrElse(rowKey, EMPTY_LIST))
+  })
+
+case RightOuter =>
+  val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
+  val keyGenerator = newProjection(rightKeys, right.output)
+  rightIter.flatMap ( currentRow => {
+val rowKey = keyGenerator(currentRow)
+joinedRow.withRight(currentRow)
+rightOuterIterator(rowKey, leftHashTable.getOrElse(rowKey, EMPTY_LIST), joinedRow)
+  })
+
+case FullOuter =>
+  val leftHashTable = buildHashTable(leftIter, newProjection(leftKeys, left.output))
+  val rightHashTable = buildHashTable(rightIter, newProjection(rightKeys, right.output))
+  (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key =>
+fullOuterIterator(key,
+  leftHashTable.getOrElse(key, EMPTY_LIST),
--- End diff --

Here, doing an explicit Java-style null check on the Java HashMaps would 
avoid the cost of allocating a MapWrapper for each row.
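
As a hedged, self-contained illustration of that suggestion (plain Scala over `java.util`, with hypothetical key and value types): query the Java `HashMap` directly and null-check the result, so no Scala wrapper is allocated on the lookup path.

```scala
import java.util.{ArrayList, HashMap => JavaHashMap, List => JavaList}

object NullCheckSketch {
  // Shared empty list returned on a miss, mirroring the EMPTY_LIST constant
  // used by the outer-join operators.
  private val EMPTY_LIST: JavaList[String] = new ArrayList[String](0)

  def main(args: Array[String]): Unit = {
    val hashTable = new JavaHashMap[Int, JavaList[String]]()
    val bucket = new ArrayList[String]()
    bucket.add("a")
    hashTable.put(1, bucket)

    // Java-style lookup: HashMap.get returns null on a miss, so this path
    // allocates nothing per call.
    def lookup(key: Int): JavaList[String] = {
      val matched = hashTable.get(key)
      if (matched != null) matched else EMPTY_LIST
    }

    println(lookup(1)) // [a]
    println(lookup(2)) // []
  }
}
```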





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-08-04 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r36234484
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---
@@ -0,0 +1,85 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, ClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+
+import scala.collection.JavaConversions._
--- End diff --

@kai-zeng I'm in the process of refactoring some parts of this file as part of another PR of mine, and wanted to briefly call out two minor performance considerations that I noticed while looking at this code.

The first is the use of `JavaConversions` here, which enables implicit conversion between Java and Scala collections. In the FullOuter case this leads to the allocation of an unnecessary MapWrapper per row.
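
A small self-contained illustration of where that allocation comes from (hypothetical map contents; the point is the conversion the compiler inserts at the call site):

```scala
import java.util.{HashMap => JavaHashMap}

import scala.collection.JavaConversions._

object WrapperPerCall {
  def main(args: Array[String]): Unit = {
    val javaMap = new JavaHashMap[String, Int]()
    javaMap.put("k", 1)
    var i = 0
    while (i < 1000000) {
      // `getOrElse` is not a java.util.HashMap method, so the compiler
      // rewrites this call to mapAsScalaMap(javaMap).getOrElse("k", 0):
      // one fresh MapWrapper allocation per iteration -- the per-row cost.
      javaMap.getOrElse("k", 0)
      i += 1
    }
  }
}
```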





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r34002025
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala ---
@@ -92,27 +153,25 @@ object SparkPlanTest {
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
   def checkAnswer(
-  input: DataFrame,
-  planFunction: SparkPlan => SparkPlan,
+  input: Seq[DataFrame],
+  planFunction: Seq[SparkPlan] => SparkPlan,
   expectedAnswer: Seq[Row]): Option[String] = {
 
-val outputPlan = planFunction(input.queryExecution.sparkPlan)
+val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
 
 // A very simple resolver to make writing tests easier. In contrast to the real resolver
 // this is always case sensitive and does not try to handle scoping or complex type resolution.
-val resolvedPlan = outputPlan transform {
-  case plan: SparkPlan =>
-val inputMap = plan.children.flatMap(_.output).zipWithIndex.map {
-  case (a, i) =>
-(a.name, BoundReference(i, a.dataType, a.nullable))
--- End diff --

@kai-zeng, I ended up discussing offline with Michael and reached the same 
conclusion.  Thanks for your help!





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-06 Thread kai-zeng
Github user kai-zeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r34001466
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala ---
@@ -92,27 +153,25 @@ object SparkPlanTest {
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
   def checkAnswer(
-  input: DataFrame,
-  planFunction: SparkPlan => SparkPlan,
+  input: Seq[DataFrame],
+  planFunction: Seq[SparkPlan] => SparkPlan,
   expectedAnswer: Seq[Row]): Option[String] = {
 
-val outputPlan = planFunction(input.queryExecution.sparkPlan)
+val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
 
 // A very simple resolver to make writing tests easier. In contrast to the real resolver
 // this is always case sensitive and does not try to handle scoping or complex type resolution.
-val resolvedPlan = outputPlan transform {
-  case plan: SparkPlan =>
-val inputMap = plan.children.flatMap(_.output).zipWithIndex.map {
-  case (a, i) =>
-(a.name, BoundReference(i, a.dataType, a.nullable))
--- End diff --

@JoshRosen Hi Josh. I still prefer my way of resolving attributes, for two reasons:
(1) References are bound in each operator, and that's certainly something we should test. So in my opinion, we shouldn't bind the references manually in the test suite.
(2) Manually binding the references isn't good for operators with two or more inputs. For these operators, there could actually be different ways of binding references depending on which implementation is used. The old SparkPlanTest implementation cannot handle operators with two or more inputs. We could certainly fix it by fixing the binding for binary operators, but that's going to be tedious later, say if we change some implementation.
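
For illustration, a hedged sketch of how a binary-operator test reads with the new signature (`leftDf`, `rightDf`, `leftKeys`, `rightKeys`, and the expected rows are placeholders, not values from this PR):

```scala
// Both children are passed in, and the operator under test binds its own
// references, just as it would in a real query plan.
val failure: Option[String] = SparkPlanTest.checkAnswer(
  input = Seq(leftDf, rightDf),
  planFunction = (plans: Seq[SparkPlan]) =>
    ShuffledHashOuterJoin(leftKeys, rightKeys, LeftOuter,
      condition = None, left = plans(0), right = plans(1)),
  expectedAnswer = Seq(Row(1, "a", 1, "b"), Row(2, "c", null, null)))
// checkAnswer returns Some(errorMessage) on a mismatch and None on success.
assert(failure.isEmpty, failure.getOrElse(""))
```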





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r33995787
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala ---
@@ -92,27 +153,25 @@ object SparkPlanTest {
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
   def checkAnswer(
-  input: DataFrame,
-  planFunction: SparkPlan => SparkPlan,
+  input: Seq[DataFrame],
+  planFunction: Seq[SparkPlan] => SparkPlan,
   expectedAnswer: Seq[Row]): Option[String] = {
 
-val outputPlan = planFunction(input.queryExecution.sparkPlan)
+val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
 
 // A very simple resolver to make writing tests easier. In contrast to the real resolver
 // this is always case sensitive and does not try to handle scoping or complex type resolution.
-val resolvedPlan = outputPlan transform {
-  case plan: SparkPlan =>
-val inputMap = plan.children.flatMap(_.output).zipWithIndex.map {
-  case (a, i) =>
-(a.name, BoundReference(i, a.dataType, a.nullable))
--- End diff --

After changing this new code to continue to generate BoundReferences, the 
test in this patch fails:

```
[info]== Exception ==
[info]org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1888.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1888.0 (TID 6596, localhost): java.lang.ArrayIndexOutOfBoundsException: 2
[info]  at org.apache.spark.sql.catalyst.expressions.ArrayBackedRow$class.apply(rows.scala:88)
[info]  at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.apply(rows.scala:144)
[info]  at org.apache.spark.sql.Row$class.isNullAt(Row.scala:182)
[info]  at org.apache.spark.sql.catalyst.InternalRow.isNullAt(InternalRow.scala:28)
[info]  at SC$SpecificProjection.apply(Unknown Source)
[info]  at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1$$anonfun$6$$anonfun$apply$3.apply(Exchange.scala:166)
[info]  at org.apache.spark.sql.execution.Exchange$$anonfun$doExecute$1$$anonfun$6$$anonfun$apply$3.apply(Exchange.scala:166)
[info]  at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
[info]  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.insertAll(BypassMergeSortShuffleWriter.java:119)
[info]  at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
[info]  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
[info]  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
[info]  at org.apache.spark.scheduler.Task.run(Task.scala:70)
[info]  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
[info]  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
[info]  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
[info]  at java.lang.Thread.run(Thread.java:745)
```

/cc @marmbrus 





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-06 Thread JoshRosen
Github user JoshRosen commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r33995110
  
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanTest.scala ---
@@ -92,27 +153,25 @@ object SparkPlanTest {
* @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
*/
   def checkAnswer(
-  input: DataFrame,
-  planFunction: SparkPlan => SparkPlan,
+  input: Seq[DataFrame],
+  planFunction: Seq[SparkPlan] => SparkPlan,
   expectedAnswer: Seq[Row]): Option[String] = {
 
-val outputPlan = planFunction(input.queryExecution.sparkPlan)
+val outputPlan = planFunction(input.map(_.queryExecution.sparkPlan))
 
 // A very simple resolver to make writing tests easier. In contrast to the real resolver
 // this is always case sensitive and does not try to handle scoping or complex type resolution.
-val resolvedPlan = outputPlan transform {
-  case plan: SparkPlan =>
-val inputMap = plan.children.flatMap(_.output).zipWithIndex.map {
-  case (a, i) =>
-(a.name, BoundReference(i, a.dataType, a.nullable))
--- End diff --

@kai-zeng, why did you remove this code which creates `BoundReference`s?  
In the other half of the diff below, it looks like the new code is only mapping 
from the attribute's name back to the attribute itself rather than binding the 
reference.  This change caused a test case in my sorting patch to fail.
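
To spell out the distinction, a hedged fragment-level sketch of the two behaviors (names follow the quoted diff; this is not a drop-in patch):

```scala
// Old harness: bind in the test suite itself, replacing each attribute with
// a positional BoundReference into the children's concatenated output.
val inputMap: Map[String, Expression] =
  plan.children.flatMap(_.output).zipWithIndex.map {
    case (a, i) => a.name -> BoundReference(i, a.dataType, a.nullable)
  }.toMap

// New harness: resolve names back to the child attributes only, leaving the
// actual binding to the operator under test.
val resolved: Map[String, Attribute] =
  plan.children.flatMap(_.output).map(a => a.name -> a).toMap
```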





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-06 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r33974944
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala ---
@@ -0,0 +1,121 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
+
+import scala.collection.JavaConversions._
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/**
+ * :: DeveloperApi ::
+ * Performs an outer hash join for two child relations.  When the output RDD of this operator is
+ * being constructed, a Spark job is asynchronously started to calculate the values for the
+ * broadcasted relation.  This data is then placed in a Spark broadcast variable.  The streamed
+ * relation is not shuffled.
+ */
+@DeveloperApi
+case class BroadcastHashOuterJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+joinType: JoinType,
+condition: Option[Expression],
+left: SparkPlan,
+right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  val timeout = {
+val timeoutValue = sqlContext.conf.broadcastTimeout
+if (timeoutValue < 0) {
+  Duration.Inf
+} else {
+  timeoutValue.seconds
+}
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] =
+UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
+
+  private[this] lazy val (buildPlan, streamedPlan) = joinType match {
+case RightOuter => (left, right)
+case LeftOuter => (right, left)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  private[this] lazy val (buildKeys, streamedKeys) = joinType match {
+case RightOuter => (leftKeys, rightKeys)
+case LeftOuter => (rightKeys, leftKeys)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  @transient
+  private val broadcastFuture = future {
+// Note that we use .execute().collect() because we don't want to convert data to Scala types
+val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
+// buildHashTable uses code-generated rows as keys, which are not serializable
+val hashed =
+  buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output))
+sparkContext.broadcast(hashed)
+  }(BroadcastHashOuterJoin.broadcastHashOuterJoinExecutionContext)
+
+  override def doExecute(): RDD[InternalRow] = {
+val broadcastRelation = Await.result(broadcastFuture, timeout)
+
+streamedPlan.execute().mapPartitions { streamedIter =>
+  val joinedRow = new JoinedRow()
+  val hashTable = broadcastRelation.value
+  val keyGenerator = newProjection(streamedKeys, streamedPlan.output)
+
+  joinType match {
+case LeftOuter =>
+  streamedIter.flatMap(currentRow => {
+val rowKey = keyGenerator(currentRow)
+joinedRow.withLeft(currentRow)
+leftOuterIterator(rowKey, joinedRow, hashTable.getOrElse(rowKey, EMPTY_LIST))
+  })
+
+case RightOuter =>
+  streamedIter.flatMap(currentRow => {
+val rowKey = keyGenerator(currentRow)
+joinedRow.withRight(currentRow)
+rightOuterIterator(rowKey, hashTable.getOrElse(rowKey, EMPTY_LIST), joinedRow)

[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-06 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-118977442
  
Here are some performance results:

![screen shot 2015-07-06 at 12 46 32 pm](https://cloud.githubusercontent.com/assets/527/8531650/c31266f4-23dd-11e5-849b-e3d000122f87.png)

Pretty significant speedups when one of the tables is much larger.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-06 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/7162





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-06 Thread marmbrus
Github user marmbrus commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-119003802
  
I'm going to go ahead and merge this to unblock other work on the plan 
tests.  We can take care of threadpool consolidation in a followup.  Thanks!





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-118449132
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-03 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-118439069
  
[Test build #36517 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36517/consoleFull) for PR 7162 at commit [`3742359`](https://github.com/apache/spark/commit/3742359b674c355da233e4cbeef49b65faa792b6).





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-118438978
  
Merged build started.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-03 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-118438970
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-03 Thread kai-zeng
Github user kai-zeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r33883065
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala ---
@@ -0,0 +1,120 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/**
+ * :: DeveloperApi ::
+ * Performs an outer hash join for two child relations.  When the output RDD of this operator is
+ * being constructed, a Spark job is asynchronously started to calculate the values for the
+ * broadcasted relation.  This data is then placed in a Spark broadcast variable.  The streamed
+ * relation is not shuffled.
+ */
+@DeveloperApi
+case class BroadcastHashOuterJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+joinType: JoinType,
+condition: Option[Expression],
+left: SparkPlan,
+right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  val timeout = {
+val timeoutValue = sqlContext.conf.broadcastTimeout
+if (timeoutValue < 0) {
+  Duration.Inf
+} else {
+  timeoutValue.seconds
+}
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] =
+UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
+
+  private[this] lazy val (buildPlan, streamedPlan) = joinType match {
+case RightOuter => (left, right)
+case LeftOuter => (right, left)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  private[this] lazy val (buildKeys, streamedKeys) = joinType match {
+case RightOuter => (leftKeys, rightKeys)
+case LeftOuter => (rightKeys, leftKeys)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  @transient
+  private val broadcastFuture = future {
+// Note that we use .execute().collect() because we don't want to convert data to Scala types
+val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
+// buildHashTable uses code-generated rows as keys, which are not serializable
+val hashed = new GeneralHashedRelation(
+  buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output)))
--- End diff --

Sure





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-03 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r33883033
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala ---
@@ -0,0 +1,120 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/**
+ * :: DeveloperApi ::
+ * Performs an outer hash join for two child relations.  When the output RDD of this operator is
+ * being constructed, a Spark job is asynchronously started to calculate the values for the
+ * broadcasted relation.  This data is then placed in a Spark broadcast variable.  The streamed
+ * relation is not shuffled.
+ */
+@DeveloperApi
+case class BroadcastHashOuterJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+joinType: JoinType,
+condition: Option[Expression],
+left: SparkPlan,
+right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  val timeout = {
+val timeoutValue = sqlContext.conf.broadcastTimeout
+if (timeoutValue < 0) {
+  Duration.Inf
+} else {
+  timeoutValue.seconds
+}
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] =
+UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
+
+  private[this] lazy val (buildPlan, streamedPlan) = joinType match {
+case RightOuter => (left, right)
+case LeftOuter => (right, left)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  private[this] lazy val (buildKeys, streamedKeys) = joinType match {
+case RightOuter => (leftKeys, rightKeys)
+case LeftOuter => (rightKeys, leftKeys)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  @transient
+  private val broadcastFuture = future {
+// Note that we use .execute().collect() because we don't want to convert data to Scala types
+val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
+// buildHashTable uses code-generated rows as keys, which are not serializable
+val hashed = new GeneralHashedRelation(
+  buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output)))
--- End diff --

This still fails for me when I run it on a real cluster.  I'd just change this to `buildHashTable(input.iterator, new InterpretedProjection(buildKeys, buildPlan.output))`, or we might even just change `newProjection` to always use `InterpretedProjection`.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-03 Thread kai-zeng
Github user kai-zeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r33882839
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala ---
@@ -0,0 +1,120 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/**
+ * :: DeveloperApi ::
+ * Performs an outer hash join for two child relations.  When the output RDD of this operator is
+ * being constructed, a Spark job is asynchronously started to calculate the values for the
+ * broadcasted relation.  This data is then placed in a Spark broadcast variable.  The streamed
+ * relation is not shuffled.
+ */
+@DeveloperApi
+case class BroadcastHashOuterJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+joinType: JoinType,
+condition: Option[Expression],
+left: SparkPlan,
+right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  val timeout = {
+val timeoutValue = sqlContext.conf.broadcastTimeout
+if (timeoutValue < 0) {
+  Duration.Inf
+} else {
+  timeoutValue.seconds
+}
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] =
+UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
+
+  private[this] lazy val (buildPlan, streamedPlan) = joinType match {
+case RightOuter => (left, right)
+case LeftOuter => (right, left)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  private[this] lazy val (buildKeys, streamedKeys) = joinType match {
+case RightOuter => (leftKeys, rightKeys)
+case LeftOuter => (rightKeys, leftKeys)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  @transient
+  private val broadcastFuture = future {
+// Note that we use .execute().collect() because we don't want to convert data to Scala types
+val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
+// buildHashTable uses code-generated rows as keys, which are not serializable
+val hashed = new GeneralHashedRelation(
+  buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output)))
--- End diff --

@marmbrus Yeah, that's why I used GeneralHashedRelation to wrap the hash 
table. Which way do you think is better?





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-03 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r33882776
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashOuterJoin.scala ---
@@ -0,0 +1,120 @@
+/* ... Apache License, Version 2.0 header ... */
+
+package org.apache.spark.sql.execution.joins
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Distribution, UnspecifiedDistribution}
+import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
+import org.apache.spark.util.ThreadUtils
+
+import scala.concurrent._
+import scala.concurrent.duration._
+
+/**
+ * :: DeveloperApi ::
+ * Performs an outer hash join for two child relations.  When the output RDD of this operator is
+ * being constructed, a Spark job is asynchronously started to calculate the values for the
+ * broadcasted relation.  This data is then placed in a Spark broadcast variable.  The streamed
+ * relation is not shuffled.
+ */
+@DeveloperApi
+case class BroadcastHashOuterJoin(
+leftKeys: Seq[Expression],
+rightKeys: Seq[Expression],
+joinType: JoinType,
+condition: Option[Expression],
+left: SparkPlan,
+right: SparkPlan) extends BinaryNode with HashOuterJoin {
+
+  val timeout = {
+val timeoutValue = sqlContext.conf.broadcastTimeout
+if (timeoutValue < 0) {
+  Duration.Inf
+} else {
+  timeoutValue.seconds
+}
+  }
+
+  override def requiredChildDistribution: Seq[Distribution] =
+UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
+
+  private[this] lazy val (buildPlan, streamedPlan) = joinType match {
+case RightOuter => (left, right)
+case LeftOuter => (right, left)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  private[this] lazy val (buildKeys, streamedKeys) = joinType match {
+case RightOuter => (leftKeys, rightKeys)
+case LeftOuter => (rightKeys, leftKeys)
+case x =>
+  throw new IllegalArgumentException(
+s"BroadcastHashOuterJoin should not take $x as the JoinType")
+  }
+
+  @transient
+  private val broadcastFuture = future {
+// Note that we use .execute().collect() because we don't want to convert data to Scala types
+val input: Array[InternalRow] = buildPlan.execute().map(_.copy()).collect()
+// buildHashTable uses code-generated rows as keys, which are not serializable
+val hashed = new GeneralHashedRelation(
+  buildHashTable(input.iterator, newProjection(buildKeys, buildPlan.output)))
--- End diff --

I think you need to use `new InterpretedProjection` here; otherwise you try to broadcast code-generated `SpecificRow`s, which fails in non-local mode. See: #7213.

@davies / @rxin, I'm now officially in favor of removing `GenerateProjection`.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-117963252
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-02 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-117938040
  
[Test build #36360 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36360/consoleFull) for PR 7162 at commit [`14e4bf8`](https://github.com/apache/spark/commit/14e4bf8184f27a262e0ac0355be090fa0c8c5a3c).





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-117937885
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-02 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-117937901
  
Merged build started.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-01 Thread rxin
Github user rxin commented on a diff in the pull request:

https://github.com/apache/spark/pull/7162#discussion_r33742969
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---
@@ -118,8 +118,19 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 condition.map(Filter(_, hashJoin)).getOrElse(hashJoin) :: Nil
 
   case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, right) =>
-joins.HashOuterJoin(
-  leftKeys, rightKeys, joinType, condition, planLater(left), planLater(right)) :: Nil
+joinType match {
+  case LeftOuter if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
--- End diff --

Can you use `CanBroadcast` here?
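
For reference, a hedged paraphrase of the `CanBroadcast` extractor that SparkStrategies already uses for inner broadcast joins, and how it could slot into these cases (a sketch from memory, not the exact merged code):

```scala
// Matches a logical plan whose estimated output is small enough to broadcast.
object CanBroadcast {
  def unapply(plan: LogicalPlan): Option[LogicalPlan] =
    if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
        plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
      Some(plan)
    } else {
      None
    }
}

// The size guard then folds into the pattern instead of an `if` condition:
case ExtractEquiJoinKeys(LeftOuter, leftKeys, rightKeys, condition, left, CanBroadcast(right)) =>
  joins.BroadcastHashOuterJoin(
    leftKeys, rightKeys, LeftOuter, condition, planLater(left), planLater(right)) :: Nil
case ExtractEquiJoinKeys(RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), right) =>
  joins.BroadcastHashOuterJoin(
    leftKeys, rightKeys, RightOuter, condition, planLater(left), planLater(right)) :: Nil
```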





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-01 Thread kai-zeng
Github user kai-zeng commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-117839182
  
cc @marmbrus 





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-117799843
  
Merged build finished. Test PASSed.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-01 Thread SparkQA
Github user SparkQA commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-117764781
  
[Test build #36285 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/36285/consoleFull) for PR 7162 at commit [`dc5127e`](https://github.com/apache/spark/commit/dc5127e4eb00cfeafbb3b887001532abcb4b5920).





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-117764343
  
 Merged build triggered.





[GitHub] spark pull request: [SPARK-4485][SQL] (1) Add broadcast hash outer...

2015-07-01 Thread AmplabJenkins
Github user AmplabJenkins commented on the pull request:

https://github.com/apache/spark/pull/7162#issuecomment-117764373
  
Merged build started.

