This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new d8946ca077 Fix `ArrowArrayStreamReader` for 0-columns record batch streams (#9405)
d8946ca077 is described below
commit d8946ca0775ab7fe0eef2fdea4b8bb3d55ec6664
Author: Jonas Dedden <[email protected]>
AuthorDate: Fri Feb 13 15:49:10 2026 +0100
Fix `ArrowArrayStreamReader` for 0-columns record batch streams (#9405)
# Which issue does this PR close?
- Closes https://github.com/apache/arrow-rs/issues/9394
# Rationale for this change
PR https://github.com/apache/arrow-rs/pull/8944 introduced a regression: 0-column record batch streams could no longer be decoded.
# What changes are included in this PR?
- Construct the `RecordBatch` with `try_new_with_options`, passing the `len` of the `ArrayData` as an explicit row count, instead of letting `try_new` implicitly determine the length from the first column, which fails when there are no columns (see the sketch after this list).
- Slightly refactor the existing `test_stream_round_trip_[import/export]` tests to reduce code duplication
- Add a new `test_stream_round_trip_no_columns` test
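
As a minimal standalone sketch of the difference (not part of this patch; it uses the public `arrow-array`/`arrow-schema` crates rather than the crate-internal paths in the diff): with zero columns, `try_new` has no column to infer the row count from and errors, while `try_new_with_options` accepts the count explicitly.

```rust
use std::sync::Arc;

use arrow_array::{RecordBatch, RecordBatchOptions};
use arrow_schema::{Field, Schema};

fn main() {
    // A schema with no fields, i.e. a 0-column record batch.
    let schema = Arc::new(Schema::new(Vec::<Field>::new()));

    // `try_new` infers the row count from the first column,
    // so it fails when there are no columns at all.
    assert!(RecordBatch::try_new(schema.clone(), vec![]).is_err());

    // `try_new_with_options` takes the row count explicitly and succeeds.
    let batch = RecordBatch::try_new_with_options(
        schema,
        vec![],
        &RecordBatchOptions::new().with_row_count(Some(10)),
    )
    .unwrap();
    assert_eq!(batch.num_rows(), 10);
    assert_eq!(batch.num_columns(), 0);
}
```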
# Are these changes tested?
Yes, both export and import are tested in
`test_stream_round_trip_no_columns`.
# Are there any user-facing changes?
0-column record batch streams can be decoded again.
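
A minimal sketch of the now-working round trip through the C stream interface, assuming the public `arrow-array` API (the `RecordBatchIterator` here is just a stand-in for any `RecordBatchReader`):

```rust
use std::sync::Arc;

use arrow_array::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream};
use arrow_array::{RecordBatch, RecordBatchIterator, RecordBatchOptions};
use arrow_schema::{Field, Schema};

fn main() {
    // Build a 0-column batch with an explicit row count of 5.
    let schema = Arc::new(Schema::new(Vec::<Field>::new()));
    let batch = RecordBatch::try_new_with_options(
        schema.clone(),
        vec![],
        &RecordBatchOptions::new().with_row_count(Some(5)),
    )
    .unwrap();

    // Export the batch through the C stream interface...
    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
    let stream = FFI_ArrowArrayStream::new(Box::new(reader));

    // ...and read it back; this errored before the fix.
    let mut stream_reader = ArrowArrayStreamReader::try_new(stream).unwrap();
    let roundtripped = stream_reader.next().unwrap().unwrap();
    assert_eq!(roundtripped.num_rows(), 5);
    assert_eq!(roundtripped.num_columns(), 0);
}
```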
---
arrow-array/src/ffi_stream.rs | 75 ++++++++++++++++++++++---------------------
1 file changed, 39 insertions(+), 36 deletions(-)
diff --git a/arrow-array/src/ffi_stream.rs b/arrow-array/src/ffi_stream.rs
index c469436829..815d7c5760 100644
--- a/arrow-array/src/ffi_stream.rs
+++ b/arrow-array/src/ffi_stream.rs
@@ -66,6 +66,7 @@ use std::{
 use arrow_data::ffi::FFI_ArrowArray;
 use arrow_schema::{ArrowError, Schema, SchemaRef, ffi::FFI_ArrowSchema};
 
+use crate::RecordBatchOptions;
 use crate::array::Array;
 use crate::array::StructArray;
 use crate::ffi::from_ffi_and_data_type;
@@ -365,7 +366,12 @@ impl Iterator for ArrowArrayStreamReader {
                 from_ffi_and_data_type(array, DataType::Struct(self.schema().fields().clone()))
             };
             Some(result.and_then(|data| {
-                RecordBatch::try_new(self.schema.clone(), StructArray::from(data).into_parts().1)
+                let len = data.len();
+                RecordBatch::try_new_with_options(
+                    self.schema.clone(),
+                    StructArray::from(data).into_parts().1,
+                    &RecordBatchOptions::new().with_row_count(Some(len)),
+                )
             }))
         } else {
             let last_error = self.get_stream_last_error();
@@ -419,20 +425,7 @@ mod tests {
         }
     }
 
-    fn _test_round_trip_export(arrays: Vec<Arc<dyn Array>>) -> Result<()> {
-        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
-        let schema = Arc::new(Schema::new_with_metadata(
-            vec![
-                Field::new("a", arrays[0].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("b", arrays[1].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("c", arrays[2].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-            ],
-            metadata,
-        ));
-        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
+    fn _test_round_trip_export(batch: RecordBatch, schema: Arc<Schema>) -> Result<()> {
         let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _;
         let reader = TestRecordBatchReader::new(schema.clone(), iter);
 
@@ -461,10 +454,12 @@
         }
 
         let array = unsafe { from_ffi(ffi_array, &ffi_schema) }.unwrap();
+        let len = array.len();
 
-        let record_batch = RecordBatch::try_new(
+        let record_batch = RecordBatch::try_new_with_options(
             SchemaRef::from(exported_schema.clone()),
             StructArray::from(array).into_parts().1,
+            &RecordBatchOptions::new().with_row_count(Some(len)),
         )
         .unwrap();
         produced_batches.push(record_batch);
@@ -475,20 +470,7 @@
         Ok(())
     }
 
-    fn _test_round_trip_import(arrays: Vec<Arc<dyn Array>>) -> Result<()> {
-        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
-        let schema = Arc::new(Schema::new_with_metadata(
-            vec![
-                Field::new("a", arrays[0].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("b", arrays[1].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-                Field::new("c", arrays[2].data_type().clone(), true)
-                    .with_metadata(metadata.clone()),
-            ],
-            metadata,
-        ));
-        let batch = RecordBatch::try_new(schema.clone(), arrays).unwrap();
+    fn _test_round_trip_import(batch: RecordBatch, schema: Arc<Schema>) -> Result<()> {
         let iter = Box::new(vec![batch.clone(), batch.clone()].into_iter().map(Ok)) as _;
         let reader = TestRecordBatchReader::new(schema.clone(), iter);
 
@@ -511,19 +493,40 @@ mod tests {
     }
 
     #[test]
-    fn test_stream_round_trip_export() -> Result<()> {
+    fn test_stream_round_trip() {
         let array = Int32Array::from(vec![Some(2), None, Some(1), None]);
         let array: Arc<dyn Array> = Arc::new(array);
+        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
+
+        let schema = Arc::new(Schema::new_with_metadata(
+            vec![
+                Field::new("a", array.data_type().clone(), true).with_metadata(metadata.clone()),
+                Field::new("b", array.data_type().clone(), true).with_metadata(metadata.clone()),
+                Field::new("c", array.data_type().clone(), true).with_metadata(metadata.clone()),
+            ],
+            metadata,
+        ));
+        let batch = RecordBatch::try_new(schema.clone(), vec![array.clone(), array.clone(), array])
+            .unwrap();
 
-        _test_round_trip_export(vec![array.clone(), array.clone(), array])
+        _test_round_trip_export(batch.clone(), schema.clone()).unwrap();
+        _test_round_trip_import(batch, schema).unwrap();
     }
 
     #[test]
-    fn test_stream_round_trip_import() -> Result<()> {
-        let array = Int32Array::from(vec![Some(2), None, Some(1), None]);
-        let array: Arc<dyn Array> = Arc::new(array);
+    fn test_stream_round_trip_no_columns() {
+        let metadata = HashMap::from([("foo".to_owned(), "bar".to_owned())]);
+
+        let schema = Arc::new(Schema::new_with_metadata(Vec::<Field>::new(), metadata));
+        let batch = RecordBatch::try_new_with_options(
+            schema.clone(),
+            Vec::<Arc<dyn Array>>::new(),
+            &RecordBatchOptions::new().with_row_count(Some(10)),
+        )
+        .unwrap();
 
-        _test_round_trip_import(vec![array.clone(), array.clone(), array])
+        _test_round_trip_export(batch.clone(), schema.clone()).unwrap();
+        _test_round_trip_import(batch, schema).unwrap();
     }
 
     #[test]