This is an automated email from the ASF dual-hosted git repository.
mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 6bf5795b58 Avoid too many open files by using in memory buffers for
round trip p… (#8407)
6bf5795b58 is described below
commit 6bf5795b58d2382113763721ec23bf1a6ebc74fe
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Sep 24 02:51:35 2025 -0700
Avoid too many open files by using in-memory buffers for round trip
parquet testing (#8407)
# Which issue does this PR close?
- closes https://github.com/apache/arrow-rs/issues/8406
# Rationale for this change
It has annoyed me for a long time that running `cargo test -p parquet
--all-features` fails with a default ulimit (-n 256)
# What changes are included in this PR?
Change the roundtrip test to read/write from in memory buffers rather
than `File`s
# Are these changes tested?
By CI (and I manually verified that `cargo test -p parquet --all-features`
passes locally)
# Are there any user-facing changes?
No, this is a development process change only
---
parquet/src/arrow/arrow_writer/mod.rs | 59 ++++++++++++++---------------------
1 file changed, 23 insertions(+), 36 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index 684d5cf747..25fd2396c1 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -1506,7 +1506,6 @@ mod tests {
use super::*;
use std::fs::File;
- use std::io::Seek;
use crate::arrow::arrow_reader::{ParquetRecordBatchReader,
ParquetRecordBatchReaderBuilder};
use crate::arrow::ARROW_SCHEMA_META_KEY;
@@ -2282,7 +2281,7 @@ mod tests {
const SMALL_SIZE: usize = 7;
const MEDIUM_SIZE: usize = 63;
- fn roundtrip(expected_batch: RecordBatch, max_row_group_size:
Option<usize>) -> Vec<File> {
+ fn roundtrip(expected_batch: RecordBatch, max_row_group_size:
Option<usize>) -> Vec<Bytes> {
let mut files = vec![];
for version in [WriterVersion::PARQUET_1_0,
WriterVersion::PARQUET_2_0] {
let mut props =
WriterProperties::builder().set_writer_version(version);
@@ -2297,27 +2296,27 @@ mod tests {
files
}
+ // Round trip the specified record batch with the specified writer
properties,
+ // to an in-memory file, and validate the arrays using the specified
function.
+ // Returns the in-memory file.
fn roundtrip_opts_with_array_validation<F>(
expected_batch: &RecordBatch,
props: WriterProperties,
validate: F,
- ) -> File
+ ) -> Bytes
where
F: Fn(&ArrayData, &ArrayData),
{
- let file = tempfile::tempfile().unwrap();
+ let mut file = vec![];
- let mut writer = ArrowWriter::try_new(
- file.try_clone().unwrap(),
- expected_batch.schema(),
- Some(props),
- )
- .expect("Unable to write file");
+ let mut writer = ArrowWriter::try_new(&mut file,
expected_batch.schema(), Some(props))
+ .expect("Unable to write file");
writer.write(expected_batch).unwrap();
writer.close().unwrap();
+ let file = Bytes::from(file);
let mut record_batch_reader =
- ParquetRecordBatchReader::try_new(file.try_clone().unwrap(),
1024).unwrap();
+ ParquetRecordBatchReader::try_new(file.clone(), 1024).unwrap();
let actual_batch = record_batch_reader
.next()
@@ -2336,7 +2335,7 @@ mod tests {
file
}
- fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties)
-> File {
+ fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties)
-> Bytes {
roundtrip_opts_with_array_validation(expected_batch, props, |a, b| {
a.validate_full().expect("valid expected data");
b.validate_full().expect("valid actual data");
@@ -2364,17 +2363,17 @@ mod tests {
}
}
- fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
+ fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<Bytes> {
one_column_roundtrip_with_options(RoundTripOptions::new(values,
nullable))
}
- fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef)
-> Vec<File> {
+ fn one_column_roundtrip_with_schema(values: ArrayRef, schema: SchemaRef)
-> Vec<Bytes> {
let mut options = RoundTripOptions::new(values, false);
options.schema = schema;
one_column_roundtrip_with_options(options)
}
- fn one_column_roundtrip_with_options(options: RoundTripOptions) ->
Vec<File> {
+ fn one_column_roundtrip_with_options(options: RoundTripOptions) ->
Vec<Bytes> {
let RoundTripOptions {
values,
schema,
@@ -2435,7 +2434,7 @@ mod tests {
files
}
- fn values_required<A, I>(iter: I) -> Vec<File>
+ fn values_required<A, I>(iter: I) -> Vec<Bytes>
where
A: From<Vec<I::Item>> + Array + 'static,
I: IntoIterator,
@@ -2445,7 +2444,7 @@ mod tests {
one_column_roundtrip(values, false)
}
- fn values_optional<A, I>(iter: I) -> Vec<File>
+ fn values_optional<A, I>(iter: I) -> Vec<Bytes>
where
A: From<Vec<Option<I::Item>>> + Array + 'static,
I: IntoIterator,
@@ -2469,7 +2468,7 @@ mod tests {
}
fn check_bloom_filter<T: AsBytes>(
- files: Vec<File>,
+ files: Vec<Bytes>,
file_column: String,
positive_values: Vec<T>,
negative_values: Vec<T>,
@@ -4201,17 +4200,13 @@ mod tests {
.set_compression(crate::basic::Compression::UNCOMPRESSED)
.build();
- let mut file = roundtrip_opts(&batch, props);
+ let file = roundtrip_opts(&batch, props);
// read file and decode page headers
// Note: use the thrift API as there is no Rust API to access the
statistics in the page headers
- let mut buf = vec![];
- file.seek(std::io::SeekFrom::Start(0)).unwrap();
- let read = file.read_to_end(&mut buf).unwrap();
- assert!(read > 0);
// decode first page header
- let first_page = &buf[4..];
+ let first_page = &file[4..];
let mut prot = TCompactSliceInputProtocol::new(first_page);
let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
let stats = hdr.data_page_header.unwrap().statistics;
@@ -4235,17 +4230,13 @@ mod tests {
.set_compression(crate::basic::Compression::UNCOMPRESSED)
.build();
- let mut file = roundtrip_opts(&batch, props);
+ let file = roundtrip_opts(&batch, props);
// read file and decode page headers
// Note: use the thrift API as there is no Rust API to access the
statistics in the page headers
- let mut buf = vec![];
- file.seek(std::io::SeekFrom::Start(0)).unwrap();
- let read = file.read_to_end(&mut buf).unwrap();
- assert!(read > 0);
// decode first page header
- let first_page = &buf[4..];
+ let first_page = &file[4..];
let mut prot = TCompactSliceInputProtocol::new(first_page);
let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
let stats = hdr.data_page_header.unwrap().statistics;
@@ -4287,17 +4278,13 @@ mod tests {
.set_compression(crate::basic::Compression::UNCOMPRESSED)
.build();
- let mut file = roundtrip_opts(&batch, props);
+ let file = roundtrip_opts(&batch, props);
// read file and decode page headers
// Note: use the thrift API as there is no Rust API to access the
statistics in the page headers
- let mut buf = vec![];
- file.seek(std::io::SeekFrom::Start(0)).unwrap();
- let read = file.read_to_end(&mut buf).unwrap();
- assert!(read > 0);
// decode first page header
- let first_page = &buf[4..];
+ let first_page = &file[4..];
let mut prot = TCompactSliceInputProtocol::new(first_page);
let hdr = PageHeader::read_from_in_protocol(&mut prot).unwrap();
let stats = hdr.data_page_header.unwrap().statistics;