This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 034cb13  [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled
034cb13 is described below

commit 034cb139a1eadb455cb6909a3ac3e73a509d324e
Author:     David Vogelbacher <dvogelbac...@palantir.com>
AuthorDate: Wed May 22 13:21:26 2019 +0900

    [SPARK-27778][PYTHON] Fix toPandas conversion of empty DataFrame with Arrow enabled
    
    ## What changes were proposed in this pull request?
    
    https://github.com/apache/spark/pull/22275 introduced a performance improvement: partitions are sent to Python out of order, and the partition order is sent as a final step. However, if there are no partitions, that final step never runs, so the partition order is never sent and the Python side fails with an EOFError. This PR fixes the issue by also sending the partition order when no partitions are present.
    
    ## How was this patch tested?
    
    New unit test added.
    
    Closes #24650 from dvogelbacher/dv/fixNoPartitionArrowConversion.
    
    Authored-by: David Vogelbacher <dvogelbac...@palantir.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/test_arrow.py             |  8 ++++++++
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 34 +++++++++++++---------------------
 2 files changed, 21 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/sql/tests/test_arrow.py b/python/pyspark/sql/tests/test_arrow.py
index 22578cb..f5b5ad9 100644
--- a/python/pyspark/sql/tests/test_arrow.py
+++ b/python/pyspark/sql/tests/test_arrow.py
@@ -183,6 +183,14 @@ class ArrowTests(ReusedSQLTestCase):
         self.assertEqual(pdf.columns[0], "i")
         self.assertTrue(pdf.empty)
 
+    def test_no_partition_frame(self):
+        schema = StructType([StructField("field1", StringType(), True)])
+        df = self.spark.createDataFrame(self.sc.emptyRDD(), schema)
+        pdf = df.toPandas()
+        self.assertEqual(len(pdf.columns), 1)
+        self.assertEqual(pdf.columns[0], "field1")
+        self.assertTrue(pdf.empty)
+
     def _createDataFrame_toggle(self, pdf, schema=None):
        with self.sql_conf({"spark.sql.execution.arrow.enabled": False}):
            df_no_arrow = self.spark.createDataFrame(pdf, schema=schema)
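The new test above exercises the zero-partition path through the test harness. The failure can also be reproduced standalone; the following is a minimal sketch, assuming a local Spark installation with pyarrow available — the master, app name, and conf value are illustrative choices, not part of the patch:

    # Standalone reproduction sketch for SPARK-27778 (assumes local Spark
    # with pyarrow installed; master/appName chosen for illustration).
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StringType, StructField, StructType

    spark = (SparkSession.builder
             .master("local[2]")
             .appName("spark-27778-repro")
             # Arrow-accelerated toPandas; conf name as of this commit.
             .config("spark.sql.execution.arrow.enabled", "true")
             .getOrCreate())

    schema = StructType([StructField("field1", StringType(), True)])
    # emptyRDD() has zero partitions, so the JVM produces no Arrow batches
    # and, before this fix, never wrote the batch-order trailer either.
    df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

    pdf = df.toPandas()  # raised EOFError before the fix
    assert pdf.empty and list(pdf.columns) == ["field1"]

    spark.stop()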
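The Dataset.scala change below moves the stream-finalization step (batchWriter.end() plus the batch-order indices) out of the per-partition handler, which never fires for an RDD with zero partitions, and runs it unconditionally after runJob returns. To see why the missing trailer surfaced as an EOFError, consider this simplified, hypothetical model of the Python-side read — it is not PySpark's actual serializer code, only an illustration of the framing:

    import struct

    def read_int(stream):
        # Big-endian 4-byte int framing; a short read means the JVM closed
        # the stream without writing the value.
        data = stream.read(4)
        if len(data) < 4:
            raise EOFError
        return struct.unpack("!i", data)[0]

    def read_batch_order(stream):
        # After the Arrow batches, the JVM writes the number of batches
        # followed by one overall index per batch. With zero partitions the
        # old code never wrote this trailer, so the first read_int below
        # hit end-of-stream and raised EOFError.
        num_batches = read_int(stream)
        return [read_int(stream) for _ in range(num_batches)]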
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 05436ca..d5f1edb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3299,15 +3299,12 @@ class Dataset[T] private[sql](
       PythonRDD.serveToStream("serve-Arrow") { outputStream =>
         val out = new DataOutputStream(outputStream)
         val batchWriter = new ArrowBatchStreamWriter(schema, out, timeZoneId)
-        val arrowBatchRdd = toArrowBatchRdd(plan)
-        val numPartitions = arrowBatchRdd.partitions.length
 
         // Batches ordered by (index of partition, batch index in that partition) tuple
         val batchOrder = ArrayBuffer.empty[(Int, Int)]
-        var partitionCount = 0
 
         // Handler to eagerly write batches to Python as they arrive, un-ordered
-        def handlePartitionBatches(index: Int, arrowBatches: Array[Array[Byte]]): Unit = {
+        val handlePartitionBatches = (index: Int, arrowBatches: Array[Array[Byte]]) =>
           if (arrowBatches.nonEmpty) {
             // Write all batches (can be more than 1) in the partition, store the batch order tuple
             batchWriter.writeBatches(arrowBatches.iterator)
@@ -3315,27 +3312,22 @@ class Dataset[T] private[sql](
               partitionBatchIndex => batchOrder.append((index, partitionBatchIndex))
             }
           }
-          partitionCount += 1
-
-          // After last batch, end the stream and write batch order indices
-          if (partitionCount == numPartitions) {
-            batchWriter.end()
-            out.writeInt(batchOrder.length)
-            // Sort by (index of partition, batch index in that partition) tuple to get the
-            // overall_batch_index from 0 to N-1 batches, which can be used to put the
-            // transferred batches in the correct order
-            batchOrder.zipWithIndex.sortBy(_._1).foreach { case (_, overallBatchIndex) =>
-              out.writeInt(overallBatchIndex)
-            }
-            out.flush()
-          }
-        }
+        val arrowBatchRdd = toArrowBatchRdd(plan)
 
         sparkSession.sparkContext.runJob(
           arrowBatchRdd,
-          (ctx: TaskContext, it: Iterator[Array[Byte]]) => it.toArray,
-          0 until numPartitions,
+          (it: Iterator[Array[Byte]]) => it.toArray,
           handlePartitionBatches)
+
+        // After processing all partitions, end the stream and write batch order indices
+        batchWriter.end()
+        out.writeInt(batchOrder.length)
+        // Sort by (index of partition, batch index in that partition) tuple to get the
+        // overall_batch_index from 0 to N-1 batches, which can be used to put the
+        // transferred batches in the correct order
+        batchOrder.zipWithIndex.sortBy(_._1).foreach { case (_, overallBatchIndex) =>
+          out.writeInt(overallBatchIndex)
+        }
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org