adriangb commented on code in PR #9972:
URL: https://github.com/apache/arrow-rs/pull/9972#discussion_r3352095020
##########
parquet/src/column/writer/mod.rs:
##########
@@ -555,14 +577,39 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
}
- values_offset += self.write_mini_batch(
+ let chunk_size = end_offset - levels_offset;
+ let chunk_def = def_levels.slice(levels_offset, chunk_size);
+ let chunk_rep = rep_levels.slice(levels_offset, chunk_size);
+
+ let sub_batch_size = chunker.pick_sub_batch_size(
Review Comment:
Added a comment marking this as the decision point — write the whole chunk
as one mini-batch (the common case) vs. fall back to byte-budget sub-batching.
Resolved in e79366b1ea.
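
For readers skimming the thread, the decision point can be pictured roughly as follows. This is a minimal sketch, not the code in the PR: `ChunkPlan` and `plan_chunk` are hypothetical names, and the only facts taken from the discussion are that the chunker returns a `sub_batch_size >= 1` and that the granular path is taken only when `sub_batch_size < chunk_size`.

```rust
/// Hypothetical outcome of the per-chunk decision (not a type from the PR).
#[derive(Debug, PartialEq)]
enum ChunkPlan {
    /// Common case: the whole chunk is written as one mini-batch.
    Whole,
    /// Slow path: the chunk is split into sub-batches of roughly this many levels.
    Granular { sub_batch_size: usize },
}

/// Illustrative only: picks a path from the sub-batch size chosen by the chunker.
/// Assumes the chunker never returns 0.
fn plan_chunk(chunk_size: usize, sub_batch_size: usize) -> ChunkPlan {
    debug_assert!(sub_batch_size >= 1);
    if sub_batch_size >= chunk_size {
        // One mini-batch already fits the byte budget, so no splitting is needed.
        ChunkPlan::Whole
    } else {
        ChunkPlan::Granular { sub_batch_size }
    }
}
```

Keeping the comparison in one place is what lets the hot loop stay a single branch for fixed-width and small-value columns.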
##########
parquet/src/column/writer/mod.rs:
##########
@@ -713,6 +760,78 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
})
}
+ /// Writes a chunk in sub-batches sized by the caller-supplied
+ /// `sub_batch_size` so the post-write data page byte limit check
+ /// fires before the chunk can grossly overshoot
+ /// `data_page_size_limit`.
+ ///
+ /// For flat (unrepeated) columns sub-batches contain up to
+ /// `sub_batch_size` levels each. For repeated/nested columns
+ /// sub-batches step from one `rep == 0` boundary to the next so a
+ /// record never spans data pages, matching the parquet format rule.
+ ///
+ /// Returns the total number of values consumed across all sub-batches.
+ ///
+ /// `#[inline(never)]` keeps this slow path — only reached for
+ /// variable-width columns whose values need page splitting — out of
+ /// the hot `write_batch_internal` loop.
+ #[allow(clippy::too_many_arguments)]
+ #[inline(never)]
+ fn write_granular_chunk(
+ &mut self,
+ values: &E::Values,
+ values_offset: usize,
+ value_indices: Option<&[usize]>,
+ chunk_size: usize,
+ chunk_def: LevelDataRef<'_>,
+ chunk_rep: LevelDataRef<'_>,
+ sub_batch_size: usize,
+ ) -> Result<usize> {
+ let mut values_consumed = 0;
+ let mut sub_start = 0;
+ while sub_start < chunk_size {
+ let sub_end = match chunk_rep {
+ LevelDataRef::Materialized(levels) => {
+ // Pack up to `sub_batch_size` levels per mini-batch, then
+ // extend to the next record boundary (rep == 0) so a
+ // record never spans data pages. Packing whole records
+ // rather than stepping one record at a time avoids
+ // emitting a `write_mini_batch` per record: records
+ // average only a handful of levels, so the
+ // record-at-a-time loop issued roughly `sub_batch_size`×
+ // more mini-batches than necessary.
+ let mut e = (sub_start + sub_batch_size).min(chunk_size);
+ while e < chunk_size && levels[e] != 0 {
+ e += 1;
+ }
+ // `sub_batch_size` can be 0 when the chunker sizes a
Review Comment:
You're right, it was unreachable: `pick_sub_batch_size` always returns >= 1
and `write_granular_chunk` is only entered when `sub_batch_size < chunk_size`,
so `e > sub_start` always holds and the record-extension loop only increases
`e`. Removed the branch (and the incorrect comment) and replaced it with a
`debug_assert!(sub_batch_size >= 1)` documenting the invariant. Resolved in
e79366b1ea.
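
A standalone sketch of the invariant described above, assuming repetition levels are available as a plain `&[i16]` and that the chunk starts on a record boundary. `next_sub_end` and `split_chunk` are hypothetical helpers written for illustration; they mirror the loop shape in the hunk but are not the PR's functions.

```rust
/// Returns the exclusive end of the sub-batch that starts at `sub_start`.
/// Takes up to `sub_batch_size` levels, then extends to the next record
/// boundary (`rep == 0`) so a record is never split.
fn next_sub_end(rep_levels: &[i16], sub_start: usize, sub_batch_size: usize) -> usize {
    // With `sub_batch_size >= 1` the result is always greater than `sub_start`,
    // so the caller's loop makes progress without a special-case branch.
    debug_assert!(sub_batch_size >= 1);
    let chunk_size = rep_levels.len();
    let mut e = (sub_start + sub_batch_size).min(chunk_size);
    // The extension loop only ever increases `e`.
    while e < chunk_size && rep_levels[e] != 0 {
        e += 1;
    }
    e
}

/// Splits a whole chunk into `(start, end)` sub-batch ranges.
fn split_chunk(rep_levels: &[i16], sub_batch_size: usize) -> Vec<(usize, usize)> {
    let mut ranges = Vec::new();
    let mut sub_start = 0;
    while sub_start < rep_levels.len() {
        let sub_end = next_sub_end(rep_levels, sub_start, sub_batch_size);
        ranges.push((sub_start, sub_end));
        sub_start = sub_end;
    }
    ranges
}
```

Every range produced this way begins at an index whose repetition level is 0, which is the property the record-boundary rule needs.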
##########
parquet/src/column/writer/mod.rs:
##########
@@ -2676,6 +2795,500 @@ mod tests {
assert_eq!(other_values, vec![10]);
}
+ #[test]
+ fn test_column_writer_caps_page_size_for_large_byte_array_values() {
+ // Regression: the post-write data page byte limit check only fires
+ // at mini-batch boundaries, so a 1024-row mini-batch of multi-MiB
+ // BYTE_ARRAY values used to buffer multiple GiB into a single page
+ // before the limit was even consulted. With the threshold-based
+ // granular mode this batch should split into ~one page per value.
+ let value_size = 64 * 1024; // 64 KiB per value
+ let page_byte_limit = 16 * 1024; // 16 KiB page limit
+ let num_rows = 64;
+
+ let mut file = tempfile::tempfile().unwrap();
+ let mut write = TrackedWrite::new(&mut file);
+ let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_writer_version(WriterVersion::PARQUET_1_0)
+ .set_dictionary_enabled(false)
+ .set_encoding(Encoding::PLAIN)
+ .set_data_page_size_limit(page_byte_limit)
+ // Default write_batch_size (1024) — without the fix this
+ // buffers the entire input into a single ~4 MiB page.
+ .build(),
+ );
+
+ let mut data = Vec::with_capacity(num_rows);
Review Comment:
Done — extracted a `write_and_collect_pages` helper (+ `CollectedPages`) and
rewrote all six page-size tests through it, so each is now just its props +
input + assertions. ~130 fewer lines, no coverage change. Resolved in
e79366b1ea.
##########
parquet/src/column/writer/byte_budget_chunker.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decides how many levels of a chunk to write as one mini-batch so that
+//! the resulting data page stays within `data_page_size_limit`.
+//!
+//! The parquet column writer checks the data page byte limit only *after*
+//! each mini-batch finishes writing. Mini-batches are sized in rows
+//! (`write_batch_size`, default 1024), so for BYTE_ARRAY columns whose
+//! values are large (e.g. multi-MiB blobs) a single mini-batch can buffer
+//! GiB into one page before the limit is consulted.
+//!
+//! This module isolates the per-chunk decision that prevents that: given a
+//! chunk's level data and the input values, pick the largest `sub_batch_size`
+//! such that one mini-batch fits in one page byte budget. For the
+//! overwhelmingly common case (small or fixed-width values) the answer is
+//! just `chunk_size` and the decision is O(1) on the column type — only
+//! when the input might overflow does the chunker consult the encoder's
+//! byte estimate.
+
+use crate::basic::Type;
+use crate::column::writer::LevelDataRef;
+use crate::column::writer::encoder::ColumnValueEncoder;
+use crate::file::properties::WriterProperties;
+use crate::schema::types::ColumnDescriptor;
+
+/// Picks byte-budget-aware mini-batch sizes for one column.
+pub(crate) struct ByteBudgetChunker {
+ /// Configured data page byte limit for the column.
+ page_byte_limit: usize,
+ /// Max definition level of the column; a level equal to this marks a
+ /// present (non-null) leaf value. Used to count values per chunk.
+ max_def_level: i16,
+ /// `true` when no chunk of `base_batch_size` values can ever overflow
+ /// `page_byte_limit` regardless of input. Set once at column open from
+ /// the physical type's known per-value byte size; lets the per-chunk
+ /// decision short-circuit with no work for every numeric, bool, or
+ /// narrow `FIXED_LEN_BYTE_ARRAY` column.
+ static_always_fits: bool,
+ /// Configured dictionary page byte limit for the column.
+ dict_page_byte_limit: usize,
+ /// As [`Self::static_always_fits`] but for the dictionary page: `true`
+ /// when one `base_batch_size` mini-batch of this fixed-width type cannot
+ /// overshoot `dict_page_byte_limit` by more than one mini-batch's worth.
+ static_dict_always_fits: bool,
+}
+
+impl ByteBudgetChunker {
+ #[inline]
+ pub(crate) fn new(
+ descr: &ColumnDescriptor,
+ props: &WriterProperties,
+ base_batch_size: usize,
+ ) -> Self {
+ let page_byte_limit = props.column_data_page_size_limit(descr.path());
+ let dict_page_byte_limit = props.column_dictionary_page_size_limit(descr.path());
+ let static_bytes_per_value = match descr.physical_type() {
+ Type::BOOLEAN => Some(1),
+ Type::INT32 | Type::FLOAT => Some(std::mem::size_of::<i32>()),
+ Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::<i64>()),
+ Type::INT96 => Some(12),
+ Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize),
+ Type::BYTE_ARRAY => None,
+ };
+ let static_fits = |limit: usize| {
Review Comment:
The closure is used twice — once for the data-page limit
(`static_always_fits`) and once for the dictionary-page limit
(`static_dict_always_fits`) — so it avoids duplicating the
`map(...).unwrap_or(false)` expression. Happy to inline both call sites if
you'd prefer it flatter, but I kept the closure as the DRY option.
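
To make the trade-off concrete, here is a minimal sketch of the closure-used-twice shape. The function name `static_fit_flags` is hypothetical, and the predicate inside `map` (per-value bytes times `base_batch_size` compared against the limit) is an assumption for illustration; the thread only confirms the `map(...).unwrap_or(false)` pattern and the two call sites.

```rust
/// Illustrative only: computes both "always fits" flags from one closure.
/// `static_bytes_per_value` is `None` for variable-width types.
fn static_fit_flags(
    static_bytes_per_value: Option<usize>,
    base_batch_size: usize,
    page_byte_limit: usize,
    dict_page_byte_limit: usize,
) -> (bool, bool) {
    // Assumed predicate: a full mini-batch of fixed-width values stays within
    // `limit`. Variable-width types (None) can never be declared safe up front.
    let static_fits = |limit: usize| {
        static_bytes_per_value
            .map(|bytes| bytes.saturating_mul(base_batch_size) <= limit)
            .unwrap_or(false)
    };
    (static_fits(page_byte_limit), static_fits(dict_page_byte_limit))
}
```

Inlining would repeat the `map(...).unwrap_or(false)` chain once per limit, which is the duplication the closure avoids.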
##########
parquet/src/data_type.rs:
##########
@@ -726,6 +726,17 @@ pub(crate) mod private {
(std::mem::size_of::<Self>(), 1)
}
+ /// Estimated encoded byte size of this value when serialized into a
+ /// plain-encoded data page. Used by the column writer to decide
+ /// whether to mini-batch a chunk in one call or value-by-value, so
+ /// that a single mini-batch of very large `BYTE_ARRAY` values can't
+ /// push a page far past the configured page byte limit before the
+ /// post-write size check fires.
+ #[inline]
+ fn byte_size(&self) -> usize {
Review Comment:
This was refactored away: the per-value size logic now lives in the free
function `plain_encoded_byte_size` (`column/writer/encoder.rs`), which
*derives* from `dict_encoding_size` rather than duplicating it, and this branch
no longer modifies `data_type.rs`. Should also dovetail with the rename in
#9700. Resolving.
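
A rough sketch of what "deriving from `dict_encoding_size`" could look like. Everything here is a stand-in: the trait `DictEncodingSize`, the `Bytes` wrapper, the `variable_width` flag, and `plain_size_sketch` are hypothetical, and the `(base_size, num_elements)` tuple for byte arrays is an assumption modelled on the fixed-width default visible in the hunk context. The one fact relied on is that PLAIN encoding stores a `BYTE_ARRAY` value as a 4-byte length followed by its bytes.

```rust
/// Hypothetical stand-in for the crate's per-value sizing hook.
trait DictEncodingSize {
    /// Returns `(base_size, num_elements)`.
    fn dict_encoding_size(&self) -> (usize, usize);
}

impl DictEncodingSize for i64 {
    fn dict_encoding_size(&self) -> (usize, usize) {
        // Fixed-width default, as in the hunk context.
        (std::mem::size_of::<Self>(), 1)
    }
}

/// Stand-in for a BYTE_ARRAY value.
struct Bytes(Vec<u8>);

impl DictEncodingSize for Bytes {
    fn dict_encoding_size(&self) -> (usize, usize) {
        // Assumed shape: 4-byte length prefix plus payload length.
        (std::mem::size_of::<u32>(), self.0.len())
    }
}

/// Illustrative only: plain-encoded size derived from the tuple above rather
/// than from a second, duplicated per-type size table.
fn plain_size_sketch<T: DictEncodingSize>(value: &T, variable_width: bool) -> usize {
    let (base_size, num_elements) = value.dict_encoding_size();
    if variable_width {
        base_size + num_elements
    } else {
        base_size
    }
}
```

With this shape, a 10-byte blob is estimated at 14 bytes and an `i64` at 8, so a single source of truth feeds both the dictionary and the page-budget estimates.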