This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new babde31  [SPARK-38075][SQL] Fix `hasNext` in 
`HiveScriptTransformationExec`'s process output iterator
babde31 is described below

commit babde316d86e06800d3fe90a024e889349d63749
Author: Bruce Robbins <bersprock...@gmail.com>
AuthorDate: Mon Jan 31 10:44:53 2022 -0800

    [SPARK-38075][SQL] Fix `hasNext` in `HiveScriptTransformationExec`'s 
process output iterator
    
    ### What changes were proposed in this pull request?
    
    Fix hasNext in HiveScriptTransformationExec's process output iterator to 
always return false if it had previously returned false.
    
    ### Why are the changes needed?
    
    When hasNext on the process output iterator returns false, it leaves the 
iterator in a state (i.e., scriptOutputWritable is not null) such that a 
subsequent call to hasNext returns true.
    
    The Guava Ordering used in TakeOrderedAndProjectExec will call hasNext on 
the process output iterator even after an earlier call had returned false. This 
results in fake rows when script transform is used with `order by` and `limit`. 
For example:
    
    ```
    create or replace temp view t as
    select * from values
    (1),
    (2),
    (3)
    as t(a);
    
    select transform(a)
    USING 'cat' AS (a int)
    FROM t order by a limit 10;
    ```
    This returns:
    ```
    NULL
    NULL
    NULL
    1
    2
    3
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, other than fixing the correctness issue described above.
    
    ### How was this patch tested?
    
    New unit test.
    
    Closes #35368 from bersprockets/script_transformation_issue.
    
    Authored-by: Bruce Robbins <bersprock...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit 46885bef1a1254853ce9165862e3bd8f3a15071f)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../hive/execution/HiveScriptTransformationExec.scala   |  7 ++++++-
 .../hive/execution/HiveScriptTransformationSuite.scala  | 17 +++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
index 219b1a2..beb5583 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
@@ -64,7 +64,7 @@ private[hive] case class HiveScriptTransformationExec(
       outputSoi: StructObjectInspector,
       hadoopConf: Configuration): Iterator[InternalRow] = {
     new Iterator[InternalRow] with HiveInspectors {
-      var curLine: String = null
+      private var completed = false
       val scriptOutputStream = new DataInputStream(inputStream)
 
       val scriptOutputReader =
@@ -78,6 +78,9 @@ private[hive] case class HiveScriptTransformationExec(
       lazy val unwrappers = 
outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor)
 
       override def hasNext: Boolean = {
+        if (completed) {
+          return false
+        }
         try {
           if (scriptOutputWritable == null) {
             scriptOutputWritable = reusedWritableObject
@@ -85,6 +88,7 @@ private[hive] case class HiveScriptTransformationExec(
             if (scriptOutputReader != null) {
               if (scriptOutputReader.next(scriptOutputWritable) <= 0) {
                 checkFailureAndPropagate(writerThread, null, proc, 
stderrBuffer)
+                completed = true
                 return false
               }
             } else {
@@ -97,6 +101,7 @@ private[hive] case class HiveScriptTransformationExec(
                   // there can be a lag between EOF being written out and the 
process
                   // being terminated. So explicitly waiting for the process 
to be done.
                   checkFailureAndPropagate(writerThread, null, proc, 
stderrBuffer)
+                  completed = true
                   return false
               }
             }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
index 24743e8..52c3652 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala
@@ -619,4 +619,21 @@ class HiveScriptTransformationSuite extends 
BaseScriptTransformationSuite with T
       assert(e.contains("java.lang.ArithmeticException: long overflow"))
     }
   }
+
+  test("SPARK-38075: ORDER BY with LIMIT should not add fake rows") {
+    withTempView("v") {
+      val df = Seq((1), (2), (3)).toDF("a")
+      df.createTempView("v")
+      checkAnswer(sql(
+        """
+          |SELECT TRANSFORM(a)
+          |  USING 'cat' AS (a)
+          |FROM v
+          |ORDER BY a
+          |LIMIT 10
+          |""".stripMargin),
+        identity,
+        Row("1") :: Row("2") :: Row("3") :: Nil)
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to