alamb commented on code in PR #9972:
URL: https://github.com/apache/arrow-rs/pull/9972#discussion_r3350826219


##########
parquet/tests/arrow_writer_layout.rs:
##########
@@ -429,10 +429,16 @@ fn test_string() {
                             page_type: PageType::DATA_PAGE,
                         },
                     ],
+                    // The byte-budget chunker sub-batches the dictionary
+                    // phase. The mini-batch deliberately includes the value
+                    // that crosses the 1000-byte limit so the spill triggers
+                    // on this chunk rather than carrying a sliver into the
+                    // next page (see #9972 discussion), giving a 126-row

Review Comment:
   Can we make this an actual link to the discussion? I couldn't find exactly 
which one it is referring to on https://github.com/apache/arrow-rs/pull/9972.



##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,142 @@ where
     }
 }
 
+/// Cumulative-scan fallback used for byte array types that don't expose

Review Comment:
   If this is only used for `FixedSizeBinaryArray`, maybe it would be simpler if 
it were not generic.
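   
   A minimal sketch of what a non-generic version could look like, assuming the
   only remaining caller passes a fixed-size binary array: every value then has
   the same width, so the count has a closed form and no scan is needed. The
   function name and the `per_value_cost` parameter are hypothetical; the caller
   would derive the cost from the array's `value_length()` plus whatever
   per-value overhead the encoder accounts for.
   
   ```rust
   /// Hypothetical non-generic replacement for the accessor scan when all
   /// values share one width. Mirrors the scan's contract: returns
   /// `num_values` if everything fits, otherwise the smallest `k >= 1` whose
   /// first `k` values exceed `byte_budget`.
   fn count_within_budget_fixed(
       per_value_cost: usize,
       num_values: usize,
       byte_budget: usize,
   ) -> usize {
       if per_value_cost == 0 {
           // Zero-cost values can never exceed the budget.
           return num_values;
       }
       // Smallest k with k * per_value_cost > byte_budget.
       let first_over = (byte_budget / per_value_cost).saturating_add(1);
       first_over.min(num_values)
   }
   ```
   
   This keeps the "boundary value is included" behaviour of the scan while
   doing constant work per chunk.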



##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,142 @@ where
     }
 }
 
+/// Cumulative-scan fallback used for byte array types that don't expose
+/// a single contiguous offsets buffer — view arrays, dictionary arrays,
+/// fixed-size binary. Returns `indices.len()` if every value fits the
+/// budget, otherwise the smallest `k ≥ 1` whose first `k` values' encoded
+/// size first exceeds `byte_budget` — i.e. the boundary value is included
+/// so the caller's page-flush check trips immediately on this mini-batch,
+/// without leaving a sliver to glue onto the next page.
+///
+/// Free function so it can be used with `downcast_op!`.
+fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget: usize) -> usize
+where
+    T: ArrayAccessor + Copy,
+    T::Item: AsRef<[u8]>,
+{
+    let mut cum: usize = 0;
+    for (i, idx) in indices.iter().enumerate() {
+        let value_len = values.value(*idx).as_ref().len() + std::mem::size_of::<u32>();
+        cum = cum.saturating_add(value_len);
+        if cum > byte_budget {
+            return i + 1;
+        }
+    }
+    indices.len()
+}
+
+/// Upper bound on any single value's byte length in a view array.
+///
+/// An inline view stores at most 12 bytes; an
+/// out-of-line view's data is a contiguous slice of exactly one data
+/// buffer, so it cannot be longer than the largest data buffer. This is a
+/// loose bound (a value is usually far smaller than a whole buffer) but it
+/// is O(number of buffers) and always sound.
+fn max_view_value_len(buffers: &[Buffer]) -> usize {
+    /// Bytes that fit inline in a u128 view word (the rest is len + prefix).
+    const MAX_INLINE_VIEW_LEN: usize = 12;
+    buffers
+        .iter()
+        .map(|b| b.len())
+        .max()
+        .unwrap_or(0)
+        .max(MAX_INLINE_VIEW_LEN)
+}
+
+/// Two-stage budget count for view arrays (`Utf8View`, `BinaryView`).
+///
+///   1. View arrays have no prefix-sum offsets buffer, so the exact O(1)
+///      span subtraction `count_within_budget_offsets` uses is unavailable.
+///      But a *conservative* O(1) bound is: every value is at most
+///      `max_value_len` bytes, so the whole chunk fits the budget when
+///      `n * (max_value_len + 4) <= byte_budget`. This skips the per-value
+///      walk for the common small-value case — what view arrays are built
+///      for, and exactly the case where there is nothing to bound.
+///   2. Otherwise scan per-value lengths from the low 32 bits of each u128
+///      view word (no data-buffer dereference) and stop at the first value
+///      that pushes the cumulative sum past the budget.
+fn count_within_budget_views(
+    views: &[u128],
+    indices: &[usize],
+    byte_budget: usize,
+    max_value_len: usize,
+) -> usize {
+    // Stage 1: O(1) conservative upper bound.
+    let per_value = max_value_len + std::mem::size_of::<u32>();
+    if indices.len().saturating_mul(per_value) <= byte_budget {
+        return indices.len();
+    }
+    // Stage 2: exact per-value scan.
+    let mut cum: usize = 0;
+    for (i, idx) in indices.iter().enumerate() {
+        let len = (views[*idx] as u32) as usize;
+        cum = cum.saturating_add(len + std::mem::size_of::<u32>());

Review Comment:
   Why is it adding the size of a `u32`? Those 4 bytes don't appear as part 
of the actual string content 🤔 
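   
   For context, a possible reading is that the 4 bytes model the length prefix
   that Parquet PLAIN encoding writes before each `BYTE_ARRAY` value, which is
   what the `plain_encoded_byte_size` doc comment elsewhere in this PR
   describes. A minimal sketch of that layout (the helper name is hypothetical,
   not part of the PR):
   
   ```rust
   /// Hypothetical helper: PLAIN-encode one BYTE_ARRAY value as a 4-byte
   /// little-endian length followed by the payload bytes.
   fn plain_encode_byte_array(value: &[u8], out: &mut Vec<u8>) {
       out.extend_from_slice(&(value.len() as u32).to_le_bytes());
       out.extend_from_slice(value);
   }
   ```
   
   If that is the intent, the encoded page grows by `len + 4` bytes per value
   even though the 4 bytes are not part of the string content itself.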



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4904,6 +4904,245 @@ mod tests {
         assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
     }
 
+    #[test]
+    fn test_arrow_writer_caps_page_size_for_large_strings() {
+        // End-to-end coverage for `ByteArrayEncoder::estimated_value_bytes_gather`
+        // and the offsets-buffer fast path in `estimate_byte_size_offsets`.
+        //
+        // The standard column-writer regression test covers the same
+        // logic for the non-arrow path; this one drives writes through
+        // `ArrowWriter`, which is what real users hit and which uses
+        // `write_gather` with `non_null_indices` (always contiguous for
+        // a non-null column).
+        let value_size = 64 * 1024;
+        let page_byte_limit = 16 * 1024;
+        let num_rows = 32;
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col",
+            ArrowDataType::Utf8,
+            false,
+        )]));
+        let strings: Vec<String> = (0..num_rows).map(|_| "x".repeat(value_size)).collect();
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StringArray::from(strings)) as _],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_dictionary_enabled(false)

Review Comment:
   Does this setting do anything when dictionary is enabled (aka the default)? 



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4904,6 +4904,245 @@ mod tests {
         assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
     }
 
+    #[test]
+    fn test_arrow_writer_caps_page_size_for_large_strings() {
+        // End-to-end coverage for `ByteArrayEncoder::estimated_value_bytes_gather`
+        // and the offsets-buffer fast path in `estimate_byte_size_offsets`.
+        //
+        // The standard column-writer regression test covers the same
+        // logic for the non-arrow path; this one drives writes through
+        // `ArrowWriter`, which is what real users hit and which uses
+        // `write_gather` with `non_null_indices` (always contiguous for
+        // a non-null column).
+        let value_size = 64 * 1024;
+        let page_byte_limit = 16 * 1024;
+        let num_rows = 32;
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col",
+            ArrowDataType::Utf8,
+            false,
+        )]));
+        let strings: Vec<String> = (0..num_rows).map(|_| "x".repeat(value_size)).collect();
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StringArray::from(strings)) as _],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_dictionary_enabled(false)
+            .set_data_page_size_limit(page_byte_limit)
+            .build();
+        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        let data = Bytes::from(writer.into_inner().unwrap());
+
+        let mut metadata = ParquetMetaDataReader::new();
+        metadata.try_parse(&data).unwrap();
+        let metadata = metadata.finish().unwrap();
+        let col_meta = metadata.row_group(0).column(0);
+        let mut reader =
+            SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
+
+        let mut data_pages = Vec::new();
+        while let Some(page) = reader.get_next_page().unwrap() {
+            if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
+                data_pages.push(page.buffer().len());
+            }
+        }
+
+        assert!(
+            data_pages.len() >= num_rows / 2,
+            "expected at least {} data pages for {num_rows} large strings via ArrowWriter, got {} ({data_pages:?})",
+            num_rows / 2,
+            data_pages.len(),
+        );
+        for size in &data_pages {
+            assert!(
+                *size <= value_size + 64,
+                "page size {size} exceeds one-value bound; pages {data_pages:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_arrow_writer_granular_mode_roundtrip() {
+        // Granular mode subdivides chunks and writes more pages than
+        // `main`. Make sure the data we write back is bit-identical to
+        // what went in — page-count assertions elsewhere only prove
+        // pages were cut, not that the encoded data is correct.
+        //
+        // Mix value sizes so that the cumulative-byte-budget cutoff
+        // lands mid-chunk, exercising both batched and granular paths
+        // within the same `write_batch_internal` call.
+        let small = "tiny".to_string();
+        let big = "x".repeat(64 * 1024);
+        let strings: Vec<String> = (0..256)
+            .map(|i| {
+                if i % 16 == 0 {
+                    big.clone()
+                } else {
+                    small.clone()
+                }
+            })
+            .collect();
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col",
+            ArrowDataType::Utf8,
+            false,
+        )]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StringArray::from(strings.clone())) as _],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_dictionary_enabled(false)
+            .set_data_page_size_limit(16 * 1024)
+            .build();
+        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        let data = Bytes::from(writer.into_inner().unwrap());
+
+        let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
+        let read = reader.next().unwrap().unwrap();
+        assert!(reader.next().is_none(), "expected one batch");
+        let col = read
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(col.len(), strings.len());
+        for (i, expected) in strings.iter().enumerate() {
+            assert_eq!(
+                col.value(i),
+                expected.as_str(),
+                "value mismatch at index {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_arrow_writer_caps_page_size_for_large_string_view() {
+        // Coverage for the view/dict fallback in
+        // `ByteArrayEncoder::count_values_within_byte_budget_gather`.
+        // `Utf8View` arrays don't expose a single contiguous offsets
+        // buffer, so the gather method falls back to the per-value walk
+        // via `ArrayAccessor::value`.
+        use arrow_array::StringViewArray;
+        let value_size = 64 * 1024;
+        let page_byte_limit = 16 * 1024;
+        let num_rows = 32usize;
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col",
+            ArrowDataType::Utf8View,
+            false,
+        )]));
+        let strings: Vec<String> = (0..num_rows).map(|_| "x".repeat(value_size)).collect();

Review Comment:
   You might be able to avoid allocating a separate 64 KiB string per row by 
building it once and then using `&str`.
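   
   A small sketch of that, assuming the array constructor accepts a `Vec<&str>`
   (the `From<Vec<&str>>` impls on `StringArray` and `StringViewArray` should
   cover this). `repeated_values` is a hypothetical helper name:
   
   ```rust
   /// Hypothetical test helper: `num_rows` references to one shared string,
   /// instead of `num_rows` separately allocated `String`s.
   fn repeated_values(value: &str, num_rows: usize) -> Vec<&str> {
       vec![value; num_rows]
   }
   ```
   
   The test would build `let big = "x".repeat(value_size);` once and pass
   `repeated_values(&big, num_rows)` to the array constructor. The array still
   copies the bytes into its own buffers; only the intermediate allocations go
   away.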



##########
parquet/src/column/writer/encoder.rs:
##########
@@ -247,6 +283,66 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
         self.write_slice(&slice)
     }
 
+    fn count_values_within_byte_budget(
+        values: &[T::T],
+        offset: usize,
+        len: usize,
+        byte_budget: usize,
+    ) -> Option<usize> {
+        // Clamp so that a caller-supplied `len` that overruns the input
+        // (e.g. a level/value mismatch the encoder will reject later)
+        // returns an estimate instead of panicking here.
+        let end = (offset + len).min(values.len());
+        let start = offset.min(end);
+        let n = end - start;
+        // Fixed-size physical types have a constant per-value byte cost,
+        // so the answer is one division — no need to walk the slice.
+        let phys = <T::T as ParquetValueType>::PHYSICAL_TYPE;
+        if phys != Type::BYTE_ARRAY && phys != Type::FIXED_LEN_BYTE_ARRAY {
+            let per = std::mem::size_of::<T::T>().max(1);
+            let fits = (byte_budget / per).max(1);
+            return Some(fits.min(n));
+        }
+        // Variable-width: scan, accumulate, exit at the first value that
+        // pushes us past the budget. This both bounds skewed
+        // distributions (one outlier among small values is caught when
+        // it lands, regardless of position) and short-circuits when an
+        // early value alone exceeds the budget. The boundary value is
+        // included in the returned count so the caller's page-flush check
+        // trips on this mini-batch rather than leaving a sliver to glue
+        // onto the next page (see #9972 discussion).
+        let mut cum: usize = 0;
+        for (i, v) in values[start..end].iter().enumerate() {
+            cum = cum.saturating_add(plain_encoded_byte_size::<T>(v));
+            if cum > byte_budget {
+                return Some(i + 1);
+            }
+        }
+        Some(n)
+    }
+
+    fn count_values_within_byte_budget_gather(
+        values: &[T::T],
+        indices: &[usize],
+        byte_budget: usize,
+    ) -> Option<usize> {
+        let phys = <T::T as ParquetValueType>::PHYSICAL_TYPE;

Review Comment:
   Isn't this almost exactly the same as `count_values_within_byte_budget`? Maybe 
it could call `count_values_within_byte_budget` and avoid the code repetition.
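   
   Since the gather indices need not be contiguous, one possible shape is a
   shared helper that both entry points feed with an iterator of per-value
   sizes. This is only a sketch: all names are hypothetical, `Vec<u8>` stands
   in for `T::T`, and a fixed 4-byte prefix stands in for
   `plain_encoded_byte_size`.
   
   ```rust
   /// Shared scan: counts values up to and including the first one that
   /// pushes the cumulative size past `byte_budget`.
   fn count_within_budget(sizes: impl Iterator<Item = usize>, byte_budget: usize) -> usize {
       let mut cum = 0usize;
       let mut n = 0usize;
       for size in sizes {
           n += 1;
           cum = cum.saturating_add(size);
           if cum > byte_budget {
               return n;
           }
       }
       n
   }
   
   /// Contiguous variant: clamps the range, then defers to the shared scan.
   fn count_contiguous(values: &[Vec<u8>], offset: usize, len: usize, byte_budget: usize) -> usize {
       let end = offset.saturating_add(len).min(values.len());
       let start = offset.min(end);
       count_within_budget(values[start..end].iter().map(|v| v.len() + 4), byte_budget)
   }
   
   /// Gather variant: same scan, driven by explicit indices.
   fn count_gather(values: &[Vec<u8>], indices: &[usize], byte_budget: usize) -> usize {
       count_within_budget(indices.iter().map(|&i| values[i].len() + 4), byte_budget)
   }
   ```
   
   With this shape the budget logic lives in one place and the two methods
   differ only in how they enumerate values.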



##########
parquet/src/column/writer/encoder.rs:
##########
@@ -411,3 +507,38 @@ where
         }
     }
 }
+
+/// Plain-encoded byte cost of a single value of type `T::T`.
+///
+/// Derived from [`ParquetValueType::dict_encoding_size`] so we don't add a
+/// parallel per-value-size hook to the trait. The components returned by
+/// `dict_encoding_size` are `(per-value overhead, value-bytes)`. For
+/// plain encoding the on-disk layout is:
+///
+/// - `BYTE_ARRAY`: 4-byte length prefix + payload bytes = `overhead + bytes`.

Review Comment:
   As above, this seems to be a parallel implementation in English. I think it 
would be more helpful if the salient rationale were in `//` comments next to the 
relevant code (rather than in doc comments).



##########
parquet/src/column/writer/mod.rs:
##########
@@ -713,6 +760,78 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         })
     }
 
+    /// Writes a chunk in sub-batches sized by the caller-supplied
+    /// `sub_batch_size` so the post-write data page byte limit check
+    /// fires before the chunk can grossly overshoot
+    /// `data_page_size_limit`.
+    ///
+    /// For flat (unrepeated) columns sub-batches contain up to
+    /// `sub_batch_size` levels each. For repeated/nested columns
+    /// sub-batches step from one `rep == 0` boundary to the next so a
+    /// record never spans data pages, matching the parquet format rule.
+    ///
+    /// Returns the total number of values consumed across all sub-batches.
+    ///
+    /// `#[inline(never)]` keeps this slow path — only reached for
+    /// variable-width columns whose values need page splitting — out of
+    /// the hot `write_batch_internal` loop.
+    #[allow(clippy::too_many_arguments)]
+    #[inline(never)]
+    fn write_granular_chunk(
+        &mut self,
+        values: &E::Values,
+        values_offset: usize,
+        value_indices: Option<&[usize]>,
+        chunk_size: usize,
+        chunk_def: LevelDataRef<'_>,
+        chunk_rep: LevelDataRef<'_>,
+        sub_batch_size: usize,
+    ) -> Result<usize> {
+        let mut values_consumed = 0;
+        let mut sub_start = 0;
+        while sub_start < chunk_size {
+            let sub_end = match chunk_rep {
+                LevelDataRef::Materialized(levels) => {
+                    // Pack up to `sub_batch_size` levels per mini-batch, then
+                    // extend to the next record boundary (rep == 0) so a
+                    // record never spans data pages. Packing whole records
+                    // rather than stepping one record at a time avoids
+                    // emitting a `write_mini_batch` per record: records

Review Comment:
   ```suggestion
                       // rather than stepping one record at a time avoids
                       // calling `write_mini_batch` per record
   ```



##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,142 @@ where
     }
 }
 
+/// Cumulative-scan fallback used for byte array types that don't expose
+/// a single contiguous offsets buffer — view arrays, dictionary arrays,
+/// fixed-size binary. Returns `indices.len()` if every value fits the
+/// budget, otherwise the smallest `k ≥ 1` whose first `k` values' encoded
+/// size first exceeds `byte_budget` — i.e. the boundary value is included
+/// so the caller's page-flush check trips immediately on this mini-batch,
+/// without leaving a sliver to glue onto the next page.
+///
+/// Free function so it can be used with `downcast_op!`.
+fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget: usize) -> usize
+where
+    T: ArrayAccessor + Copy,
+    T::Item: AsRef<[u8]>,
+{
+    let mut cum: usize = 0;
+    for (i, idx) in indices.iter().enumerate() {
+        let value_len = values.value(*idx).as_ref().len() + std::mem::size_of::<u32>();
+        cum = cum.saturating_add(value_len);
+        if cum > byte_budget {
+            return i + 1;
+        }
+    }
+    indices.len()
+}
+
+/// Upper bound on any single value's byte length in a view array.
+///
+/// An inline view stores at most 12 bytes; an
+/// out-of-line view's data is a contiguous slice of exactly one data
+/// buffer, so it cannot be longer than the largest data buffer. This is a
+/// loose bound (a value is usually far smaller than a whole buffer) but it
+/// is O(number of buffers) and always sound.
+fn max_view_value_len(buffers: &[Buffer]) -> usize {
+    /// Bytes that fit inline in a u128 view word (the rest is len + prefix).
+    const MAX_INLINE_VIEW_LEN: usize = 12;
+    buffers
+        .iter()
+        .map(|b| b.len())
+        .max()
+        .unwrap_or(0)
+        .max(MAX_INLINE_VIEW_LEN)
+}
+
+/// Two-stage budget count for view arrays (`Utf8View`, `BinaryView`).

Review Comment:
   I think it would help here to summarize what this function does: namely, it 
returns the number of rows (views) whose cumulative length fits within 
`byte_budget` bytes.
   
   The rest of this comment is basically an English description of what the 
code is doing. It would probably be more helpful next to the code in question.
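   
   A rough sketch of how that could read, with a one-sentence summary in the
   doc comment and the rationale moved inline. The body follows the function
   in the diff above; the `LEN_PREFIX` constant and the exact wording of the
   comments are assumptions for illustration.
   
   ```rust
   /// Returns how many of the leading `indices` to write next: all of them
   /// if their cumulative encoded size fits in `byte_budget`, otherwise the
   /// count up to and including the first value that exceeds it.
   fn count_within_budget_views(
       views: &[u128],
       indices: &[usize],
       byte_budget: usize,
       max_value_len: usize,
   ) -> usize {
       const LEN_PREFIX: usize = std::mem::size_of::<u32>();
       // Views have no prefix-sum offsets, so use a conservative O(1) bound
       // first: if even `n` maximum-length values fit, skip the scan.
       if indices.len().saturating_mul(max_value_len + LEN_PREFIX) <= byte_budget {
           return indices.len();
       }
       // Exact scan: each value's length is the low 32 bits of its view
       // word, so no data buffer is touched.
       let mut cum = 0usize;
       for (i, &idx) in indices.iter().enumerate() {
           let len = (views[idx] as u32) as usize;
           cum = cum.saturating_add(len + LEN_PREFIX);
           if cum > byte_budget {
               return i + 1;
           }
       }
       indices.len()
   }
   ```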



##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -481,6 +483,90 @@ impl ColumnValueEncoder for ByteArrayEncoder {
         Ok(())
     }
 
+    fn count_values_within_byte_budget_gather(
+        values: &Self::Values,
+        indices: &[usize],
+        byte_budget: usize,
+    ) -> Option<usize> {
+        // `ByteArrayEncoder` only ever writes via `write_gather`, so this
+        // is the relevant method.
+        //
+        // Two-stage walk for the simple offset-buffer byte array types:
+        //   1. If indices are contiguous, compute the total payload in
+        //      O(1) via a single subtraction on the offsets buffer.
+        //      When the total fits the budget — the overwhelmingly
+        //      common "small values" case — return immediately.
+        //   2. Otherwise, walk per-value byte sizes from the offsets
+        //      buffer (still cheap, no slice/UTF-8 construction) and
+        //      exit at the first value that pushes the cumulative sum
+        //      past the budget. This bounds skewed distributions: an
+        //      outlier value is caught wherever it lands in the chunk.
+        let count = match values.data_type() {
+            DataType::Utf8 => count_within_budget_offsets(
+                values.as_any().downcast_ref::<StringArray>().unwrap(),
+                indices,
+                byte_budget,
+            ),
+            DataType::LargeUtf8 => count_within_budget_offsets(
+                values.as_any().downcast_ref::<LargeStringArray>().unwrap(),
+                indices,
+                byte_budget,
+            ),
+            DataType::Binary => count_within_budget_offsets(
+                values.as_any().downcast_ref::<BinaryArray>().unwrap(),
+                indices,
+                byte_budget,
+            ),
+            DataType::LargeBinary => count_within_budget_offsets(
+                values.as_any().downcast_ref::<LargeBinaryArray>().unwrap(),
+                indices,
+                byte_budget,
+            ),
+            // View arrays carry each value's length in the low 32 bits of
+            // its u128 view word, so lengths are scannable without touching
+            // any data buffer — and the common small-value case skips even
+            // that scan via an O(1) conservative bound.
+            DataType::Utf8View => {
+                let array = values.as_any().downcast_ref::<StringViewArray>().unwrap();
+                count_within_budget_views(
+                    array.views(),
+                    indices,
+                    byte_budget,
+                    max_view_value_len(array.data_buffers()),
+                )
+            }
+            DataType::BinaryView => {
+                let array = values.as_any().downcast_ref::<BinaryViewArray>().unwrap();
+                count_within_budget_views(
+                    array.views(),
+                    indices,
+                    byte_budget,
+                    max_view_value_len(array.data_buffers()),
+                )
+            }
+            // For arrow Dictionary input, treat every chunk as fitting
+            // and stay on the batched path. The arrow array being
+            // Dictionary-encoded in the first place implies its values
+            // are small enough that dedup is worthwhile, which is the
+            // opposite of the "5 MiB blob per row" case this fix

Review Comment:
   I think the "this fix" comment is not going to be helpful in the future
   
   Just pointing out that the code assumes that the values in a dictionary are 
already small and deduplicated is probably enough for now



##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,142 @@ where
     }
 }
 
+/// Cumulative-scan fallback used for byte array types that don't expose
+/// a single contiguous offsets buffer — view arrays, dictionary arrays,
+/// fixed-size binary. Returns `indices.len()` if every value fits the
+/// budget, otherwise the smallest `k ≥ 1` whose first `k` values' encoded
+/// size first exceeds `byte_budget` — i.e. the boundary value is included
+/// so the caller's page-flush check trips immediately on this mini-batch,
+/// without leaving a sliver to glue onto the next page.
+///
+/// Free function so it can be used with `downcast_op!`.
+fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget: usize) -> usize
+where
+    T: ArrayAccessor + Copy,
+    T::Item: AsRef<[u8]>,
+{
+    let mut cum: usize = 0;
+    for (i, idx) in indices.iter().enumerate() {
+        let value_len = values.value(*idx).as_ref().len() + std::mem::size_of::<u32>();
+        cum = cum.saturating_add(value_len);
+        if cum > byte_budget {
+            return i + 1;
+        }
+    }
+    indices.len()
+}
+
+/// Upper bound on any single value's byte length in a view array.
+///
+/// An inline view stores at most 12 bytes; an

Review Comment:
   nit: this is an implementation detail, so it would probably be easier / better 
if it were a `//` comment next to the code.



##########
parquet/src/column/writer/mod.rs:
##########
@@ -713,6 +760,78 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         })
     }
 
+    /// Writes a chunk in sub-batches sized by the caller-supplied
+    /// `sub_batch_size` so the post-write data page byte limit check
+    /// fires before the chunk can grossly overshoot
+    /// `data_page_size_limit`.
+    ///
+    /// For flat (unrepeated) columns sub-batches contain up to
+    /// `sub_batch_size` levels each. For repeated/nested columns
+    /// sub-batches step from one `rep == 0` boundary to the next so a
+    /// record never spans data pages, matching the parquet format rule.
+    ///
+    /// Returns the total number of values consumed across all sub-batches.
+    ///
+    /// `#[inline(never)]` keeps this slow path — only reached for
+    /// variable-width columns whose values need page splitting — out of
+    /// the hot `write_batch_internal` loop.
+    #[allow(clippy::too_many_arguments)]
+    #[inline(never)]
+    fn write_granular_chunk(
+        &mut self,
+        values: &E::Values,
+        values_offset: usize,
+        value_indices: Option<&[usize]>,
+        chunk_size: usize,
+        chunk_def: LevelDataRef<'_>,
+        chunk_rep: LevelDataRef<'_>,
+        sub_batch_size: usize,
+    ) -> Result<usize> {
+        let mut values_consumed = 0;
+        let mut sub_start = 0;
+        while sub_start < chunk_size {
+            let sub_end = match chunk_rep {
+                LevelDataRef::Materialized(levels) => {
+                    // Pack up to `sub_batch_size` levels per mini-batch, then
+                    // extend to the next record boundary (rep == 0) so a
+                    // record never spans data pages. Packing whole records
+                    // rather than stepping one record at a time avoids
+                    // emitting a `write_mini_batch` per record: records
+                    // average only a handful of levels, so the
+                    // record-at-a-time loop issued roughly `sub_batch_size`×
+                    // more mini-batches than necessary.
+                    let mut e = (sub_start + sub_batch_size).min(chunk_size);
+                    while e < chunk_size && levels[e] != 0 {
+                        e += 1;
+                    }
+                    // `sub_batch_size` can be 0 when the chunker sizes a

Review Comment:
   The comments for the chunker say it always produces at least one record 🤔 
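   
   To make the concern concrete, here is a standalone sketch of the boundary
   search from the diff above. The function name is hypothetical, and the
   `.max(1)` guard is an assumption about how a zero `sub_batch_size` might be
   handled; it is not taken from the PR.
   
   ```rust
   /// Hypothetical standalone version of the sub-batch boundary search:
   /// take up to `sub_batch_size` levels, then extend to the next record
   /// boundary (`rep == 0`) so a record never spans data pages.
   fn next_sub_batch_end(rep_levels: &[i16], sub_start: usize, sub_batch_size: usize) -> usize {
       let chunk_size = rep_levels.len();
       // Assumed guard: take at least one level so the caller's loop always
       // advances. Without it, `sub_batch_size == 0` at a record start would
       // return `sub_start` and make no progress.
       let mut end = (sub_start + sub_batch_size.max(1)).min(chunk_size);
       while end < chunk_size && rep_levels[end] != 0 {
           end += 1;
       }
       end
   }
   ```
   
   If the chunker really does guarantee at least one record, the guard would
   be dead code and the comment about `sub_batch_size == 0` could go.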



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4904,6 +4904,245 @@ mod tests {
         assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
     }
 
+    #[test]
+    fn test_arrow_writer_caps_page_size_for_large_strings() {
+        // End-to-end coverage for `ByteArrayEncoder::estimated_value_bytes_gather`
+        // and the offsets-buffer fast path in `estimate_byte_size_offsets`.
+        //
+        // The standard column-writer regression test covers the same
+        // logic for the non-arrow path; this one drives writes through
+        // `ArrowWriter`, which is what real users hit and which uses
+        // `write_gather` with `non_null_indices` (always contiguous for
+        // a non-null column).
+        let value_size = 64 * 1024;
+        let page_byte_limit = 16 * 1024;
+        let num_rows = 32;
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col",
+            ArrowDataType::Utf8,
+            false,
+        )]));
+        let strings: Vec<String> = (0..num_rows).map(|_| "x".repeat(value_size)).collect();
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StringArray::from(strings)) as _],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_dictionary_enabled(false)
+            .set_data_page_size_limit(page_byte_limit)
+            .build();
+        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        let data = Bytes::from(writer.into_inner().unwrap());
+
+        let mut metadata = ParquetMetaDataReader::new();
+        metadata.try_parse(&data).unwrap();
+        let metadata = metadata.finish().unwrap();
+        let col_meta = metadata.row_group(0).column(0);
+        let mut reader =
+            SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
+
+        let mut data_pages = Vec::new();
+        while let Some(page) = reader.get_next_page().unwrap() {
+            if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
+                data_pages.push(page.buffer().len());
+            }
+        }
+
+        assert!(
+            data_pages.len() >= num_rows / 2,
+            "expected at least {} data pages for {num_rows} large strings via ArrowWriter, got {} ({data_pages:?})",
+            num_rows / 2,
+            data_pages.len(),
+        );
+        for size in &data_pages {
+            assert!(
+                *size <= value_size + 64,
+                "page size {size} exceeds one-value bound; pages {data_pages:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_arrow_writer_granular_mode_roundtrip() {
+        // Granular mode subdivides chunks and writes more pages than
+        // `main`. Make sure the data we write back is bit-identical to
+        // what went in — page-count assertions elsewhere only prove
+        // pages were cut, not that the encoded data is correct.
+        //
+        // Mix value sizes so that the cumulative-byte-budget cutoff
+        // lands mid-chunk, exercising both batched and granular paths
+        // within the same `write_batch_internal` call.
+        let small = "tiny".to_string();
+        let big = "x".repeat(64 * 1024);
+        let strings: Vec<String> = (0..256)
+            .map(|i| {
+                if i % 16 == 0 {
+                    big.clone()
+                } else {
+                    small.clone()
+                }
+            })
+            .collect();
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col",
+            ArrowDataType::Utf8,
+            false,
+        )]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StringArray::from(strings.clone())) as _],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_dictionary_enabled(false)
+            .set_data_page_size_limit(16 * 1024)
+            .build();
+        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        let data = Bytes::from(writer.into_inner().unwrap());
+
+        let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
+        let read = reader.next().unwrap().unwrap();
+        assert!(reader.next().is_none(), "expected one batch");
+        let col = read
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .unwrap();
+        assert_eq!(col.len(), strings.len());
+        for (i, expected) in strings.iter().enumerate() {
+            assert_eq!(
+                col.value(i),
+                expected.as_str(),
+                "value mismatch at index {i}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_arrow_writer_caps_page_size_for_large_string_view() {
+        // Coverage for the view/dict fallback in
+        // `ByteArrayEncoder::count_values_within_byte_budget_gather`.
+        // `Utf8View` arrays don't expose a single contiguous offsets
+        // buffer, so the gather method falls back to the per-value walk
+        // via `ArrayAccessor::value`.
+        use arrow_array::StringViewArray;

Review Comment:
   There is a lot of repetition in these tests, which makes it hard for me to
   understand which cases are covered and which are not.

   Could you perhaps make a test fixture -- something like
   
   ```rust
   let num_rows = 32;
   let result = WriterLayoutTest::new()
       .with_value_size(64 * 1024)
       .with_page_byte_limit(16 * 1024)
       .with_num_rows(num_rows)
       .run();

   // `result.data_pages` is a Vec<usize> of data page sizes
   assert!(
       result.data_pages.len() >= num_rows / 2,
       "expected pages bounded by byte budget for Utf8View, got {:?}",
       result.data_pages
   );
   // additional asserts here
   ```
   
   I think that would probably reduce the size of this PR significantly and
   make it easier to understand and validate what is being tested.
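
A minimal sketch of what such a fixture could look like. `WriterLayoutTest`, `WriterLayoutResult`, and the `with_*` methods are hypothetical names taken from the snippet above, not an existing API. To stay self-contained, `run` does not drive `ArrowWriter`: it uses a toy model that flushes a page as soon as the buffered bytes exceed the limit. A real fixture would replace that body with the write and page-scan boilerplate the tests currently repeat.

```rust
/// Hypothetical fixture: the names mirror the review sketch, not an existing API.
#[derive(Debug, Clone)]
struct WriterLayoutTest {
    value_size: usize,
    page_byte_limit: usize,
    num_rows: usize,
}

/// What one run reports back: the byte size of every data page, in order.
#[derive(Debug)]
struct WriterLayoutResult {
    data_pages: Vec<usize>,
}

impl WriterLayoutTest {
    fn new() -> Self {
        // Arbitrary defaults chosen for this sketch.
        Self { value_size: 4, page_byte_limit: 1024 * 1024, num_rows: 1 }
    }

    fn with_value_size(mut self, value_size: usize) -> Self {
        self.value_size = value_size;
        self
    }

    fn with_page_byte_limit(mut self, page_byte_limit: usize) -> Self {
        self.page_byte_limit = page_byte_limit;
        self
    }

    fn with_num_rows(mut self, num_rows: usize) -> Self {
        self.num_rows = num_rows;
        self
    }

    /// Toy stand-in for "write the batch, then scan the pages". It models a
    /// writer that checks the byte limit after every value and flushes once
    /// the buffered bytes exceed it.
    fn run(&self) -> WriterLayoutResult {
        // Plain-encoded BYTE_ARRAY: 4-byte length prefix plus the payload.
        let encoded = self.value_size + std::mem::size_of::<u32>();
        let mut data_pages = Vec::new();
        let mut buffered = 0usize;
        for _ in 0..self.num_rows {
            buffered += encoded;
            if buffered > self.page_byte_limit {
                data_pages.push(buffered);
                buffered = 0;
            }
        }
        if buffered > 0 {
            data_pages.push(buffered);
        }
        WriterLayoutResult { data_pages }
    }
}
```

With a fixture like this, each test would shrink to the builder call plus the assertions that actually differ between cases.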



##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,142 @@ where
     }
 }
 
+/// Cumulative-scan fallback used for byte array types that don't expose
+/// a single contiguous offsets buffer — view arrays, dictionary arrays,
+/// fixed-size binary. Returns `indices.len()` if every value fits the
+/// budget, otherwise the smallest `k ≥ 1` whose first `k` values' encoded
+/// size first exceeds `byte_budget` — i.e. the boundary value is included
+/// so the caller's page-flush check trips immediately on this mini-batch,
+/// without leaving a sliver to glue onto the next page.
+///
+/// Free function so it can be used with `downcast_op!`.
+fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget: usize) -> usize
+where
+    T: ArrayAccessor + Copy,
+    T::Item: AsRef<[u8]>,
+{
+    let mut cum: usize = 0;
+    for (i, idx) in indices.iter().enumerate() {
+        let value_len = values.value(*idx).as_ref().len() + std::mem::size_of::<u32>();
+        cum = cum.saturating_add(value_len);
+        if cum > byte_budget {
+            return i + 1;
+        }
+    }
+    indices.len()
+}
+
+/// Upper bound on any single value's byte length in a view array.
+///
+/// An inline view stores at most 12 bytes; an
+/// out-of-line view's data is a contiguous slice of exactly one data
+/// buffer, so it cannot be longer than the largest data buffer. This is a
+/// loose bound (a value is usually far smaller than a whole buffer) but it
+/// is O(number of buffers) and always sound.
+fn max_view_value_len(buffers: &[Buffer]) -> usize {
+    /// Bytes that fit inline in a u128 view word (the rest is len + prefix).
+    const MAX_INLINE_VIEW_LEN: usize = 12;
+    buffers
+        .iter()
+        .map(|b| b.len())
+        .max()
+        .unwrap_or(0)
+        .max(MAX_INLINE_VIEW_LEN)
+}
+
+/// Two-stage budget count for view arrays (`Utf8View`, `BinaryView`).
+///
+///   1. View arrays have no prefix-sum offsets buffer, so the exact O(1)
+///      span subtraction `count_within_budget_offsets` uses is unavailable.
+///      But a *conservative* O(1) bound is: every value is at most
+///      `max_value_len` bytes, so the whole chunk fits the budget when
+///      `n * (max_value_len + 4) <= byte_budget`. This skips the per-value
+///      walk for the common small-value case — what view arrays are built
+///      for, and exactly the case where there is nothing to bound.
+///   2. Otherwise scan per-value lengths from the low 32 bits of each u128
+///      view word (no data-buffer dereference) and stop at the first value
+///      that pushes the cumulative sum past the budget.
+fn count_within_budget_views(
+    views: &[u128],
+    indices: &[usize],
+    byte_budget: usize,
+    max_value_len: usize,
+) -> usize {
+    // Stage 1: O(1) conservative upper bound.
+    let per_value = max_value_len + std::mem::size_of::<u32>();
+    if indices.len().saturating_mul(per_value) <= byte_budget {
+        return indices.len();
+    }
+    // Stage 2: exact per-value scan.
+    let mut cum: usize = 0;
+    for (i, idx) in indices.iter().enumerate() {
+        let len = (views[*idx] as u32) as usize;
+        cum = cum.saturating_add(len + std::mem::size_of::<u32>());
+        if cum > byte_budget {
+            return i + 1;
+        }
+    }
+    indices.len()
+}
+
+/// Two-stage fast path for `GenericByteArray<O>`

Review Comment:
   Same comment here: doc comments should summarize what the function
   returns; implementation details can then move to inline comments, if they
   are still useful.
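
As an illustration of that split, here is a sketch of `count_within_budget_views` from the hunk above with the logic unchanged. The doc comment only states what is returned, and the two-stage reasoning moves next to the code it describes. The wording is a suggestion, not the PR's final text.

```rust
/// Returns how many leading entries of `indices` to write as one mini-batch.
///
/// The result is `indices.len()` when every value fits within `byte_budget`.
/// Otherwise it is the smallest count whose cumulative encoded size exceeds
/// the budget, so the value that crosses the limit is included.
fn count_within_budget_views(
    views: &[u128],
    indices: &[usize],
    byte_budget: usize,
    max_value_len: usize,
) -> usize {
    // Each value is costed as its payload plus a 4-byte length prefix.
    let prefix = std::mem::size_of::<u32>();

    // Fast path: if even the worst case fits, skip the per-value walk.
    if indices.len().saturating_mul(max_value_len + prefix) <= byte_budget {
        return indices.len();
    }

    // Slow path: the low 32 bits of a view word hold the value length, so the
    // scan never touches the data buffers.
    let mut cum: usize = 0;
    for (i, &idx) in indices.iter().enumerate() {
        let len = (views[idx] as u32) as usize;
        cum = cum.saturating_add(len + prefix);
        if cum > byte_budget {
            return i + 1;
        }
    }
    indices.len()
}
```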



##########
parquet/src/arrow/arrow_writer/mod.rs:
##########
@@ -4904,6 +4904,245 @@ mod tests {
         assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
     }
 
+    #[test]
+    fn test_arrow_writer_caps_page_size_for_large_strings() {
+        // End-to-end coverage for `ByteArrayEncoder::estimated_value_bytes_gather`
+        // and the offsets-buffer fast path in `estimate_byte_size_offsets`.
+        //
+        // The standard column-writer regression test covers the same
+        // logic for the non-arrow path; this one drives writes through
+        // `ArrowWriter`, which is what real users hit and which uses
+        // `write_gather` with `non_null_indices` (always contiguous for
+        // a non-null column).
+        let value_size = 64 * 1024;
+        let page_byte_limit = 16 * 1024;
+        let num_rows = 32;
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col",
+            ArrowDataType::Utf8,
+            false,
+        )]));
+        let strings: Vec<String> = (0..num_rows).map(|_| "x".repeat(value_size)).collect();
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StringArray::from(strings)) as _],
+        )
+        .unwrap();
+
+        let props = WriterProperties::builder()
+            .set_dictionary_enabled(false)
+            .set_data_page_size_limit(page_byte_limit)
+            .build();
+        let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
+        writer.write(&batch).unwrap();
+        let data = Bytes::from(writer.into_inner().unwrap());
+
+        let mut metadata = ParquetMetaDataReader::new();
+        metadata.try_parse(&data).unwrap();
+        let metadata = metadata.finish().unwrap();
+        let col_meta = metadata.row_group(0).column(0);
+        let mut reader =
+            SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
+
+        let mut data_pages = Vec::new();
+        while let Some(page) = reader.get_next_page().unwrap() {
+            if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
+                data_pages.push(page.buffer().len());
+            }
+        }
+
+        assert!(
+            data_pages.len() >= num_rows / 2,
+            "expected at least {} data pages for {num_rows} large strings via ArrowWriter, got {} ({data_pages:?})",
+            num_rows / 2,
+            data_pages.len(),
+        );
+        for size in &data_pages {
+            assert!(
+                *size <= value_size + 64,
+                "page size {size} exceeds one-value bound; pages {data_pages:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_arrow_writer_granular_mode_roundtrip() {
+        // Granular mode subdivides chunks and writes more pages than
+        // `main`. Make sure the data we write back is bit-identical to
+        // what went in — page-count assertions elsewhere only prove
+        // pages were cut, not that the encoded data is correct.
+        //
+        // Mix value sizes so that the cumulative-byte-budget cutoff
+        // lands mid-chunk, exercising both batched and granular paths
+        // within the same `write_batch_internal` call.
+        let small = "tiny".to_string();
+        let big = "x".repeat(64 * 1024);
+        let strings: Vec<String> = (0..256)
+            .map(|i| {
+                if i % 16 == 0 {
+                    big.clone()
+                } else {
+                    small.clone()
+                }
+            })
+            .collect();
+
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "col",
+            ArrowDataType::Utf8,
+            false,
+        )]));
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(StringArray::from(strings.clone())) as _],

Review Comment:
   Do we need to clone this? Also, I am not sure we need `String` here; this
   could just be `Vec<&str>`, which would save a bunch of copies.
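
A small sketch of the borrowed variant, assuming the test only needs to read the values back for comparison. `mixed_strings` is a hypothetical helper name. `StringArray::from` also accepts a `Vec<&str>`, so the array can be built from the result directly.

```rust
/// Builds the mixed-size test input as borrowed slices. Every entry points at
/// `small` or `big`, so no per-row `String` is allocated or cloned.
fn mixed_strings<'a>(small: &'a str, big: &'a str, num_rows: usize) -> Vec<&'a str> {
    (0..num_rows)
        // Same pattern as the test: every 16th value is the large one.
        .map(|i| if i % 16 == 0 { big } else { small })
        .collect()
}
```

Cloning the resulting `Vec<&str>` copies only pointer-and-length pairs, not the 64 KiB payloads, so keeping a copy around for the round-trip comparison stays cheap.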



##########
parquet/src/column/writer/byte_budget_chunker.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decides how many levels of a chunk to write as one mini-batch so that
+//! the resulting data page stays within `data_page_size_limit`.
+//!
+//! The parquet column writer checks the data page byte limit only *after*
+//! each mini-batch finishes writing. Mini-batches are sized in rows
+//! (`write_batch_size`, default 1024), so for BYTE_ARRAY columns whose
+//! values are large (e.g. multi-MiB blobs) a single mini-batch can buffer
+//! GiB into one page before the limit is consulted.
+//!
+//! This module isolates the per-chunk decision that prevents that: given a
+//! chunk's level data and the input values, pick the largest `sub_batch_size`
+//! such that one mini-batch fits in one page byte budget. For the
+//! overwhelmingly common case (small or fixed-width values) the answer is
+//! just `chunk_size` and the decision is O(1) on the column type — only
+//! when the input might overflow does the chunker consult the encoder's
+//! byte estimate.
+
+use crate::basic::Type;
+use crate::column::writer::LevelDataRef;
+use crate::column::writer::encoder::ColumnValueEncoder;
+use crate::file::properties::WriterProperties;
+use crate::schema::types::ColumnDescriptor;
+
+/// Picks byte-budget-aware mini-batch sizes for one column.
+pub(crate) struct ByteBudgetChunker {
+    /// Configured data page byte limit for the column.
+    page_byte_limit: usize,
+    /// Max definition level of the column; a level equal to this marks a
+    /// present (non-null) leaf value. Used to count values per chunk.
+    max_def_level: i16,
+    /// `true` when no chunk of `base_batch_size` values can ever overflow
+    /// `page_byte_limit` regardless of input. Set once at column open from
+    /// the physical type's known per-value byte size; lets the per-chunk
+    /// decision short-circuit with no work for every numeric, bool, or
+    /// narrow `FIXED_LEN_BYTE_ARRAY` column.
+    static_always_fits: bool,
+    /// Configured dictionary page byte limit for the column.
+    dict_page_byte_limit: usize,
+    /// As [`Self::static_always_fits`] but for the dictionary page: `true`
+    /// when one `base_batch_size` mini-batch of this fixed-width type cannot
+    /// overshoot `dict_page_byte_limit` by more than one mini-batch's worth.
+    static_dict_always_fits: bool,
+}
+
+impl ByteBudgetChunker {
+    #[inline]
+    pub(crate) fn new(
+        descr: &ColumnDescriptor,
+        props: &WriterProperties,
+        base_batch_size: usize,
+    ) -> Self {
+        let page_byte_limit = props.column_data_page_size_limit(descr.path());
+        let dict_page_byte_limit = props.column_dictionary_page_size_limit(descr.path());
+        let static_bytes_per_value = match descr.physical_type() {
+            Type::BOOLEAN => Some(1),
+            Type::INT32 | Type::FLOAT => Some(std::mem::size_of::<i32>()),
+            Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::<i64>()),
+            Type::INT96 => Some(12),
+            Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize),
+            Type::BYTE_ARRAY => None,
+        };
+        let static_fits = |limit: usize| {

Review Comment:
   Why use a closure here rather than just explicitly calculating
   `static_always_fits`?
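
For illustration, one non-closure shape is a small free function called once per limit. The closure body is not visible in this hunk, so the rule below is an assumption based on the field docs (a fixed-width column always fits when one full mini-batch cannot exceed the limit), and `chunk_always_fits` is a hypothetical name.

```rust
/// Hypothetical helper. Assumed rule: a column with a known per-value size
/// always fits when `base_batch_size` values cannot exceed `limit`.
/// Variable-width columns (`None`) never qualify.
fn chunk_always_fits(
    static_bytes_per_value: Option<usize>,
    base_batch_size: usize,
    limit: usize,
) -> bool {
    match static_bytes_per_value {
        // saturating_mul avoids overflow for extreme batch sizes.
        Some(bytes) => bytes.saturating_mul(base_batch_size) <= limit,
        None => false,
    }
}
```

The constructor would then assign `static_always_fits` and `static_dict_always_fits` from two explicit calls, one with `page_byte_limit` and one with `dict_page_byte_limit`.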



##########
parquet/src/column/writer/byte_budget_chunker.rs:
##########
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Decides how many levels of a chunk to write as one mini-batch so that

Review Comment:
   This is good background -- it might be easier to find as doc comments on
   `ByteBudgetChunker`, and the module-level docs could then just link to
   `ByteBudgetChunker`.
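
A sketch of that layout, with the background moved onto the type and the module docs reduced to an intra-doc link. The module is written inline and the struct is trimmed to a single field only to keep the example self-contained. The doc text is condensed from the existing module comment.

```rust
mod byte_budget_chunker {
    //! Byte-budget-aware mini-batch sizing. See [`ByteBudgetChunker`].

    /// Picks byte-budget-aware mini-batch sizes for one column.
    ///
    /// The parquet column writer checks the data page byte limit only after
    /// each mini-batch finishes writing. Mini-batches are sized in rows, so
    /// for BYTE_ARRAY columns with large values a single mini-batch can
    /// buffer far more than `data_page_size_limit` before the limit is
    /// consulted. This type picks a sub-batch size per chunk so that one
    /// mini-batch fits in one page byte budget.
    pub(crate) struct ByteBudgetChunker {
        /// Configured data page byte limit for the column.
        pub(crate) page_byte_limit: usize,
    }
}
```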
   



##########
parquet/src/column/writer/encoder.rs:
##########
@@ -411,3 +507,38 @@ where
         }
     }
 }
+
+/// Plain-encoded byte cost of a single value of type `T::T`.
+///
+/// Derived from [`ParquetValueType::dict_encoding_size`] so we don't add a
+/// parallel per-value-size hook to the trait. The components returned by
+/// `dict_encoding_size` are `(per-value overhead, value-bytes)`. For
+/// plain encoding the on-disk layout is:
+///
+/// - `BYTE_ARRAY`: 4-byte length prefix + payload bytes = `overhead + bytes`.
+/// - `FIXED_LEN_BYTE_ARRAY`: raw bytes only, taken from the type descriptor's
+///   `type_length`. The value's own `dict_encoding_size` reports the length
+///   prefix, which is irrelevant for plain FLBA encoding; the encoder passes
+///   `type_length` directly.
+/// - Everything else (numeric / bool): a constant per-value size; the caller
+///   already short-circuits these via `mem::size_of::<T::T>()` before
+///   touching this function, so this branch is unreachable in practice and
+///   we fall back to `overhead` defensively.
+///
+/// See `dict_encoder.rs::push` (line ~52) for the matching dispatch.

Review Comment:
   This link will likely be broken immediately -- can you change it to link
   to the actual method?



##########
parquet/src/column/writer/mod.rs:
##########
@@ -713,6 +760,78 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         })
     }
 
+    /// Writes a chunk in sub-batches sized by the caller-supplied
+    /// `sub_batch_size` so the post-write data page byte limit check
+    /// fires before the chunk can grossly overshoot
+    /// `data_page_size_limit`.
+    ///
+    /// For flat (unrepeated) columns sub-batches contain up to
+    /// `sub_batch_size` levels each. For repeated/nested columns
+    /// sub-batches step from one `rep == 0` boundary to the next so a
+    /// record never spans data pages, matching the parquet format rule.
+    ///
+    /// Returns the total number of values consumed across all sub-batches.
+    ///
+    /// `#[inline(never)]` keeps this slow path — only reached for
+    /// variable-width columns whose values need page splitting — out of
+    /// the hot `write_batch_internal` loop.
+    #[allow(clippy::too_many_arguments)]
+    #[inline(never)]
+    fn write_granular_chunk(
+        &mut self,
+        values: &E::Values,
+        values_offset: usize,
+        value_indices: Option<&[usize]>,
+        chunk_size: usize,
+        chunk_def: LevelDataRef<'_>,
+        chunk_rep: LevelDataRef<'_>,
+        sub_batch_size: usize,
+    ) -> Result<usize> {
+        let mut values_consumed = 0;
+        let mut sub_start = 0;
+        while sub_start < chunk_size {
+            let sub_end = match chunk_rep {
+                LevelDataRef::Materialized(levels) => {
+                    // Pack up to `sub_batch_size` levels per mini-batch, then
+                    // extend to the next record boundary (rep == 0) so a
+                    // record never spans data pages. Packing whole records
+                    // rather than stepping one record at a time avoids
+                    // emitting a `write_mini_batch` per record: records
+                    // average only a handful of levels, so the

Review Comment:
   What is this talking about? "record-at-a-time loop issued" sounds like
   a comment about a former version of this PR 🤔



##########
parquet/src/column/writer/mod.rs:
##########
@@ -2676,6 +2795,500 @@ mod tests {
         assert_eq!(other_values, vec![10]);
     }
 
+    #[test]
+    fn test_column_writer_caps_page_size_for_large_byte_array_values() {
+        // Regression: the post-write data page byte limit check only fires
+        // at mini-batch boundaries, so a 1024-row mini-batch of multi-MiB
+        // BYTE_ARRAY values used to buffer multiple GiB into a single page
+        // before the limit was even consulted. With the threshold-based
+        // granular mode this batch should split into ~one page per value.
+        let value_size = 64 * 1024; // 64 KiB per value
+        let page_byte_limit = 16 * 1024; // 16 KiB page limit
+        let num_rows = 64;
+
+        let mut file = tempfile::tempfile().unwrap();
+        let mut write = TrackedWrite::new(&mut file);
+        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+        let props = Arc::new(
+            WriterProperties::builder()
+                .set_writer_version(WriterVersion::PARQUET_1_0)
+                .set_dictionary_enabled(false)
+                .set_encoding(Encoding::PLAIN)
+                .set_data_page_size_limit(page_byte_limit)
+                // Default write_batch_size (1024) — without the fix this
+                // buffers the entire input into a single ~4 MiB page.
+                .build(),
+        );
+
+        let mut data = Vec::with_capacity(num_rows);

Review Comment:
   My comment from earlier applies here too -- there is so much boilerplate in
   these tests that it is hard to understand what they are really covering and
   how they vary from test to test. Refactoring into a common fixture would
   likely help a lot.



##########
parquet/src/column/writer/mod.rs:
##########
@@ -555,14 +577,39 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 }
             }
 
-            values_offset += self.write_mini_batch(
+            let chunk_size = end_offset - levels_offset;
+            let chunk_def = def_levels.slice(levels_offset, chunk_size);
+            let chunk_rep = rep_levels.slice(levels_offset, chunk_size);
+
+            let sub_batch_size = chunker.pick_sub_batch_size(

Review Comment:
   A few comments might help here -- basically that this is the key decision
   point: decide whether we can write the entire mini-batch without any more
   page size checks or whether we need to fall back to more accurate page size
   accounting.
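
A sketch of how that decision could be spelled out in code and comments. `ChunkPlan` and `plan_chunk` are hypothetical names for illustration. The only behaviour assumed from the PR is what the module docs state: `pick_sub_batch_size` returns `chunk_size` when the whole chunk fits.

```rust
/// Outcome of the per-chunk decision (hypothetical type, for illustration).
#[derive(Debug, PartialEq)]
enum ChunkPlan {
    /// The whole chunk is written as one mini-batch; the usual page size
    /// check after the write is sufficient.
    WholeChunk,
    /// The chunk is split into sub-batches of this many levels, with a page
    /// size check after each one.
    Granular { sub_batch_size: usize },
}

fn plan_chunk(chunk_size: usize, sub_batch_size: usize) -> ChunkPlan {
    // Key decision point: if the chunker says the full chunk fits the byte
    // budget, no extra page size checks are needed. Otherwise fall back to
    // the slower path that accounts for page size more accurately.
    if sub_batch_size >= chunk_size {
        ChunkPlan::WholeChunk
    } else {
        // Always make progress, even if the budget is smaller than one value.
        ChunkPlan::Granular { sub_batch_size: sub_batch_size.max(1) }
    }
}
```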



##########
parquet/src/column/writer/mod.rs:
##########
@@ -713,6 +760,78 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         })
     }
 
+    /// Writes a chunk in sub-batches sized by the caller-supplied
+    /// `sub_batch_size` so the post-write data page byte limit check
+    /// fires before the chunk can grossly overshoot
+    /// `data_page_size_limit`.
+    ///

Review Comment:
   Maybe we can reduce the jargon here -- something like
   ```rust
   /// Writes a chunk in `sub_batch_size` chunks, checking the
   /// data page byte limit before each. This keeps the page size closer to
   /// `data_page_size_limit`.
   ```



##########
parquet/tests/arrow_writer_layout.rs:
##########


Review Comment:
   I am surprised you didn't add more new LayoutTests -- this framework seems
   to be exactly what you are trying to test here (the output page layout of
   writing parquet files).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
