yonipeleg33 commented on code in PR #9357:
URL: https://github.com/apache/arrow-rs/pull/9357#discussion_r2769983367
##########
parquet/src/file/properties.rs:
##########
@@ -575,7 +595,34 @@ impl WriterPropertiesBuilder {
/// If the value is set to 0.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
assert!(value > 0, "Cannot have a 0 max row group size");
- self.max_row_group_size = value;
+ self.max_row_group_row_count = Some(value);
+ self
+ }
+
+ /// Sets maximum number of rows in a row group, or `None` for unlimited.
+ ///
+ /// # Panics
+ /// If the value is `Some(0)`.
Review Comment:
Done
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4518,4 +4575,185 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ /// Helper to create a test batch with the given number of rows.
+ /// Each row is approximately 4 bytes (one i32).
+ fn create_test_batch(num_rows: usize) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+ let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+ RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+ }
+
+ #[test]
+ fn test_row_group_limit_none_writes_single_row_group() {
+        // When both limits are None, all data should go into a single row group
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[1000],
+ "With no limits, all rows should be in a single row group"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_rows_only() {
+ // When only max_row_group_size is set, respect the row limit
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(300)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[300, 300, 300, 100],
+ "Row groups should be split by row count"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_bytes_only() {
+ // When only max_row_group_bytes is set, respect the byte limit
+ // Create batches with string data for more predictable byte sizes
+ // Write in multiple small batches so byte-based splitting can work
+        // (first batch establishes the avg row size, subsequent batches are split)
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "str",
+ ArrowDataType::Utf8,
+ false,
+ )]));
+
+        // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
Review Comment:
Done
##########
parquet/src/file/properties.rs:
##########
@@ -45,6 +45,8 @@ pub const DEFAULT_STATISTICS_ENABLED: EnabledStatistics = EnabledStatistics::Pag
pub const DEFAULT_WRITE_PAGE_HEADER_STATISTICS: bool = false;
/// Default value for [`WriterProperties::max_row_group_size`]
pub const DEFAULT_MAX_ROW_GROUP_SIZE: usize = 1024 * 1024;
+/// Default value for [`WriterProperties::max_row_group_bytes`] (128 MB, same as parquet-mr's parquet.block.size)
Review Comment:
Done
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4518,4 +4575,185 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ /// Helper to create a test batch with the given number of rows.
+ /// Each row is approximately 4 bytes (one i32).
+ fn create_test_batch(num_rows: usize) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+ let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+ RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+ }
+
+ #[test]
+ fn test_row_group_limit_none_writes_single_row_group() {
+        // When both limits are None, all data should go into a single row group
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[1000],
+ "With no limits, all rows should be in a single row group"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_rows_only() {
+ // When only max_row_group_size is set, respect the row limit
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(300)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[300, 300, 300, 100],
+ "Row groups should be split by row count"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_bytes_only() {
+ // When only max_row_group_bytes is set, respect the byte limit
+ // Create batches with string data for more predictable byte sizes
+ // Write in multiple small batches so byte-based splitting can work
+        // (first batch establishes the avg row size, subsequent batches are split)
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "str",
+ ArrowDataType::Utf8,
+ false,
+ )]));
+
+        // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(Some(3500))
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
+
+ // Write 10 batches of 10 rows each (100 rows total)
+ // Each string is ~100 bytes
+ for batch_idx in 0..10 {
+ let strings: Vec<String> = (0..10)
+ .map(|i| format!("{:0>100}", batch_idx * 10 + i))
+ .collect();
+ let array = StringArray::from(strings);
+            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ let sizes = row_group_sizes(builder.metadata());
+
+ assert!(
+ sizes.len() > 1,
+ "Should have multiple row groups due to byte limit, got {sizes:?}",
+ );
+
+ let total_rows: i64 = sizes.iter().sum();
+ assert_eq!(total_rows, 100, "Total rows should be preserved");
+ }
+
+ #[test]
+ fn test_row_group_limit_both_row_wins() {
Review Comment:
No can do; the first batch is always written as a whole, because we need
some statistics in order to calculate the average row size. This is also noted in
the PR description:
> This means that the first batch will always be written as a whole (unless row count limit is also set).
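To make that concrete, here is a minimal sketch of the splitting decision, assuming a heuristic that estimates the average encoded row size from what has already been written. The function and parameter names are illustrative only, not the actual writer internals:
```rust
/// Illustrative only: roughly how a byte limit can be turned into a row
/// count for an incoming batch. Until something has been written there is
/// no basis for an average row size, so the whole first batch is kept.
fn rows_until_byte_limit(
    rows_written: u64,    // rows already written to the file (assumed counter)
    bytes_written: u64,   // encoded bytes already written (assumed counter)
    bytes_in_group: u64,  // encoded bytes in the currently open row group
    max_group_bytes: u64, // configured byte limit
    batch_rows: usize,    // rows in the incoming batch
) -> usize {
    if rows_written == 0 {
        // First batch: no estimate available, so it is written as a whole.
        return batch_rows;
    }
    let avg_row_bytes = (bytes_written / rows_written).max(1);
    let budget = max_group_bytes.saturating_sub(bytes_in_group);
    ((budget / avg_row_bytes) as usize).min(batch_rows)
}
```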
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4518,4 +4575,185 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ /// Helper to create a test batch with the given number of rows.
+ /// Each row is approximately 4 bytes (one i32).
+ fn create_test_batch(num_rows: usize) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+ let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+ RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+ }
+
+ #[test]
+ fn test_row_group_limit_none_writes_single_row_group() {
+        // When both limits are None, all data should go into a single row group
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[1000],
+ "With no limits, all rows should be in a single row group"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_rows_only() {
+ // When only max_row_group_size is set, respect the row limit
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(300)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[300, 300, 300, 100],
+ "Row groups should be split by row count"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_bytes_only() {
+ // When only max_row_group_bytes is set, respect the byte limit
+ // Create batches with string data for more predictable byte sizes
+ // Write in multiple small batches so byte-based splitting can work
+        // (first batch establishes the avg row size, subsequent batches are split)
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "str",
+ ArrowDataType::Utf8,
+ false,
+ )]));
+
+        // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(Some(3500))
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
+
+ // Write 10 batches of 10 rows each (100 rows total)
+ // Each string is ~100 bytes
+ for batch_idx in 0..10 {
+ let strings: Vec<String> = (0..10)
+ .map(|i| format!("{:0>100}", batch_idx * 10 + i))
+ .collect();
+ let array = StringArray::from(strings);
+            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ let sizes = row_group_sizes(builder.metadata());
+
+ assert!(
+ sizes.len() > 1,
+ "Should have multiple row groups due to byte limit, got {sizes:?}",
+ );
+
+ let total_rows: i64 = sizes.iter().sum();
+ assert_eq!(total_rows, 100, "Total rows should be preserved");
+ }
+
+ #[test]
+ fn test_row_group_limit_both_row_wins() {
+ // When both limits are set, the row limit triggers first
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(200) // Will trigger at 200 rows
+            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[200, 200, 200, 200, 200],
+ "Row limit should trigger before byte limit"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_both_bytes_wins() {
+ // When both limits are set, the byte limit triggers first
+ // Write in multiple small batches so byte-based splitting can work
Review Comment:
Unfortunately, the way data is fed _does_ affect the row group splits,
because of the first-batch issue (noted in the PR description):
> This means that the first batch will always be written as a whole (unless row count limit is also set).
And even beyond the first batch, the behaviour is not fully predictable:
the byte-based limit is enforced by estimating the average row size from
previous batches, which makes it more dynamic than the row-based limit.
I'm not sure what's actionable from this comment. If you think there's
still a missing test case, please LMK.
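If it helps future readers, here is a standalone toy simulation of that behaviour. It is not the writer's actual code (the real writer works from encoded column sizes; all byte sizes here are made up); it only illustrates why the same 100 rows of ~100-byte strings can land in different row group layouts depending on how they are batched:
```rust
// Toy model only: split rows into "row groups" using an average-row-size
// estimate built from previously written rows, mirroring the heuristic
// described above.
fn simulate(batches: &[usize], row_bytes: u64, max_group_bytes: u64) -> Vec<usize> {
    let mut groups = Vec::new();
    let mut group_rows = 0usize;
    let (mut rows_written, mut bytes_written) = (0u64, 0u64);
    for &batch_rows in batches {
        let mut remaining = batch_rows;
        while remaining > 0 {
            let take = if rows_written == 0 {
                remaining // first batch: no estimate yet, keep it whole
            } else {
                let avg = (bytes_written / rows_written).max(1);
                let budget = max_group_bytes.saturating_sub(group_rows as u64 * avg);
                ((budget / avg) as usize).clamp(1, remaining)
            };
            group_rows += take;
            rows_written += take as u64;
            bytes_written += take as u64 * row_bytes;
            remaining -= take;
            if group_rows as u64 * row_bytes >= max_group_bytes {
                groups.push(group_rows); // flush the "row group"
                group_rows = 0;
            }
        }
    }
    if group_rows > 0 {
        groups.push(group_rows);
    }
    groups
}

fn main() {
    // Same data and byte limit, different feeding patterns:
    println!("{:?}", simulate(&[100], 100, 3500)); // [100] -- one big batch, never split
    println!("{:?}", simulate(&[10; 10], 100, 3500)); // [35, 35, 30] -- later batches get split
}
```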
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4518,4 +4575,185 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ /// Helper to create a test batch with the given number of rows.
+ /// Each row is approximately 4 bytes (one i32).
Review Comment:
Done
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4518,4 +4575,185 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ /// Helper to create a test batch with the given number of rows.
+ /// Each row is approximately 4 bytes (one i32).
+ fn create_test_batch(num_rows: usize) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+ let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+ RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+ }
+
+ #[test]
+ fn test_row_group_limit_none_writes_single_row_group() {
+        // When both limits are None, all data should go into a single row group
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[1000],
+ "With no limits, all rows should be in a single row group"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_rows_only() {
+ // When only max_row_group_size is set, respect the row limit
Review Comment:
Done
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4518,4 +4575,185 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ /// Helper to create a test batch with the given number of rows.
+ /// Each row is approximately 4 bytes (one i32).
+ fn create_test_batch(num_rows: usize) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+ let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
Review Comment:
Nice! Done
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4518,4 +4575,185 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ /// Helper to create a test batch with the given number of rows.
+ /// Each row is approximately 4 bytes (one i32).
+ fn create_test_batch(num_rows: usize) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+ let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+ RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+ }
+
+ #[test]
+ fn test_row_group_limit_none_writes_single_row_group() {
+        // When both limits are None, all data should go into a single row group
Review Comment:
Done (moved to above the test function)
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4518,4 +4575,185 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ /// Helper to create a test batch with the given number of rows.
+ /// Each row is approximately 4 bytes (one i32).
+ fn create_test_batch(num_rows: usize) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+ let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+ RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+ }
+
+ #[test]
+ fn test_row_group_limit_none_writes_single_row_group() {
+        // When both limits are None, all data should go into a single row group
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[1000],
+ "With no limits, all rows should be in a single row group"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_rows_only() {
+ // When only max_row_group_size is set, respect the row limit
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(300)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[300, 300, 300, 100],
+ "Row groups should be split by row count"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_bytes_only() {
+ // When only max_row_group_bytes is set, respect the byte limit
+ // Create batches with string data for more predictable byte sizes
+ // Write in multiple small batches so byte-based splitting can work
+        // (first batch establishes the avg row size, subsequent batches are split)
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "str",
+ ArrowDataType::Utf8,
+ false,
+ )]));
+
+        // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(Some(3500))
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
+
+ // Write 10 batches of 10 rows each (100 rows total)
+ // Each string is ~100 bytes
+ for batch_idx in 0..10 {
+ let strings: Vec<String> = (0..10)
+ .map(|i| format!("{:0>100}", batch_idx * 10 + i))
+ .collect();
+ let array = StringArray::from(strings);
+            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ let sizes = row_group_sizes(builder.metadata());
+
+ assert!(
+ sizes.len() > 1,
+ "Should have multiple row groups due to byte limit, got {sizes:?}",
+ );
+
+ let total_rows: i64 = sizes.iter().sum();
+ assert_eq!(total_rows, 100, "Total rows should be preserved");
+ }
+
+ #[test]
+ fn test_row_group_limit_both_row_wins() {
+ // When both limits are set, the row limit triggers first
Review Comment:
Done
##########
parquet/src/file/properties.rs:
##########
@@ -575,7 +595,34 @@ impl WriterPropertiesBuilder {
/// If the value is set to 0.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
Review Comment:
Probably. Done
##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4518,4 +4575,185 @@ mod tests {
assert_eq!(get_dict_page_size(col0_meta), 1024 * 1024);
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}
+
+ /// Helper to create a test batch with the given number of rows.
+ /// Each row is approximately 4 bytes (one i32).
+ fn create_test_batch(num_rows: usize) -> RecordBatch {
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "int",
+ ArrowDataType::Int32,
+ false,
+ )]));
+ let array = Int32Array::from((0..num_rows as i32).collect::<Vec<_>>());
+ RecordBatch::try_new(schema, vec![Arc::new(array)]).unwrap()
+ }
+
+ #[test]
+ fn test_row_group_limit_none_writes_single_row_group() {
+        // When both limits are None, all data should go into a single row group
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[1000],
+ "With no limits, all rows should be in a single row group"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_rows_only() {
+ // When only max_row_group_size is set, respect the row limit
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(300)
+ .set_max_row_group_bytes(None)
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[300, 300, 300, 100],
+ "Row groups should be split by row count"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_bytes_only() {
+ // When only max_row_group_bytes is set, respect the byte limit
+ // Create batches with string data for more predictable byte sizes
+ // Write in multiple small batches so byte-based splitting can work
+        // (first batch establishes the avg row size, subsequent batches are split)
+ let schema = Arc::new(Schema::new(vec![Field::new(
+ "str",
+ ArrowDataType::Utf8,
+ false,
+ )]));
+
+        // Set byte limit to approximately fit ~30 rows worth of data (~100 bytes each)
+ let props = WriterProperties::builder()
+ .set_max_row_group_row_count(None)
+ .set_max_row_group_bytes(Some(3500))
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), schema.clone(), Some(props)).unwrap();
+
+ // Write 10 batches of 10 rows each (100 rows total)
+ // Each string is ~100 bytes
+ for batch_idx in 0..10 {
+ let strings: Vec<String> = (0..10)
+ .map(|i| format!("{:0>100}", batch_idx * 10 + i))
+ .collect();
+ let array = StringArray::from(strings);
+            let batch = RecordBatch::try_new(schema.clone(), vec![Arc::new(array)]).unwrap();
+ writer.write(&batch).unwrap();
+ }
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ let sizes = row_group_sizes(builder.metadata());
+
+ assert!(
+ sizes.len() > 1,
+ "Should have multiple row groups due to byte limit, got {sizes:?}",
+ );
+
+ let total_rows: i64 = sizes.iter().sum();
+ assert_eq!(total_rows, 100, "Total rows should be preserved");
+ }
+
+ #[test]
+ fn test_row_group_limit_both_row_wins() {
+ // When both limits are set, the row limit triggers first
+ let batch = create_test_batch(1000);
+
+ let props = WriterProperties::builder()
+ .set_max_row_group_size(200) // Will trigger at 200 rows
+            .set_max_row_group_bytes(Some(1024 * 1024)) // 1MB - won't trigger for small int data
+ .build();
+
+ let file = tempfile::tempfile().unwrap();
+ let mut writer =
+            ArrowWriter::try_new(file.try_clone().unwrap(), batch.schema(), Some(props)).unwrap();
+
+ writer.write(&batch).unwrap();
+ writer.close().unwrap();
+
+ let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap();
+ assert_eq!(
+ &row_group_sizes(builder.metadata()),
+ &[200, 200, 200, 200, 200],
+ "Row limit should trigger before byte limit"
+ );
+ }
+
+ #[test]
+ fn test_row_group_limit_both_bytes_wins() {
Review Comment:
Done - see `test_row_group_limit_both_row_wins_multiple_batches` vs.
`test_row_group_limit_both_row_wins_single_batch`
##########
parquet/src/file/properties.rs:
##########
@@ -575,7 +595,34 @@ impl WriterPropertiesBuilder {
/// If the value is set to 0.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
assert!(value > 0, "Cannot have a 0 max row group size");
- self.max_row_group_size = value;
+ self.max_row_group_row_count = Some(value);
+ self
+ }
+
+ /// Sets maximum number of rows in a row group, or `None` for unlimited.
+ ///
+ /// # Panics
+ /// If the value is `Some(0)`.
+    pub fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self {
+ if let Some(v) = value {
+ assert!(v > 0, "Cannot have a 0 max row group size");
+ }
+ self.max_row_group_row_count = value;
+ self
+ }
+
+ /// Sets maximum size of a row group in bytes, or `None` for unlimited.
+ ///
+    /// Row groups are flushed when their estimated encoded size exceeds this threshold.
+ /// This is similar to the official `parquet.block.size` behavior.
+ ///
+ /// # Panics
+ /// If the value is `Some(0)`.
+ pub fn set_max_row_group_bytes(mut self, value: Option<usize>) -> Self {
+ if let Some(v) = value {
+ assert!(v > 0, "Cannot have a 0 max row group bytes");
+ }
Review Comment:
Neat, done
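Side note for reviewers skimming the thread: the two new setters from the hunk above compose with the builder roughly like this. This is a usage sketch, not code from the PR; the 128 MB / 1M-row / 64 MB numbers are just example values:
```rust
use parquet::file::properties::WriterProperties;

fn main() {
    // Both limits set: whichever threshold is reached first closes the row group.
    let _both = WriterProperties::builder()
        .set_max_row_group_row_count(Some(1024 * 1024))
        .set_max_row_group_bytes(Some(128 * 1024 * 1024))
        .build();

    // Byte limit only: clear the row-count limit explicitly so only the
    // byte threshold drives row group flushes.
    let _bytes_only = WriterProperties::builder()
        .set_max_row_group_row_count(None)
        .set_max_row_group_bytes(Some(64 * 1024 * 1024))
        .build();
}
```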
##########
parquet/src/file/properties.rs:
##########
@@ -575,7 +595,34 @@ impl WriterPropertiesBuilder {
/// If the value is set to 0.
pub fn set_max_row_group_size(mut self, value: usize) -> Self {
assert!(value > 0, "Cannot have a 0 max row group size");
- self.max_row_group_size = value;
+ self.max_row_group_row_count = Some(value);
+ self
+ }
+
+ /// Sets maximum number of rows in a row group, or `None` for unlimited.
+ ///
+ /// # Panics
+ /// If the value is `Some(0)`.
+    pub fn set_max_row_group_row_count(mut self, value: Option<usize>) -> Self {
+ if let Some(v) = value {
+ assert!(v > 0, "Cannot have a 0 max row group size");
+ }
+ self.max_row_group_row_count = value;
+ self
+ }
+
+ /// Sets maximum size of a row group in bytes, or `None` for unlimited.
+ ///
+    /// Row groups are flushed when their estimated encoded size exceeds this threshold.
+ /// This is similar to the official `parquet.block.size` behavior.
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]