Repository: spark
Updated Branches:
  refs/heads/master 64f04154e -> 217e49644


[SPARK-9996] [SPARK-9997] [SQL] Add local expand and NestedLoopJoin operators

This PR conflicts with #8535 and #8573; it will be updated once they are merged.

Author: zsxwing <zsxw...@gmail.com>

Closes #8642 from zsxwing/expand-nest-join.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/217e4964
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/217e4964
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/217e4964

Branch: refs/heads/master
Commit: 217e4964444f4e07b894b1bca768a0cbbe799ea0
Parents: 64f0415
Author: zsxwing <zsxw...@gmail.com>
Authored: Mon Sep 14 15:00:27 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Mon Sep 14 15:00:27 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/local/ExpandNode.scala  |  60 +++++
 .../spark/sql/execution/local/LocalNode.scala   |  55 ++++-
 .../execution/local/NestedLoopJoinNode.scala    | 156 ++++++++++++
 .../sql/execution/local/ExpandNodeSuite.scala   |  51 ++++
 .../sql/execution/local/HashJoinNodeSuite.scala |  14 --
 .../sql/execution/local/LocalNodeTest.scala     |  14 ++
 .../local/NestedLoopJoinNodeSuite.scala         | 239 +++++++++++++++++++
 7 files changed, 574 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/217e4964/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
new file mode 100644
index 0000000..2aff156
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/ExpandNode.scala
@@ -0,0 +1,60 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Projection}
+
+case class ExpandNode(
+    conf: SQLConf,
+    projections: Seq[Seq[Expression]],
+    output: Seq[Attribute],
+    child: LocalNode) extends UnaryLocalNode(conf) {
+
+  assert(projections.size > 0)
+
+  private[this] var result: InternalRow = _
+  private[this] var idx: Int = _
+  private[this] var input: InternalRow = _
+  private[this] var groups: Array[Projection] = _
+
+  override def open(): Unit = {
+    child.open()
+    groups = projections.map(ee => newProjection(ee, child.output)).toArray
+    idx = groups.length
+  }
+
+  override def next(): Boolean = {
+    if (idx >= groups.length) {
+      if (child.next()) {
+        input = child.fetch()
+        idx = 0
+      } else {
+        return false
+      }
+    }
+    result = groups(idx)(input)
+    idx += 1
+    true
+  }
+
+  override def fetch(): InternalRow = result
+
+  override def close(): Unit = child.close()
+}
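
For intuition, here is a minimal, Spark-free sketch of the pull model ExpandNode
implements: every projection is applied to the current input row before the next
row is pulled from the child. The Row and Projection aliases below are
illustrative stand-ins, not the Catalyst types.

    object ExpandSketch {
      // Stand-ins for InternalRow and Catalyst's Projection.
      type Row = Seq[Int]
      type Projection = Row => Row

      // Mirrors ExpandNode's next()/fetch() loop: emit
      // projections.size output rows for each input row.
      def expand(input: Iterator[Row], projections: Seq[Projection]): Iterator[Row] =
        input.flatMap(row => projections.iterator.map(_(row)))

      def main(args: Array[String]): Unit = {
        val projs: Seq[Projection] = Seq(
          r => Seq(r(0) + r(1), r(0) - r(1)),  // like (key + value, key - value)
          r => Seq(r(0) * r(1), r(0) / r(1)))  // like (key * value, key / value)
        expand(Iterator(Seq(1, 1), Seq(2, 2)), projs).foreach(println)
        // prints List(2, 0), List(1, 1), List(4, 0), List(4, 1)
      }
    }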

http://git-wip-us.apache.org/repos/asf/spark/blob/217e4964/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index e540ef8..9840080 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
 import org.apache.spark.sql.{SQLConf, Row}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
+import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types.StructType
 
@@ -69,6 +69,18 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
    */
   def close(): Unit
 
+  /** Specifies whether this operator outputs UnsafeRows */
+  def outputsUnsafeRows: Boolean = false
+
+  /** Specifies whether this operator is capable of processing UnsafeRows */
+  def canProcessUnsafeRows: Boolean = false
+
+  /**
+   * Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
+   * that are not UnsafeRows).
+   */
+  def canProcessSafeRows: Boolean = true
+
   /**
    * Returns the content through the [[Iterator]] interface.
    */
@@ -91,6 +103,28 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
     result
   }
 
+  protected def newProjection(
+      expressions: Seq[Expression],
+      inputSchema: Seq[Attribute]): Projection = {
+    log.debug(
+      s"Creating Projection: $expressions, inputSchema: $inputSchema, 
codegen:$codegenEnabled")
+    if (codegenEnabled) {
+      try {
+        GenerateProjection.generate(expressions, inputSchema)
+      } catch {
+        case NonFatal(e) =>
+          if (isTesting) {
+            throw e
+          } else {
+            log.error("Failed to generate projection, fallback to interpret", 
e)
+            new InterpretedProjection(expressions, inputSchema)
+          }
+      }
+    } else {
+      new InterpretedProjection(expressions, inputSchema)
+    }
+  }
+
   protected def newMutableProjection(
       expressions: Seq[Expression],
       inputSchema: Seq[Attribute]): () => MutableProjection = {
@@ -113,6 +147,25 @@ abstract class LocalNode(conf: SQLConf) extends TreeNode[LocalNode] with Logging
     }
   }
 
+  protected def newPredicate(
+      expression: Expression,
+      inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
+    if (codegenEnabled) {
+      try {
+        GeneratePredicate.generate(expression, inputSchema)
+      } catch {
+        case NonFatal(e) =>
+          if (isTesting) {
+            throw e
+          } else {
+            log.error("Failed to generate predicate, fallback to interpreted", 
e)
+            InterpretedPredicate.create(expression, inputSchema)
+          }
+      }
+    } else {
+      InterpretedPredicate.create(expression, inputSchema)
+    }
+  }
 }
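
The newProjection and newPredicate helpers above share one idiom: try code
generation, rethrow in tests so codegen bugs surface, and fall back to the
interpreted path in production. A generic sketch of that pattern —
withCodegenFallback is a hypothetical helper, not Spark API, and generate/
interpret stand in for the GenerateProjection/GeneratePredicate and
Interpreted* constructors:

    import scala.util.control.NonFatal

    object CodegenFallbackSketch {
      // Hypothetical distillation of the fallback idiom above; `generate`
      // and `interpret` are by-name stand-ins for the codegen and
      // interpreted constructors.
      def withCodegenFallback[T](codegenEnabled: Boolean, isTesting: Boolean)
          (generate: => T)(interpret: => T): T =
        if (!codegenEnabled) {
          interpret
        } else {
          try generate catch {
            case NonFatal(e) =>
              if (isTesting) throw e  // fail fast in tests
              else interpret          // degrade gracefully in production
          }
        }
    }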
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/217e4964/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
new file mode 100644
index 0000000..7321fc6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNode.scala
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.{FullOuter, RightOuter, LeftOuter, JoinType}
+import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
+import org.apache.spark.util.collection.{BitSet, CompactBuffer}
+
+case class NestedLoopJoinNode(
+    conf: SQLConf,
+    left: LocalNode,
+    right: LocalNode,
+    buildSide: BuildSide,
+    joinType: JoinType,
+    condition: Option[Expression]) extends BinaryLocalNode(conf) {
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case RightOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output
+      case FullOuter =>
+        left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+      case x =>
+        throw new IllegalArgumentException(
+          s"NestedLoopJoin should not take $x as the JoinType")
+    }
+  }
+
+  private[this] def genResultProjection: InternalRow => InternalRow = {
+    if (outputsUnsafeRows) {
+      UnsafeProjection.create(schema)
+    } else {
+      identity[InternalRow]
+    }
+  }
+
+  private[this] var currentRow: InternalRow = _
+
+  private[this] var iterator: Iterator[InternalRow] = _
+
+  override def open(): Unit = {
+    val (streamed, build) = buildSide match {
+      case BuildRight => (left, right)
+      case BuildLeft => (right, left)
+    }
+    build.open()
+    val buildRelation = new CompactBuffer[InternalRow]
+    while (build.next()) {
+      buildRelation += build.fetch().copy()
+    }
+    build.close()
+
+    val boundCondition =
+      newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)
+
+    val leftNulls = new GenericMutableRow(left.output.size)
+    val rightNulls = new GenericMutableRow(right.output.size)
+    val joinedRow = new JoinedRow
+    val matchedBuildTuples = new BitSet(buildRelation.size)
+    val resultProj = genResultProjection
+    streamed.open()
+
+    // streamedRowMatches also contains null rows if using outer join
+    val streamedRowMatches: Iterator[InternalRow] = streamed.asIterator.flatMap { streamedRow =>
+      val matchedRows = new CompactBuffer[InternalRow]
+
+      var i = 0
+      var streamRowMatched = false
+
+      // Scan the build relation to look for matches for each streamed row
+      while (i < buildRelation.size) {
+        val buildRow = buildRelation(i)
+        buildSide match {
+          case BuildRight => joinedRow(streamedRow, buildRow)
+          case BuildLeft => joinedRow(buildRow, streamedRow)
+        }
+        if (boundCondition(joinedRow)) {
+          matchedRows += resultProj(joinedRow).copy()
+          streamRowMatched = true
+          matchedBuildTuples.set(i)
+        }
+        i += 1
+      }
+
+      // If this row had no matches and we're using outer join, join it with the null rows
+      if (!streamRowMatched) {
+        (joinType, buildSide) match {
+          case (LeftOuter | FullOuter, BuildRight) =>
+            matchedRows += resultProj(joinedRow(streamedRow, rightNulls)).copy()
+          case (RightOuter | FullOuter, BuildLeft) =>
+            matchedRows += resultProj(joinedRow(leftNulls, streamedRow)).copy()
+          case _ =>
+        }
+      }
+
+      matchedRows.iterator
+    }
+
+    // If we're using outer join, find rows on the build side that didn't match anything
+    // and join them with the null row
+    lazy val unmatchedBuildRows: Iterator[InternalRow] = {
+      var i = 0
+      buildRelation.filter { row =>
+        val r = !matchedBuildTuples.get(i)
+        i += 1
+        r
+      }.iterator
+    }
+    iterator = (joinType, buildSide) match {
+      case (RightOuter | FullOuter, BuildRight) =>
+        streamedRowMatches ++
+          unmatchedBuildRows.map { buildRow => resultProj(joinedRow(leftNulls, buildRow)) }
+      case (LeftOuter | FullOuter, BuildLeft) =>
+        streamedRowMatches ++
+          unmatchedBuildRows.map { buildRow => resultProj(joinedRow(buildRow, rightNulls)) }
+      case _ => streamedRowMatches
+    }
+  }
+
+  override def next(): Boolean = {
+    if (iterator.hasNext) {
+      currentRow = iterator.next()
+      true
+    } else {
+      false
+    }
+  }
+
+  override def fetch(): InternalRow = currentRow
+
+  override def close(): Unit = {
+    left.close()
+    right.close()
+  }
+
+}
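
To see the algorithm without the Catalyst machinery, here is a plain-Scala
sketch of the full-outer path, following the same structure as open() above:
buffer the build side, scan it once per streamed row, record matched build rows
in a bit set, and null-fill both directions. Option stands in for the
leftNulls/rightNulls rows; all names here are illustrative, not Spark API.

    import scala.collection.mutable

    object NestedLoopJoinSketch {
      def fullOuterJoin[A, B](
          streamed: Iterator[A],
          build: IndexedSeq[B],
          condition: (A, B) => Boolean): Iterator[(Option[A], Option[B])] = {
        val matchedBuild = mutable.BitSet.empty
        val streamedMatches = streamed.flatMap { s =>
          // Scan the buffered build side for matches, as in the while loop above.
          val matches: Seq[(Option[A], Option[B])] =
            for ((b, i) <- build.zipWithIndex if condition(s, b)) yield {
              matchedBuild += i
              (Some(s), Some(b))
            }
          if (matches.nonEmpty) matches else Seq((Some(s), None))  // null-fill the build side
        }
        // Iterator.++ evaluates its argument lazily, so unmatched build rows
        // are computed only after the streamed side is exhausted -- the same
        // role `lazy val unmatchedBuildRows` plays in the node.
        streamedMatches ++ build.zipWithIndex.collect {
          case (b, i) if !matchedBuild(i) => (None, Some(b))
        }
      }
    }

For example, fullOuterJoin(Iterator(1, 2), IndexedSeq(2, 3), (a: Int, b: Int) => a == b).toList
is List((Some(1), None), (Some(2), Some(2)), (None, Some(3))).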

http://git-wip-us.apache.org/repos/asf/spark/blob/217e4964/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
new file mode 100644
index 0000000..cfa7f3f
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/ExpandNodeSuite.scala
@@ -0,0 +1,51 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+class ExpandNodeSuite extends LocalNodeTest {
+
+  import testImplicits._
+
+  test("expand") {
+    val input = Seq((1, 1), (2, 2), (3, 3), (4, 4), (5, 5)).toDF("key", "value")
+    checkAnswer(
+      input,
+      node =>
+        ExpandNode(conf, Seq(
+          Seq(
+            input.col("key") + input.col("value"), input.col("key") - 
input.col("value")
+          ).map(_.expr),
+          Seq(
+            input.col("key") * input.col("value"), input.col("key") / 
input.col("value")
+          ).map(_.expr)
+        ), node.output, node),
+      Seq(
+        (2, 0),
+        (1, 1),
+        (4, 0),
+        (4, 1),
+        (6, 0),
+        (9, 1),
+        (8, 0),
+        (16, 1),
+        (10, 0),
+        (25, 1)
+      ).toDF().collect()
+    )
+  }
+}
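
For reference, each input row expands into one output row per projection, so the
five inputs yield the ten expected rows: (3, 3), for example, produces
(3 + 3, 3 - 3) = (6, 0) under the first projection and (3 * 3, 3 / 3) = (9, 1)
under the second.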

http://git-wip-us.apache.org/repos/asf/spark/blob/217e4964/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
index 43b6f06..78d8913 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/HashJoinNodeSuite.scala
@@ -24,20 +24,6 @@ class HashJoinNodeSuite extends LocalNodeTest {
 
   import testImplicits._
 
-  private def wrapForUnsafe(
-      f: (LocalNode, LocalNode) => LocalNode): (LocalNode, LocalNode) => LocalNode = {
-    if (conf.unsafeEnabled) {
-      (left: LocalNode, right: LocalNode) => {
-        val _left = ConvertToUnsafeNode(conf, left)
-        val _right = ConvertToUnsafeNode(conf, right)
-        val r = f(_left, _right)
-        ConvertToSafeNode(conf, r)
-      }
-    } else {
-      f
-    }
-  }
-
   def joinSuite(suiteName: String, confPairs: (String, String)*): Unit = {
     test(s"$suiteName: inner join with one match per row") {
       withSQLConf(confPairs: _*) {

http://git-wip-us.apache.org/repos/asf/spark/blob/217e4964/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
index b95d4ea..86dd280 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/LocalNodeTest.scala
@@ -27,6 +27,20 @@ class LocalNodeTest extends SparkFunSuite with SharedSQLContext {
 
   def conf: SQLConf = sqlContext.conf
 
+  protected def wrapForUnsafe(
+      f: (LocalNode, LocalNode) => LocalNode): (LocalNode, LocalNode) => LocalNode = {
+    if (conf.unsafeEnabled) {
+      (left: LocalNode, right: LocalNode) => {
+        val _left = ConvertToUnsafeNode(conf, left)
+        val _right = ConvertToUnsafeNode(conf, right)
+        val r = f(_left, _right)
+        ConvertToSafeNode(conf, r)
+      }
+    } else {
+      f
+    }
+  }
+
   /**
    * Runs the LocalNode and makes sure the answer matches the expected result.
    * @param input the input data to be used.

http://git-wip-us.apache.org/repos/asf/spark/blob/217e4964/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
new file mode 100644
index 0000000..b1ef26b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/local/NestedLoopJoinNodeSuite.scala
@@ -0,0 +1,239 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql.execution.local
+
+import org.apache.spark.sql.SQLConf
+import org.apache.spark.sql.catalyst.plans.{FullOuter, LeftOuter, RightOuter}
+import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight, BuildSide}
+
+class NestedLoopJoinNodeSuite extends LocalNodeTest {
+
+  import testImplicits._
+
+  private def joinSuite(
+      suiteName: String, buildSide: BuildSide, confPairs: (String, String)*): Unit = {
+    test(s"$suiteName: left outer join") {
+      withSQLConf(confPairs: _*) {
+        checkAnswer2(
+          upperCaseData,
+          lowerCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              LeftOuter,
+              Some((upperCaseData.col("N") === lowerCaseData.col("n")).expr))
+          ),
+          upperCaseData.join(lowerCaseData, $"n" === $"N", "left").collect())
+
+        checkAnswer2(
+          upperCaseData,
+          lowerCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              LeftOuter,
+              Some(
+                (upperCaseData.col("N") === lowerCaseData.col("n") &&
+                  lowerCaseData.col("n") > 1).expr))
+          ),
+          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"n" > 1, 
"left").collect())
+
+        checkAnswer2(
+          upperCaseData,
+          lowerCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              LeftOuter,
+              Some(
+                (upperCaseData.col("N") === lowerCaseData.col("n") &&
+                  upperCaseData.col("N") > 1).expr))
+          ),
+          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"N" > 1, 
"left").collect())
+
+        checkAnswer2(
+          upperCaseData,
+          lowerCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              LeftOuter,
+              Some(
+                (upperCaseData.col("N") === lowerCaseData.col("n") &&
+                  lowerCaseData.col("l") > upperCaseData.col("L")).expr))
+          ),
+          upperCaseData.join(lowerCaseData, $"n" === $"N" && $"l" > $"L", 
"left").collect())
+      }
+    }
+
+    test(s"$suiteName: right outer join") {
+      withSQLConf(confPairs: _*) {
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              RightOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N")).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N", "right").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              RightOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                lowerCaseData.col("n") > 1).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, 
"right").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              RightOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                upperCaseData.col("N") > 1).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, 
"right").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              RightOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                lowerCaseData.col("l") > upperCaseData.col("L")).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", 
"right").collect())
+      }
+    }
+
+    test(s"$suiteName: full outer join") {
+      withSQLConf(confPairs: _*) {
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              FullOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N")).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N", "full").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              FullOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                lowerCaseData.col("n") > 1).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"n" > 1, 
"full").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              FullOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                upperCaseData.col("N") > 1).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"N" > 1, 
"full").collect())
+
+        checkAnswer2(
+          lowerCaseData,
+          upperCaseData,
+          wrapForUnsafe(
+            (node1, node2) => NestedLoopJoinNode(
+              conf,
+              node1,
+              node2,
+              buildSide,
+              FullOuter,
+              Some((lowerCaseData.col("n") === upperCaseData.col("N") &&
+                lowerCaseData.col("l") > upperCaseData.col("L")).expr))
+          ),
+          lowerCaseData.join(upperCaseData, $"n" === $"N" && $"l" > $"L", 
"full").collect())
+      }
+    }
+  }
+
+  joinSuite(
+    "general-build-left",
+    BuildLeft,
+    SQLConf.CODEGEN_ENABLED.key -> "false", SQLConf.UNSAFE_ENABLED.key -> 
"false")
+  joinSuite(
+    "general-build-right",
+    BuildRight,
+    SQLConf.CODEGEN_ENABLED.key -> "false", SQLConf.UNSAFE_ENABLED.key -> 
"false")
+  joinSuite(
+    "tungsten-build-left",
+    BuildLeft,
+    SQLConf.CODEGEN_ENABLED.key -> "true", SQLConf.UNSAFE_ENABLED.key -> 
"true")
+  joinSuite(
+    "tungsten-build-right",
+    BuildRight,
+    SQLConf.CODEGEN_ENABLED.key -> "true", SQLConf.UNSAFE_ENABLED.key -> 
"true")
+}

