dongjoon-hyun commented on code in PR #52090:
URL: https://github.com/apache/spark/pull/52090#discussion_r2301610232
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/arrow/ArrowConvertersSuite.scala:
##########
@@ -1534,6 +1534,422 @@ class ArrowConvertersSuite extends SharedSparkSession {
}
}
+ test("roundtrip arrow batches with IPC stream - single batch") {
+ val inputRows = (0 until 9).map(InternalRow(_)) :+ InternalRow(null)
+ val schema = StructType(Seq(StructField("int", IntegerType, nullable =
true)))
+ val ctx = TaskContext.empty()
+
+ val batchIter = ArrowConverters.toBatchIterator(
+ inputRows.iterator, schema, 10, null, true, false, ctx)
+
+ // Serialize the batches into a single Arrow IPC stream held in memory
+ val out = new ByteArrayOutputStream()
+ Utils.tryWithResource(new DataOutputStream(out)) { dataOut =>
+ val writer = new ArrowBatchStreamWriter(schema, dataOut, null, true,
false)
+ writer.writeBatches(batchIter)
+ writer.end()
+ }
+
+ // Read the stream back; the returned iterator exposes batch/row counters
+ val (iterator, outputSchema) =
ArrowConverters.fromIPCStreamWithIterator(out.toByteArray, ctx)
+ assert(outputSchema == schema)
+
+ // Before iteration starts, no batches are loaded and no rows are counted
+ assert(iterator.batchesLoaded == 0)
+ assert(iterator.totalRowsProcessed == 0)
+
+ var count = 0
+ iterator.zipWithIndex.foreach { case (row, i) =>
+ if (i != 9) {
+ assert(row.getInt(0) == i)
+ } else {
+ assert(row.isNullAt(0))
+ }
+ count += 1
+ }
+ assert(count == inputRows.length)
+
+ // After draining the iterator, all rows are expected to come from one batch
+ assert(iterator.batchesLoaded == 1,
+ s"Expected 1 batch loaded, got ${iterator.batchesLoaded}")
+ assert(iterator.totalRowsProcessed == inputRows.length,
+ s"Expected ${inputRows.length} rows processed, got
${iterator.totalRowsProcessed}")
+ }
+
+ test("multiple record batches in single IPC stream") {
Review Comment:
Thank you for adding this test coverage.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]