Github user holdenk commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22275#discussion_r219561178
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
    @@ -3279,34 +3280,33 @@ class Dataset[T] private[sql](
         val timeZoneId = sparkSession.sessionState.conf.sessionLocalTimeZone
     
         withAction("collectAsArrowToPython", queryExecution) { plan =>
    -      PythonRDD.serveToStream("serve-Arrow") { out =>
    +      PythonRDD.serveToStream("serve-Arrow") { outputStream =>
    +        val out = new DataOutputStream(outputStream)
             val batchWriter = new ArrowBatchStreamWriter(schema, out, 
timeZoneId)
             val arrowBatchRdd = toArrowBatchRdd(plan)
             val numPartitions = arrowBatchRdd.partitions.length
     
    -        // Store collection results for worst case of 1 to N-1 partitions
    -        val results = new Array[Array[Array[Byte]]](numPartitions - 1)
    -        var lastIndex = -1  // index of last partition written
    +        // Batches ordered by (index of partition, batch # in partition) 
tuple
    +        val batchOrder = new ArrayBuffer[(Int, Int)]()
    +        var partitionCount = 0
     
    -        // Handler to eagerly write partitions to Python in order
    +        // Handler to eagerly write batches to Python out of order
             def handlePartitionBatches(index: Int, arrowBatches: 
Array[Array[Byte]]): Unit = {
    -          // If result is from next partition in order
    -          if (index - 1 == lastIndex) {
    +          if (arrowBatches.nonEmpty) {
                 batchWriter.writeBatches(arrowBatches.iterator)
    -            lastIndex += 1
    -            // Write stored partitions that come next in order
    -            while (lastIndex < results.length && results(lastIndex) != 
null) {
    -              batchWriter.writeBatches(results(lastIndex).iterator)
    -              results(lastIndex) = null
    -              lastIndex += 1
    -            }
    -            // After last batch, end the stream
    -            if (lastIndex == results.length) {
    -              batchWriter.end()
    +            arrowBatches.indices.foreach { i => batchOrder.append((index, 
i)) }
    --- End diff --
    
    Could we call `i` something more descriptive like partition_batch_num or 
similar?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to