Repository: spark
Updated Branches:
  refs/heads/master f0169a1c6 -> 6b44c4d63


[SPARK-20534][SQL] Make outer generate exec return empty rows

## What changes were proposed in this pull request?
Generate exec does not produce `null` values if the generator for the input row 
is empty and the generate operates in outer mode without join. This is caused 
by the fact that the `join=false` code path is different from the `join=true` 
code path, and that the `join=false` code path did deal with outer properly. 
This PR addresses this issue.

## How was this patch tested?
Updated `outer*` tests in `GeneratorFunctionSuite`.

Author: Herman van Hovell <hvanhov...@databricks.com>

Closes #17810 from hvanhovell/SPARK-20534.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b44c4d6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b44c4d6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b44c4d6

Branch: refs/heads/master
Commit: 6b44c4d63ab14162e338c5f1ac77333956870a90
Parents: f0169a1
Author: Herman van Hovell <hvanhov...@databricks.com>
Authored: Mon May 1 09:46:35 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Mon May 1 09:46:35 2017 -0700

----------------------------------------------------------------------
 .../sql/catalyst/optimizer/Optimizer.scala      |  3 +-
 .../plans/logical/basicLogicalOperators.scala   |  2 +-
 .../spark/sql/execution/GenerateExec.scala      | 33 +++++++++++---------
 .../spark/sql/GeneratorFunctionSuite.scala      | 12 +++----
 4 files changed, 26 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6b44c4d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index dd768d1..f2b9764 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -441,8 +441,7 @@ object ColumnPruning extends Rule[LogicalPlan] {
       g.copy(child = prunedChild(g.child, g.references))
 
     // Turn off `join` for Generate if no column from it's child is used
-    case p @ Project(_, g: Generate)
-        if g.join && !g.outer && p.references.subsetOf(g.generatedSet) =>
+    case p @ Project(_, g: Generate) if g.join && 
p.references.subsetOf(g.generatedSet) =>
       p.copy(child = g.copy(join = false))
 
     // Eliminate unneeded attributes from right side of a Left Existence Join.

http://git-wip-us.apache.org/repos/asf/spark/blob/6b44c4d6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 3ad757e..f663d7b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -83,7 +83,7 @@ case class Project(projectList: Seq[NamedExpression], child: 
LogicalPlan) extend
  * @param join  when true, each output row is implicitly joined with the input 
tuple that produced
  *              it.
  * @param outer when true, each input row will be output at least once, even 
if the output of the
- *              given `generator` is empty. `outer` has no effect when `join` 
is false.
+ *              given `generator` is empty.
  * @param qualifier Qualifier for the attributes of generator(UDTF)
  * @param generatorOutput The output schema of the Generator.
  * @param child Children logical plan node

http://git-wip-us.apache.org/repos/asf/spark/blob/6b44c4d6/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
index f87d058..1812a11 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.types.{ArrayType, DataType, 
MapType, StructType}
 private[execution] sealed case class LazyIterator(func: () => 
TraversableOnce[InternalRow])
   extends Iterator[InternalRow] {
 
-  lazy val results = func().toIterator
+  lazy val results: Iterator[InternalRow] = func().toIterator
   override def hasNext: Boolean = results.hasNext
   override def next(): InternalRow = results.next()
 }
@@ -50,7 +50,7 @@ private[execution] sealed case class LazyIterator(func: () => 
TraversableOnce[In
  * @param join  when true, each output row is implicitly joined with the input 
tuple that produced
  *              it.
  * @param outer when true, each input row will be output at least once, even 
if the output of the
- *              given `generator` is empty. `outer` has no effect when `join` 
is false.
+ *              given `generator` is empty.
  * @param generatorOutput the qualified output attributes of the generator of 
this node, which
  *                        constructed in analysis phase, and we can not change 
it, as the
  *                        parent node bound with it already.
@@ -78,15 +78,15 @@ case class GenerateExec(
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
-  val boundGenerator = BindReferences.bindReference(generator, child.output)
+  val boundGenerator: Generator = BindReferences.bindReference(generator, 
child.output)
 
   protected override def doExecute(): RDD[InternalRow] = {
     // boundGenerator.terminate() should be triggered after all of the rows in 
the partition
-    val rows = if (join) {
-      child.execute().mapPartitionsInternal { iter =>
-        val generatorNullRow = new 
GenericInternalRow(generator.elementSchema.length)
+    val numOutputRows = longMetric("numOutputRows")
+    child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
+      val generatorNullRow = new 
GenericInternalRow(generator.elementSchema.length)
+      val rows = if (join) {
         val joinedRow = new JoinedRow
-
         iter.flatMap { row =>
           // we should always set the left (child output)
           joinedRow.withLeft(row)
@@ -101,18 +101,21 @@ case class GenerateExec(
           // keep it the same as Hive does
           joinedRow.withRight(row)
         }
+      } else {
+        iter.flatMap { row =>
+          val outputRows = boundGenerator.eval(row)
+          if (outer && outputRows.isEmpty) {
+            Seq(generatorNullRow)
+          } else {
+            outputRows
+          }
+        } ++ LazyIterator(boundGenerator.terminate)
       }
-    } else {
-      child.execute().mapPartitionsInternal { iter =>
-        iter.flatMap(boundGenerator.eval) ++ 
LazyIterator(boundGenerator.terminate)
-      }
-    }
 
-    val numOutputRows = longMetric("numOutputRows")
-    rows.mapPartitionsWithIndexInternal { (index, iter) =>
+      // Convert the rows to unsafe rows.
       val proj = UnsafeProjection.create(output, output)
       proj.initialize(index)
-      iter.map { r =>
+      rows.map { r =>
         numOutputRows += 1
         proj(r)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/6b44c4d6/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
index cef5bbf..b9871af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/GeneratorFunctionSuite.scala
@@ -91,7 +91,7 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSQLContext {
     val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
     checkAnswer(
       df.select(explode_outer('intList)),
-      Row(1) :: Row(2) :: Row(3) :: Nil)
+      Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil)
   }
 
   test("single posexplode") {
@@ -105,7 +105,7 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSQLContext {
     val df = Seq((1, Seq(1, 2, 3)), (2, Seq())).toDF("a", "intList")
     checkAnswer(
       df.select(posexplode_outer('intList)),
-      Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Nil)
+      Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(null, null) :: Nil)
   }
 
   test("explode and other columns") {
@@ -161,7 +161,7 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSQLContext {
 
     checkAnswer(
       df.select(explode_outer('intList).as('int)).select('int),
-      Row(1) :: Row(2) :: Row(3) :: Nil)
+      Row(1) :: Row(2) :: Row(3) :: Row(null) :: Nil)
 
     checkAnswer(
       df.select(explode('intList).as('int)).select(sum('int)),
@@ -182,7 +182,7 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSQLContext {
 
     checkAnswer(
       df.select(explode_outer('map)),
-      Row("a", "b") :: Row("c", "d") :: Nil)
+      Row("a", "b") :: Row(null, null) :: Row("c", "d") :: Nil)
   }
 
   test("explode on map with aliases") {
@@ -198,7 +198,7 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSQLContext {
 
     checkAnswer(
       df.select(explode_outer('map).as("key1" :: "value1" :: 
Nil)).select("key1", "value1"),
-      Row("a", "b") :: Nil)
+      Row("a", "b") :: Row(null, null) :: Nil)
   }
 
   test("self join explode") {
@@ -279,7 +279,7 @@ class GeneratorFunctionSuite extends QueryTest with 
SharedSQLContext {
     )
     checkAnswer(
       df2.selectExpr("inline_outer(col1)"),
-      Row(3, "4") :: Row(5, "6") :: Nil
+      Row(null, null) :: Row(3, "4") :: Row(5, "6") :: Nil
     )
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to