This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6dcf09b  [SPARK-27971][SQL][R] MapPartitionsInRWithArrowExec.evaluate shouldn't eagerly read the first batch
6dcf09b is described below

commit 6dcf09becc24f47fc6487a57692bd4537983e1b1
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Sun Jun 9 11:40:20 2019 +0900

    [SPARK-27971][SQL][R] MapPartitionsInRWithArrowExec.evaluate shouldn't eagerly read the first batch
    
    ## What changes were proposed in this pull request?
    
    This PR applies the same fix as https://github.com/apache/spark/pull/24816, but to vectorized `dapply` in SparkR.
    
    ## How was this patch tested?
    
    Manually tested.
    
    Closes #24818 from HyukjinKwon/SPARK-27971.
    
    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/execution/objects.scala   | 27 ++++------------------
 1 file changed, 5 insertions(+), 22 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
index bedfa9c..202cbd0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala
@@ -243,28 +243,11 @@ case class MapPartitionsInRWithArrowExec(
       // binary in a batch due to the limitation of R API. See also ARROW-4512.
       val columnarBatchIter = runner.compute(batchIter, -1)
       val outputProject = UnsafeProjection.create(output, output)
-      new Iterator[InternalRow] {
-
-        private var currentIter = if (columnarBatchIter.hasNext) {
-          val batch = columnarBatchIter.next()
-          val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
-          assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
-            s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
-          batch.rowIterator.asScala
-        } else {
-          Iterator.empty
-        }
-
-        override def hasNext: Boolean = currentIter.hasNext || {
-          if (columnarBatchIter.hasNext) {
-            currentIter = columnarBatchIter.next().rowIterator.asScala
-            hasNext
-          } else {
-            false
-          }
-        }
-
-        override def next(): InternalRow = currentIter.next()
+      columnarBatchIter.flatMap { batch =>
+        val actualDataTypes = (0 until batch.numCols()).map(i => batch.column(i).dataType())
+        assert(outputTypes == actualDataTypes, "Invalid schema from dapply(): " +
+          s"expected ${outputTypes.mkString(", ")}, got ${actualDataTypes.mkString(", ")}")
+        batch.rowIterator.asScala
       }.map(outputProject)
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to