This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 034cb13  [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled
034cb13 is described below

commit 034cb139a1eadb455cb6909a3ac3e73a509d324e
Author: David Vogelbacher <dvogelbac...@palantir.com>
AuthorDate: Wed May 22 13:21:26 2019 +0900

    [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled
    
    ## What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/22275 introduced a performance improvement where we send partitions out of order to Python and then, as a last step, send the partition order as well.
    However, if there are no partitions, we never send the partition order and the Python side fails with an "EOFError".
    This PR fixes this by sending the partition order even when there are no partitions.
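    
    For illustration only (not part of the patch), a minimal repro sketch of the failing case, mirroring the new unit test; it assumes an active SparkSession `spark` with `spark.sql.execution.arrow.enabled` set to `true`:
    
    ```python
    from pyspark.sql.types import StringType, StructField, StructType

    # A DataFrame built from an empty RDD has zero partitions, so the JVM side
    # previously never wrote the batch-order trailer that toPandas waits for.
    schema = StructType([StructField("field1", StringType(), True)])
    empty_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

    # Before this fix: EOFError on the Python side. After: an empty pandas
    # DataFrame with the expected single column.
    pdf = empty_df.toPandas()
    assert pdf.empty and list(pdf.columns) == ["field1"]
    ```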
    
    ## How was this patch tested?
    New unit test added.
    
    Closes #24650 from dvogelbacher/dv/fixNoPartitionArrowConversion.
    
    Authored-by: David Vogelbacher <dvogelbac...@palantir.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/test_arrow.py             |  8 +++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 34 +++++++++-------------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 22578cb..f5b5ad9 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -183,6 +183,14 @@ class ArrowTests(ReusedSQLTestCase):
         self.assertEqual(pdf.columns[0], "i")
         self.assertTrue(pdf.empty)
 
+    def test_no_partition_frame(self):
+        schema = StructType([StructField("field1", StringType(), True)])
+        df = self.spark.createDataFrame(self.sc.emptyRDD(), schema)
+        pdf = df.toPandas()
+        self.assertEqual(len(pdf.columns), 1)
+        self.assertEqual(pdf.columns[0], "field1")
+        self.assertTrue(pdf.empty)
+
     def _createDataFrame_toggle(self, pdf, schema=None):
         with self.sql_conf({"spark.sql.execution.arrow.enabled": False}):
             df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 05436ca..d5f1edb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3299,15 +3299,12 @@ class Dataset[T] private[sql](
       PythonRDD.serveToStream("serve-Arrow") { outputStream =>
         val out = new DataOutputStream(outputStream)
         val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
-        val arrowBatchRdd = toArrowBatchRdd(plan)
-        val numPartitions = arrowBatchRdd.partitions.length
 
         // Batches ordered by (index of partition, batch index in that partition) tuple
         val batchOrder = ArrayBuffer.empty[(Int, Int)]
-        var partitionCount = 0
 
         // Handler to eagerly write batches to Python as they arrive, un-ordered
-        def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
+        val handlePartitionBatches = (index: Int, arrowBatches: Array[Array[Byte]]) =>
           if (arrowBatches.nonEmpty) {
             // Write all batches (can be more than 1) in the partition, store the batch order tuple
             batchWriter.writeBatches(arrowBatches.iterator)
@@ -3315,27 +3312,22 @@ class Dataset[T] private[sql](
               partitionBatchIndex => batchOrder.append((index, partitionBatchIndex))
             }
           }
-          partitionCount += 1
-
-          // After last batch, end the stream and write batch order indices
-          if (partitionCount == numPartitions) {
-            batchWriter.end()
-            out.writeInt(batchOrder.length)
-            // Sort by (index of partition, batch index in that partition) tuple to get the
-            // overall_batch_index from 0 to N-1 batches, which can be used to put the
-            // transferred batches in the correct order
-            batchOrder.zipWithIndex.sortBy(_._1).foreach { case (_, overallBatchIndex) =>
-              out.writeInt(overallBatchIndex)
-            }
-            out.flush()
-          }
-        }
 
+        val arrowBatchRdd = toArrowBatchRdd(plan)
         sparkSession.sparkContext.runJob(
           arrowBatchRdd,
-          (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
-          0 until numPartitions,
+          (it: Iterator[Array[Byte]]) => it.toArray,
           handlePartitionBatches)
+
+        // After processing all partitions, end the stream and write batch order indices
+        batchWriter.end()
+        out.writeInt(batchOrder.length)
+        // Sort by (index of partition, batch index in that partition) tuple to get the
+        // overall_batch_index from 0 to N-1 batches, which can be used to put the
+        // transferred batches in the correct order
+        batchOrder.zipWithIndex.sortBy(_._1).foreach { case (_, overallBatchIndex) =>
+          out.writeInt(overallBatchIndex)
+        }
       }
     }
   }
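
For context, a simplified (non-Spark) Python sketch of the ordering protocol touched here: the JVM streams Arrow batches to Python as partitions finish, in arrival order, then writes a trailer listing, for each batch in logical (partition, batch) order, its arrival position; the Python reader uses that trailer to reorder the batches, which is why the trailer must be written even when there are zero partitions. Names below are hypothetical; the real logic lives in PySpark's Arrow serializer.

```python
def reorder_batches(arrival_batches, batch_order):
    """Restore logical (partition, batch) order.

    arrival_batches -- batches in the order they reached Python (un-ordered).
    batch_order     -- for the i-th batch in logical order, its position in
                       arrival_batches, as written by the JVM after all
                       partitions have been handled.
    """
    return [arrival_batches[i] for i in batch_order]


# With zero partitions both lists are empty; the fix guarantees the (empty)
# trailer is still written, so the reader yields nothing instead of hitting EOFError.
assert reorder_batches([], []) == []
assert reorder_batches(["b1", "b0"], [1, 0]) == ["b0", "b1"]
```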


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
