This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 9949226f0a perf(parquet): LevelInfoBuilder batch write when no
repetition childs (#10037)
9949226f0a is described below
commit 9949226f0ab644c701e6ed283db32989bbf6b006
Author: mwish <[email protected]>
AuthorDate: Wed Jun 3 21:33:22 2026 +0800
perf(parquet): LevelInfoBuilder batch write when no repetition childs
(#10037)
# Which issue does this PR close?
- Closes #10023.
# Rationale for this change
The Parquet writer writes list elements one slot at a time, which is extremely slow.
This patch batches those writes.
# What changes are included in this PR?
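Batch writes when writing a list whose child has no nested repetition levels (a primitive, or a struct containing no lists), i.e. a list at the maximum repetition level.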
# Are these changes tested?
Covered by existing tests.
# Are there any user-facing changes?
No
---
parquet/src/arrow/arrow_writer/levels.rs | 191 +++++++++++++++++++++++++++++--
1 file changed, 180 insertions(+), 11 deletions(-)
diff --git a/parquet/src/arrow/arrow_writer/levels.rs
b/parquet/src/arrow/arrow_writer/levels.rs
index 10f90f707c..e577cf73a8 100644
--- a/parquet/src/arrow/arrow_writer/levels.rs
+++ b/parquet/src/arrow/arrow_writer/levels.rs
@@ -118,6 +118,7 @@ enum LevelInfoBuilder {
LevelContext, // Context
OffsetBuffer<i32>, // Offsets
Option<NullBuffer>, // Nulls
+ bool, // is_last_level (child has no nested rep)
),
/// A large list array
LargeList(
@@ -125,6 +126,7 @@ enum LevelInfoBuilder {
LevelContext, // Context
OffsetBuffer<i64>, // Offsets
Option<NullBuffer>, // Nulls
+ bool, // is_last_level (child has no nested rep)
),
/// A fixed size list array
FixedSizeList(
@@ -223,22 +225,31 @@ impl LevelInfoBuilder {
DataType::List(_) => {
let list = array.as_list();
let child = Self::try_new(child.as_ref(), ctx,
list.values())?;
+ let is_last = child.child_has_no_nested_rep();
let offsets = list.offsets().clone();
- Self::List(Box::new(child), ctx, offsets,
list.nulls().cloned())
+ Self::List(
+ Box::new(child),
+ ctx,
+ offsets,
+ list.nulls().cloned(),
+ is_last,
+ )
}
DataType::LargeList(_) => {
let list = array.as_list();
let child = Self::try_new(child.as_ref(), ctx,
list.values())?;
+ let is_last = child.child_has_no_nested_rep();
let offsets = list.offsets().clone();
let nulls = list.nulls().cloned();
- Self::LargeList(Box::new(child), ctx, offsets, nulls)
+ Self::LargeList(Box::new(child), ctx, offsets, nulls,
is_last)
}
DataType::Map(_, _) => {
let map = array.as_map();
let entries = Arc::new(map.entries().clone()) as
ArrayRef;
let child = Self::try_new(child.as_ref(), ctx,
&entries)?;
+ let is_last = child.child_has_no_nested_rep();
let offsets = map.offsets().clone();
- Self::List(Box::new(child), ctx, offsets,
map.nulls().cloned())
+ Self::List(Box::new(child), ctx, offsets,
map.nulls().cloned(), is_last)
}
DataType::FixedSizeList(_, size) => {
let list = array.as_fixed_size_list();
@@ -274,8 +285,8 @@ impl LevelInfoBuilder {
fn finish(self) -> Vec<ArrayLevels> {
match self {
LevelInfoBuilder::Primitive(v) => vec![v],
- LevelInfoBuilder::List(v, _, _, _)
- | LevelInfoBuilder::LargeList(v, _, _, _)
+ LevelInfoBuilder::List(v, _, _, _, _)
+ | LevelInfoBuilder::LargeList(v, _, _, _, _)
| LevelInfoBuilder::FixedSizeList(v, _, _, _)
| LevelInfoBuilder::ListView(v, _, _, _, _)
| LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
@@ -287,11 +298,11 @@ impl LevelInfoBuilder {
fn write(&mut self, range: Range<usize>) {
match self {
LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
- LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
- Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
+ LevelInfoBuilder::List(child, ctx, offsets, nulls, is_last) => {
+ Self::write_list(child, ctx, offsets, nulls.as_ref(), range,
*is_last)
}
- LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
- Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
+ LevelInfoBuilder::LargeList(child, ctx, offsets, nulls, is_last)
=> {
+ Self::write_list(child, ctx, offsets, nulls.as_ref(), range,
*is_last)
}
LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(),
range)
@@ -308,6 +319,19 @@ impl LevelInfoBuilder {
}
}
+ /// Returns `true` if the child contains no nested repetition levels,
meaning
+ /// each child element produces exactly one rep_level entry in the leaf.
+ /// This is true for `Primitive` children and `Struct` trees with no list
descendants.
+ fn child_has_no_nested_rep(&self) -> bool {
+ match self {
+ LevelInfoBuilder::Primitive(_) => true,
+ LevelInfoBuilder::Struct(children, _, _) => {
+ children.iter().all(|c| c.child_has_no_nested_rep())
+ }
+ _ => false,
+ }
+ }
+
/// Write `range` elements from ListArray `array`
///
/// Note: MapArrays are `ListArray<i32>` under the hood and so are
dispatched to this method
@@ -317,6 +341,7 @@ impl LevelInfoBuilder {
offsets: &[O],
nulls: Option<&NullBuffer>,
range: Range<usize>,
+ is_last_level: bool,
) {
// Fast path: entire list array is null; emit bulk null rep/def levels
if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
@@ -327,6 +352,18 @@ impl LevelInfoBuilder {
return;
}
+ // Fast path for "last-level list": when the child has no nested
rep_levels,
+ // each child element produces exactly one rep_level entry. We can
batch
+ // contiguous non-empty list slots into a single child.write() call,
then
+ // fix up the rep_levels at list-slot boundaries using offsets
directly.
+ //
+ // Kept as a separate function so the compiler can optimize
write_list's
+ // hot loop independently (function body size affects codegen quality).
+ if is_last_level {
+ Self::write_list_last_level(child, ctx, offsets, nulls, range);
+ return;
+ }
+
let offsets = &offsets[range.start..range.end + 1];
let write_non_null_slice =
@@ -427,6 +464,138 @@ impl LevelInfoBuilder {
}
}
+ /// Optimized write path for lists whose child has no nested repetition
levels.
+ ///
+ /// When the child is a leaf (or a struct of leaves), each child element
maps to
+ /// exactly one rep_level entry. This lets us batch contiguous non-empty
list
+ /// slots into a single `child.write()` call, then stamp the list-start
markers
+ /// at positions computed directly from offsets — avoiding per-slot
`write` +
+ /// reverse-scan overhead.
+ fn write_list_last_level<O: OffsetSizeTrait>(
+ child: &mut LevelInfoBuilder,
+ ctx: &LevelContext,
+ offsets: &[O],
+ nulls: Option<&NullBuffer>,
+ range: Range<usize>,
+ ) {
+ let null_offset = range.start;
+ let offsets = &offsets[range.start..range.end + 1];
+ let list_start_rep = ctx.rep_level - 1;
+
+ let emit_nulls = |child: &mut LevelInfoBuilder, count: usize| {
+ child.visit_leaves(|leaf| {
+ leaf.append_rep_level_run(list_start_rep, count);
+ leaf.append_def_level_run(ctx.def_level - 2, count);
+ });
+ };
+
+ let emit_empties = |child: &mut LevelInfoBuilder, count: usize| {
+ child.visit_leaves(|leaf| {
+ leaf.append_rep_level_run(list_start_rep, count);
+ leaf.append_def_level_run(ctx.def_level - 1, count);
+ });
+ };
+
+ let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets:
&[O]| {
+ debug_assert!(run_offsets.len() >= 2);
+ let values_start = run_offsets[0].as_usize();
+ let values_end = run_offsets[run_offsets.len() - 1].as_usize();
+ debug_assert!(values_end > values_start);
+
+ // Write all leaf values in one batch. Since the child has no
nested
+ // rep, this emits (values_end - values_start) rep_levels all equal
+ // to ctx.rep_level (= "continuation within list").
+ child.write(values_start..values_end);
+
+ // The first element of each list slot needs rep_level =
+ // list_start_rep to mark a new list boundary. Because there's a
1:1
+ // mapping between child elements and rep_level entries, the
position
+ // of each slot's first element is directly computable from
offsets.
+ child.visit_leaves(|leaf| {
+ let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
+ let batch_len = values_end - values_start;
+ let batch_base = rep_levels.len() - batch_len;
+
+ for slot_offset in run_offsets.iter().take(run_offsets.len() -
1) {
+ let list_start_pos = batch_base + (slot_offset.as_usize()
- values_start);
+ rep_levels[list_start_pos] = list_start_rep;
+ }
+ });
+ };
+
+ // Classify each slot, detect run boundaries, flush on transition.
+ #[derive(Clone, Copy, PartialEq)]
+ enum SlotKind {
+ Null,
+ Empty,
+ NonEmpty,
+ }
+
+ let num_slots = offsets.len() - 1;
+ if num_slots == 0 {
+ return;
+ }
+
+ macro_rules! classify {
+ ($i:expr, $nulls:expr) => {
+ if !$nulls.is_valid($i + null_offset) {
+ SlotKind::Null
+ } else if offsets[$i] == offsets[$i + 1] {
+ SlotKind::Empty
+ } else {
+ SlotKind::NonEmpty
+ }
+ };
+ }
+
+ macro_rules! flush_run {
+ ($kind:expr, $start:expr, $end:expr) => {
+ match $kind {
+ SlotKind::Null => emit_nulls(child, $end - $start),
+ SlotKind::Empty => emit_empties(child, $end - $start),
+ SlotKind::NonEmpty => emit_non_empty_run(child,
&offsets[$start..$end + 1]),
+ }
+ };
+ }
+
+ match nulls {
+ Some(nulls) => {
+ let mut run_kind = classify!(0, nulls);
+ let mut run_start: usize = 0;
+ for i in 1..num_slots {
+ let kind = classify!(i, nulls);
+ if kind != run_kind {
+ flush_run!(run_kind, run_start, i);
+ run_kind = kind;
+ run_start = i;
+ }
+ }
+ flush_run!(run_kind, run_start, num_slots);
+ }
+ None => {
+ let mut run_kind = if offsets[0] == offsets[1] {
+ SlotKind::Empty
+ } else {
+ SlotKind::NonEmpty
+ };
+ let mut run_start: usize = 0;
+ for i in 1..num_slots {
+ let kind = if offsets[i] == offsets[i + 1] {
+ SlotKind::Empty
+ } else {
+ SlotKind::NonEmpty
+ };
+ if kind != run_kind {
+ flush_run!(run_kind, run_start, i);
+ run_kind = kind;
+ run_start = i;
+ }
+ }
+ flush_run!(run_kind, run_start, num_slots);
+ }
+ }
+ }
+
/// Write `range` elements from ListViewArray `array`
fn write_list_view<O: OffsetSizeTrait>(
child: &mut LevelInfoBuilder,
@@ -734,8 +903,8 @@ impl LevelInfoBuilder {
fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
match self {
LevelInfoBuilder::Primitive(info) => visit(info),
- LevelInfoBuilder::List(c, _, _, _)
- | LevelInfoBuilder::LargeList(c, _, _, _)
+ LevelInfoBuilder::List(c, _, _, _, _)
+ | LevelInfoBuilder::LargeList(c, _, _, _, _)
| LevelInfoBuilder::FixedSizeList(c, _, _, _)
| LevelInfoBuilder::ListView(c, _, _, _, _)
| LevelInfoBuilder::LargeListView(c, _, _, _, _) =>
c.visit_leaves(visit),