This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new babde31 [SPARK-38075][SQL] Fix `hasNext` in `HiveScriptTransformationExec`'s process output iterator babde31 is described below commit babde316d86e06800d3fe90a024e889349d63749 Author: Bruce Robbins <bersprock...@gmail.com> AuthorDate: Mon Jan 31 10:44:53 2022 -0800 [SPARK-38075][SQL] Fix `hasNext` in `HiveScriptTransformationExec`'s process output iterator ### What changes were proposed in this pull request? Fix hasNext in HiveScriptTransformationExec's process output iterator to always return false if it had previously returned false. ### Why are the changes needed? When hasNext on the process output iterator returns false, it leaves the iterator in a state (i.e., scriptOutputWritable is not null) such that the next call to hasNext returns true. The Guava Ordering used in TakeOrderedAndProjectExec will call hasNext on the process output iterator even after an earlier call had returned false. This results in fake rows when script transform is used with `order by` and `limit`. For example: ``` create or replace temp view t as select * from values (1), (2), (3) as t(a); select transform(a) USING 'cat' AS (a int) FROM t order by a limit 10; ``` This returns: ``` NULL NULL NULL 1 2 3 ``` ### Does this PR introduce _any_ user-facing change? No, other than removing the correctness issue. ### How was this patch tested? New unit test. Closes #35368 from bersprockets/script_transformation_issue. 
Authored-by: Bruce Robbins <bersprock...@gmail.com> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit 46885bef1a1254853ce9165862e3bd8f3a15071f) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../hive/execution/HiveScriptTransformationExec.scala | 7 ++++++- .../hive/execution/HiveScriptTransformationSuite.scala | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala index 219b1a2..beb5583 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala @@ -64,7 +64,7 @@ private[hive] case class HiveScriptTransformationExec( outputSoi: StructObjectInspector, hadoopConf: Configuration): Iterator[InternalRow] = { new Iterator[InternalRow] with HiveInspectors { - var curLine: String = null + private var completed = false val scriptOutputStream = new DataInputStream(inputStream) val scriptOutputReader = @@ -78,6 +78,9 @@ private[hive] case class HiveScriptTransformationExec( lazy val unwrappers = outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor) override def hasNext: Boolean = { + if (completed) { + return false + } try { if (scriptOutputWritable == null) { scriptOutputWritable = reusedWritableObject @@ -85,6 +88,7 @@ private[hive] case class HiveScriptTransformationExec( if (scriptOutputReader != null) { if (scriptOutputReader.next(scriptOutputWritable) <= 0) { checkFailureAndPropagate(writerThread, null, proc, stderrBuffer) + completed = true return false } } else { @@ -97,6 +101,7 @@ private[hive] case class HiveScriptTransformationExec( // there can be a lag between EOF being written out and the process // being terminated. 
So explicitly waiting for the process to be done. checkFailureAndPropagate(writerThread, null, proc, stderrBuffer) + completed = true return false } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala index 24743e8..52c3652 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala @@ -619,4 +619,21 @@ class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with T assert(e.contains("java.lang.ArithmeticException: long overflow")) } } + + test("SPARK-38075: ORDER BY with LIMIT should not add fake rows") { + withTempView("v") { + val df = Seq((1), (2), (3)).toDF("a") + df.createTempView("v") + checkAnswer(sql( + """ + |SELECT TRANSFORM(a) + | USING 'cat' AS (a) + |FROM v + |ORDER BY a + |LIMIT 10 + |""".stripMargin), + identity, + Row("1") :: Row("2") :: Row("3") :: Nil) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org