adriangb commented on code in PR #9972:
URL: https://github.com/apache/arrow-rs/pull/9972#discussion_r3352025568
##########
parquet/tests/arrow_writer_layout.rs:
##########
@@ -429,10 +429,16 @@ fn test_string() {
page_type: PageType::DATA_PAGE,
},
],
+ // The byte-budget chunker sub-batches the dictionary
+ // phase. The mini-batch deliberately includes the value
+ // that crosses the 1000-byte limit so the spill triggers
+ // on this chunk rather than carrying a sliver into the
+ // next page (see #9972 discussion), giving a 126-row
Review Comment:
Dropped the dangling `#9972 discussion` reference — the surrounding comment
already explains the boundary-inclusion. Resolved in 2acb3f2060.
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -481,6 +483,90 @@ impl ColumnValueEncoder for ByteArrayEncoder {
Ok(())
}
+ fn count_values_within_byte_budget_gather(
+ values: &Self::Values,
+ indices: &[usize],
+ byte_budget: usize,
+ ) -> Option<usize> {
+ // `ByteArrayEncoder` only ever writes via `write_gather`, so this
+ // is the relevant method.
+ //
+ // Two-stage walk for the simple offset-buffer byte array types:
+ // 1. If indices are contiguous, compute the total payload in
+ // O(1) via a single subtraction on the offsets buffer.
+ // When the total fits the budget — the overwhelmingly
+ // common "small values" case — return immediately.
+ // 2. Otherwise, walk per-value byte sizes from the offsets
+ // buffer (still cheap, no slice/UTF-8 construction) and
+ // exit at the first value that pushes the cumulative sum
+ // past the budget. This bounds skewed distributions: an
+ // outlier value is caught wherever it lands in the chunk.
+ let count = match values.data_type() {
+ DataType::Utf8 => count_within_budget_offsets(
+ values.as_any().downcast_ref::<StringArray>().unwrap(),
+ indices,
+ byte_budget,
+ ),
+ DataType::LargeUtf8 => count_within_budget_offsets(
+ values.as_any().downcast_ref::<LargeStringArray>().unwrap(),
+ indices,
+ byte_budget,
+ ),
+ DataType::Binary => count_within_budget_offsets(
+ values.as_any().downcast_ref::<BinaryArray>().unwrap(),
+ indices,
+ byte_budget,
+ ),
+ DataType::LargeBinary => count_within_budget_offsets(
+ values.as_any().downcast_ref::<LargeBinaryArray>().unwrap(),
+ indices,
+ byte_budget,
+ ),
+ // View arrays carry each value's length in the low 32 bits of
+ // its u128 view word, so lengths are scannable without touching
+ // any data buffer — and the common small-value case skips even
+ // that scan via an O(1) conservative bound.
+ DataType::Utf8View => {
+ let array =
values.as_any().downcast_ref::<StringViewArray>().unwrap();
+ count_within_budget_views(
+ array.views(),
+ indices,
+ byte_budget,
+ max_view_value_len(array.data_buffers()),
+ )
+ }
+ DataType::BinaryView => {
+ let array =
values.as_any().downcast_ref::<BinaryViewArray>().unwrap();
+ count_within_budget_views(
+ array.views(),
+ indices,
+ byte_budget,
+ max_view_value_len(array.data_buffers()),
+ )
+ }
+ // For arrow Dictionary input, treat every chunk as fitting
+ // and stay on the batched path. The arrow array being
+ // Dictionary-encoded in the first place implies its values
+ // are small enough that dedup is worthwhile, which is the
+ // opposite of the "5 MiB blob per row" case this fix
Review Comment:
Trimmed to: the values in an arrow dictionary are already small and
deduplicated, so there's nothing to bound. Resolved in 2acb3f2060.
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,142 @@ where
}
}
+/// Cumulative-scan fallback used for byte array types that don't expose
+/// a single contiguous offsets buffer — view arrays, dictionary arrays,
+/// fixed-size binary. Returns `indices.len()` if every value fits the
+/// budget, otherwise the smallest `k ≥ 1` whose first `k` values' encoded
+/// size first exceeds `byte_budget` — i.e. the boundary value is included
+/// so the caller's page-flush check trips immediately on this mini-batch,
+/// without leaving a sliver to glue onto the next page.
+///
+/// Free function so it can be used with `downcast_op!`.
+fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget:
usize) -> usize
+where
+ T: ArrayAccessor + Copy,
+ T::Item: AsRef<[u8]>,
+{
+ let mut cum: usize = 0;
+ for (i, idx) in indices.iter().enumerate() {
+ let value_len = values.value(*idx).as_ref().len() +
std::mem::size_of::<u32>();
+ cum = cum.saturating_add(value_len);
+ if cum > byte_budget {
+ return i + 1;
+ }
+ }
+ indices.len()
+}
+
+/// Upper bound on any single value's byte length in a view array.
+///
+/// An inline view stores at most 12 bytes; an
Review Comment:
Demoted the explanation to a `//` comment next to the code. Resolved in
2acb3f2060.
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,142 @@ where
}
}
+/// Cumulative-scan fallback used for byte array types that don't expose
+/// a single contiguous offsets buffer — view arrays, dictionary arrays,
+/// fixed-size binary. Returns `indices.len()` if every value fits the
+/// budget, otherwise the smallest `k ≥ 1` whose first `k` values' encoded
+/// size first exceeds `byte_budget` — i.e. the boundary value is included
+/// so the caller's page-flush check trips immediately on this mini-batch,
+/// without leaving a sliver to glue onto the next page.
+///
+/// Free function so it can be used with `downcast_op!`.
+fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget:
usize) -> usize
+where
+ T: ArrayAccessor + Copy,
+ T::Item: AsRef<[u8]>,
+{
+ let mut cum: usize = 0;
+ for (i, idx) in indices.iter().enumerate() {
+ let value_len = values.value(*idx).as_ref().len() +
std::mem::size_of::<u32>();
+ cum = cum.saturating_add(value_len);
+ if cum > byte_budget {
+ return i + 1;
+ }
+ }
+ indices.len()
+}
+
+/// Upper bound on any single value's byte length in a view array.
+///
+/// An inline view stores at most 12 bytes; an
+/// out-of-line view's data is a contiguous slice of exactly one data
+/// buffer, so it cannot be longer than the largest data buffer. This is a
+/// loose bound (a value is usually far smaller than a whole buffer) but it
+/// is O(number of buffers) and always sound.
+fn max_view_value_len(buffers: &[Buffer]) -> usize {
+ /// Bytes that fit inline in a u128 view word (the rest is len + prefix).
+ const MAX_INLINE_VIEW_LEN: usize = 12;
+ buffers
+ .iter()
+ .map(|b| b.len())
+ .max()
+ .unwrap_or(0)
+ .max(MAX_INLINE_VIEW_LEN)
+}
+
+/// Two-stage budget count for view arrays (`Utf8View`, `BinaryView`).
Review Comment:
Added a one-line summary of what it returns (leading values whose cumulative
plain-encoded size fits the budget); moved the two-stage walkthrough inline.
Resolved in 2acb3f2060.
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,142 @@ where
}
}
+/// Cumulative-scan fallback used for byte array types that don't expose
+/// a single contiguous offsets buffer — view arrays, dictionary arrays,
+/// fixed-size binary. Returns `indices.len()` if every value fits the
+/// budget, otherwise the smallest `k ≥ 1` whose first `k` values' encoded
+/// size first exceeds `byte_budget` — i.e. the boundary value is included
+/// so the caller's page-flush check trips immediately on this mini-batch,
+/// without leaving a sliver to glue onto the next page.
+///
+/// Free function so it can be used with `downcast_op!`.
+fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget:
usize) -> usize
+where
+ T: ArrayAccessor + Copy,
+ T::Item: AsRef<[u8]>,
+{
+ let mut cum: usize = 0;
+ for (i, idx) in indices.iter().enumerate() {
+ let value_len = values.value(*idx).as_ref().len() +
std::mem::size_of::<u32>();
+ cum = cum.saturating_add(value_len);
+ if cum > byte_budget {
+ return i + 1;
+ }
+ }
+ indices.len()
+}
+
+/// Upper bound on any single value's byte length in a view array.
+///
+/// An inline view stores at most 12 bytes; an
+/// out-of-line view's data is a contiguous slice of exactly one data
+/// buffer, so it cannot be longer than the largest data buffer. This is a
+/// loose bound (a value is usually far smaller than a whole buffer) but it
+/// is O(number of buffers) and always sound.
+fn max_view_value_len(buffers: &[Buffer]) -> usize {
+ /// Bytes that fit inline in a u128 view word (the rest is len + prefix).
+ const MAX_INLINE_VIEW_LEN: usize = 12;
+ buffers
+ .iter()
+ .map(|b| b.len())
+ .max()
+ .unwrap_or(0)
+ .max(MAX_INLINE_VIEW_LEN)
+}
+
+/// Two-stage budget count for view arrays (`Utf8View`, `BinaryView`).
+///
+/// 1. View arrays have no prefix-sum offsets buffer, so the exact O(1)
+/// span subtraction `count_within_budget_offsets` uses is unavailable.
+/// But a *conservative* O(1) bound is: every value is at most
+/// `max_value_len` bytes, so the whole chunk fits the budget when
+/// `n * (max_value_len + 4) <= byte_budget`. This skips the per-value
+/// walk for the common small-value case — what view arrays are built
+/// for, and exactly the case where there is nothing to bound.
+/// 2. Otherwise scan per-value lengths from the low 32 bits of each u128
+/// view word (no data-buffer dereference) and stop at the first value
+/// that pushes the cumulative sum past the budget.
+fn count_within_budget_views(
+ views: &[u128],
+ indices: &[usize],
+ byte_budget: usize,
+ max_value_len: usize,
+) -> usize {
+ // Stage 1: O(1) conservative upper bound.
+ let per_value = max_value_len + std::mem::size_of::<u32>();
+ if indices.len().saturating_mul(per_value) <= byte_budget {
+ return indices.len();
+ }
+ // Stage 2: exact per-value scan.
+ let mut cum: usize = 0;
+ for (i, idx) in indices.iter().enumerate() {
+ let len = (views[*idx] as u32) as usize;
+ cum = cum.saturating_add(len + std::mem::size_of::<u32>());
Review Comment:
It's the 4-byte length prefix that PLAIN `BYTE_ARRAY` encoding writes per
value (not part of the string content), so the budget matches the bytes
actually written to the page. Added an inline comment saying exactly that.
Resolved in 2acb3f2060.
##########
parquet/src/arrow/arrow_writer/byte_array.rs:
##########
@@ -593,6 +679,142 @@ where
}
}
+/// Cumulative-scan fallback used for byte array types that don't expose
+/// a single contiguous offsets buffer — view arrays, dictionary arrays,
+/// fixed-size binary. Returns `indices.len()` if every value fits the
+/// budget, otherwise the smallest `k ≥ 1` whose first `k` values' encoded
+/// size first exceeds `byte_budget` — i.e. the boundary value is included
+/// so the caller's page-flush check trips immediately on this mini-batch,
+/// without leaving a sliver to glue onto the next page.
+///
+/// Free function so it can be used with `downcast_op!`.
+fn count_within_budget_accessor<T>(values: T, indices: &[usize], byte_budget:
usize) -> usize
+where
+ T: ArrayAccessor + Copy,
+ T::Item: AsRef<[u8]>,
+{
+ let mut cum: usize = 0;
+ for (i, idx) in indices.iter().enumerate() {
+ let value_len = values.value(*idx).as_ref().len() +
std::mem::size_of::<u32>();
+ cum = cum.saturating_add(value_len);
+ if cum > byte_budget {
+ return i + 1;
+ }
+ }
+ indices.len()
+}
+
+/// Upper bound on any single value's byte length in a view array.
+///
+/// An inline view stores at most 12 bytes; an
+/// out-of-line view's data is a contiguous slice of exactly one data
+/// buffer, so it cannot be longer than the largest data buffer. This is a
+/// loose bound (a value is usually far smaller than a whole buffer) but it
+/// is O(number of buffers) and always sound.
+fn max_view_value_len(buffers: &[Buffer]) -> usize {
+ /// Bytes that fit inline in a u128 view word (the rest is len + prefix).
+ const MAX_INLINE_VIEW_LEN: usize = 12;
+ buffers
+ .iter()
+ .map(|b| b.len())
+ .max()
+ .unwrap_or(0)
+ .max(MAX_INLINE_VIEW_LEN)
+}
+
+/// Two-stage budget count for view arrays (`Utf8View`, `BinaryView`).
+///
+/// 1. View arrays have no prefix-sum offsets buffer, so the exact O(1)
+/// span subtraction `count_within_budget_offsets` uses is unavailable.
+/// But a *conservative* O(1) bound is: every value is at most
+/// `max_value_len` bytes, so the whole chunk fits the budget when
+/// `n * (max_value_len + 4) <= byte_budget`. This skips the per-value
+/// walk for the common small-value case — what view arrays are built
+/// for, and exactly the case where there is nothing to bound.
+/// 2. Otherwise scan per-value lengths from the low 32 bits of each u128
+/// view word (no data-buffer dereference) and stop at the first value
+/// that pushes the cumulative sum past the budget.
+fn count_within_budget_views(
+ views: &[u128],
+ indices: &[usize],
+ byte_budget: usize,
+ max_value_len: usize,
+) -> usize {
+ // Stage 1: O(1) conservative upper bound.
+ let per_value = max_value_len + std::mem::size_of::<u32>();
+ if indices.len().saturating_mul(per_value) <= byte_budget {
+ return indices.len();
+ }
+ // Stage 2: exact per-value scan.
+ let mut cum: usize = 0;
+ for (i, idx) in indices.iter().enumerate() {
+ let len = (views[*idx] as u32) as usize;
+ cum = cum.saturating_add(len + std::mem::size_of::<u32>());
+ if cum > byte_budget {
+ return i + 1;
+ }
+ }
+ indices.len()
+}
+
+/// Two-stage fast path for `GenericByteArray<O>`
Review Comment:
Added a one-line return summary; the rationale is now inline. Resolved in
2acb3f2060.
##########
parquet/src/column/writer/encoder.rs:
##########
@@ -411,3 +507,38 @@ where
}
}
}
+
+/// Plain-encoded byte cost of a single value of type `T::T`.
+///
+/// Derived from [`ParquetValueType::dict_encoding_size`] so we don't add a
+/// parallel per-value-size hook to the trait. The components returned by
+/// `dict_encoding_size` are `(per-value overhead, value-bytes)`. For
+/// plain encoding the on-disk layout is:
+///
+/// - `BYTE_ARRAY`: 4-byte length prefix + payload bytes = `overhead + bytes`.
+/// - `FIXED_LEN_BYTE_ARRAY`: raw bytes only, taken from the type descriptor's
+/// `type_length`. The value's own `dict_encoding_size` reports the length
+/// prefix, which is irrelevant for plain FLBA encoding; the encoder passes
+/// `type_length` directly.
+/// - Everything else (numeric / bool): a constant per-value size; the caller
+/// already short-circuits these via `mem::size_of::<T::T>()` before
+/// touching this function, so this branch is unreachable in practice and
+/// we fall back to `overhead` defensively.
+///
+/// See `dict_encoder.rs::push` (line ~52) for the matching dispatch.
Review Comment:
Fixed — replaced with a plain-text reference to `KeyStorage::push` in
`encodings/encoding/dict_encoder.rs`. (An intra-doc `[]` link would break
rustdoc since `KeyStorage` is private.) Resolved in 2acb3f2060.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]