This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 3a5b737526a [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too 
early
3a5b737526a is described below

commit 3a5b737526af8b33f7c456d73133729dc159c0f6
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Jul 31 09:16:02 2023 +0900

    [SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early
    
    ### What changes were proposed in this pull request?
    
    This is a followup of https://github.com/apache/spark/pull/41839 , to fix 
an unintentional change. That PR added an optimization to return an empty 
iterator directly if the input iterator is empty. However, checking 
`inputIterator.hasNext` may trigger query execution, which is different than 
before. It should be completely lazy and wait for the root operator's iterator 
to trigger the execution.
    
    ### Why are the changes needed?
    
    fix unintentional change
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #42226 from cloud-fan/fo.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 0f9cca5b419b09f25c45904aa81bf0515f9e7c44)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/execution/ColumnarEvaluatorFactory.scala   | 57 ++++++++++------------
 1 file changed, 26 insertions(+), 31 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala
index 949722d3cc2..960d4b74a1b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala
@@ -70,42 +70,37 @@ class RowToColumnarEvaluatorFactory(
         inputs: Iterator[InternalRow]*): Iterator[ColumnarBatch] = {
       assert(inputs.length == 1)
       val rowIterator = inputs.head
+      new Iterator[ColumnarBatch] {
+        private lazy val converters = new RowToColumnConverter(schema)
+        private lazy val vectors: Seq[WritableColumnVector] = if 
(enableOffHeapColumnVector) {
+          OffHeapColumnVector.allocateColumns(numRows, schema)
+        } else {
+          OnHeapColumnVector.allocateColumns(numRows, schema)
+        }
+        private lazy val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
 
-      if (rowIterator.hasNext) {
-        new Iterator[ColumnarBatch] {
-          private val converters = new RowToColumnConverter(schema)
-          private val vectors: Seq[WritableColumnVector] = if 
(enableOffHeapColumnVector) {
-            OffHeapColumnVector.allocateColumns(numRows, schema)
-          } else {
-            OnHeapColumnVector.allocateColumns(numRows, schema)
-          }
-          private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
-
-          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-            cb.close()
-          }
+        TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+          cb.close()
+        }
 
-          override def hasNext: Boolean = {
-            rowIterator.hasNext
-          }
+        override def hasNext: Boolean = {
+          rowIterator.hasNext
+        }
 
-          override def next(): ColumnarBatch = {
-            cb.setNumRows(0)
-            vectors.foreach(_.reset())
-            var rowCount = 0
-            while (rowCount < numRows && rowIterator.hasNext) {
-              val row = rowIterator.next()
-              converters.convert(row, vectors.toArray)
-              rowCount += 1
-            }
-            cb.setNumRows(rowCount)
-            numInputRows += rowCount
-            numOutputBatches += 1
-            cb
+        override def next(): ColumnarBatch = {
+          cb.setNumRows(0)
+          vectors.foreach(_.reset())
+          var rowCount = 0
+          while (rowCount < numRows && rowIterator.hasNext) {
+            val row = rowIterator.next()
+            converters.convert(row, vectors.toArray)
+            rowCount += 1
           }
+          cb.setNumRows(rowCount)
+          numInputRows += rowCount
+          numOutputBatches += 1
+          cb
         }
-      } else {
-        Iterator.empty
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to