hvanhovell commented on code in PR #38468:
URL: https://github.com/apache/spark/pull/38468#discussion_r1013189541


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala:
##########
@@ -128,6 +128,65 @@ private[sql] object ArrowConverters extends Logging {
     }
   }
 
+  private[sql] def toArrowBatchIterator(
+      rowIter: Iterator[InternalRow],
+      schema: StructType,
+      maxRecordsPerBatch: Int,
+      timeZoneId: String,
+      context: TaskContext): Iterator[(Array[Byte], Long, Long)] = {
+    val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
+    val allocator = ArrowUtils.rootAllocator.newChildAllocator(
+      "toArrowBatchIterator", 0, Long.MaxValue)
+
+    val root = VectorSchemaRoot.create(arrowSchema, allocator)
+    val unloader = new VectorUnloader(root)
+    val arrowWriter = ArrowWriter.create(root)
+
+    if (context != null) { // context is null only in driver-side tests
+      context.addTaskCompletionListener[Unit] { _ =>
+        root.close()
+        allocator.close()
+      }
+    }
+
+    new Iterator[(Array[Byte], Long, Long)] {
+
+      override def hasNext: Boolean = rowIter.hasNext || {
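+        // rowIter is exhausted: eagerly release the Arrow buffers before returning false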
+        root.close()
+        allocator.close()
+        false
+      }
+
+      override def next(): (Array[Byte], Long, Long) = {
+        val out = new ByteArrayOutputStream()
+        val writeChannel = new WriteChannel(Channels.newChannel(out))
+
+        var rowCount = 0L
+        var estimatedSize = 0L
+        Utils.tryWithSafeFinally {
+          while (rowIter.hasNext && (maxRecordsPerBatch <= 0 || rowCount < maxRecordsPerBatch)) {
+            val row = rowIter.next()
+            arrowWriter.write(row)
+            rowCount += 1
+            estimatedSize += SizeEstimator.estimate(row)
+          }
+          arrowWriter.finish()
+          val batch = unloader.getRecordBatch()
+
+          MessageSerializer.serialize(writeChannel, arrowSchema)

Review Comment:
   Instead of turning each written result into a separate IPC stream, why not make the driver send the schema once, and then stream back the record batches? I am not 100% sure how hard it would be to reassemble this on the Python side; on the Scala side it would be fairly straightforward.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

