This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a05129a08e feat(parquet): stream-encode definition/repetition levels incrementally (#9447)
a05129a08e is described below
commit a05129a08ecf2f1f39f7eba56eb1d76848f79c3a
Author: Hippolyte Barraud
AuthorDate: Wed Apr 1 15:21:19 2026 -0400
feat(parquet): stream-encode definition/repetition levels incrementally (#9447)
# Which issue does this PR close?
- Closes #9446.
- Closes https://github.com/apache/arrow-rs/pull/9636
# Rationale for this change
When writing a Parquet column with very sparse data,
`GenericColumnWriter` accumulates unbounded memory for definition and
repetition levels. The raw `i16` values are appended to `Vec<i16>`
sinks on every `write_batch` call and are only RLE-encoded in bulk when a
data page is flushed. For a column that is almost entirely null, the
RLE-encoded output can be tiny, yet the intermediate buffer grows
linearly with the number of rows (2 bytes per buffered level).
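To put rough numbers on this, the sketch below compares the raw sink with a single RLE run, using the run layout of the Parquet RLE/bit-packing hybrid encoding: a ULEB128 header holding `run_len << 1`, followed by the repeated value in `ceil(bit_width / 8)` bytes. It assumes an all-null optional column (max definition level 1, so bit width 1) and ignores bit-packed runs and any buffering inside the real `RleEncoder`; the helper names are hypothetical.

```rust
/// Bytes held by a `Vec<i16>` sink for `n` raw levels (hypothetical helper).
fn raw_level_bytes(n: usize) -> usize {
    n * std::mem::size_of::<i16>()
}

/// Length in bytes of `v` encoded as ULEB128, the varint used for run headers.
fn uleb128_len(mut v: u64) -> usize {
    let mut len = 1;
    while v >= 0x80 {
        v >>= 7;
        len += 1;
    }
    len
}

/// Size of one RLE run of `run_len` identical levels in the hybrid format:
/// a ULEB128 header `run_len << 1` (low bit 0 marks an RLE run), then the
/// repeated value stored in `ceil(bit_width / 8)` bytes.
fn rle_single_run_bytes(run_len: usize, bit_width: u32) -> usize {
    uleb128_len((run_len as u64) << 1) + (bit_width as usize + 7) / 8
}
```

By this estimate, one million null rows cost 2,000,000 bytes in the old sink, while the same levels as a single RLE run take 4 bytes (a 3-byte header plus a 1-byte value).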
# What changes are included in this PR?
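Replace the two raw-level `Vec<i16>` sinks (`def_levels_sink` /
`rep_levels_sink`) with streaming `LevelEncoder` fields
(`def_levels_encoder` / `rep_levels_encoder`). Behavior is unchanged, but
the writer now keeps only the running RLE-encoded state in memory instead
of every raw level. The existing encoder logic is reused: `put` no longer
flushes after each call, and a new `flush_to` method emits a page's
encoded levels and resets the encoder for the next page.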
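The pattern can be illustrated with a simplified, self-contained streaming encoder. This is a hypothetical sketch, not the crate's `LevelEncoder`: it emits only RLE runs (never bit-packed runs), so its output is valid hybrid-encoded data but not byte-identical to `RleEncoder`. It mirrors the shape of this change: `put` extends the open run across calls without flushing, and `flush_to` closes the run, optionally patches a 4-byte little-endian length header (as Data Page v1 requires for RLE levels), hands the bytes to a closure, and resets while keeping the allocation.

```rust
/// Hypothetical streaming level encoder emitting only RLE runs of the Parquet
/// RLE/bit-packing hybrid format. Memory is the current page's encoded bytes
/// plus one open run, not one `i16` per level.
pub struct StreamingRleLevels {
    bit_width: u8,
    length_prefix: bool, // true: reserve a 4-byte LE length header (Data Page v1)
    buf: Vec<u8>,
    current: Option<u16>, // value of the open run, if any
    run_len: u64,
}

impl StreamingRleLevels {
    pub fn new(bit_width: u8, length_prefix: bool) -> Self {
        let mut enc = Self { bit_width, length_prefix, buf: Vec::new(), current: None, run_len: 0 };
        enc.reset();
        enc
    }

    /// Appends levels. An open run continues across calls; nothing is flushed here.
    pub fn put(&mut self, levels: &[i16]) {
        for &level in levels {
            let v = level as u16;
            if self.current == Some(v) {
                self.run_len += 1;
            } else {
                self.flush_run();
                self.current = Some(v);
                self.run_len = 1;
            }
        }
    }

    /// Writes the open run, if any: ULEB128 header `run_len << 1`, then the
    /// value in ceil(bit_width / 8) little-endian bytes.
    fn flush_run(&mut self) {
        if let Some(v) = self.current.take() {
            let mut header = self.run_len << 1;
            while header >= 0x80 {
                self.buf.push(((header & 0x7F) as u8) | 0x80);
                header >>= 7;
            }
            self.buf.push(header as u8);
            let value_bytes = (self.bit_width as usize + 7) / 8;
            self.buf.extend_from_slice(&v.to_le_bytes()[..value_bytes]);
            self.run_len = 0;
        }
    }

    /// Clears the buffer (keeping its capacity) and re-reserves the length header.
    fn reset(&mut self) {
        self.buf.clear();
        if self.length_prefix {
            self.buf.extend_from_slice(&[0u8; 4]);
        }
    }

    /// Finishes the current page: flushes the open run, patches the length
    /// header if present, passes the bytes to `f`, then resets for the next page.
    pub fn flush_to<R>(&mut self, f: impl FnOnce(&[u8]) -> R) -> R {
        self.flush_run();
        if self.length_prefix {
            let len = (self.buf.len() - 4) as i32;
            self.buf[..4].copy_from_slice(&len.to_le_bytes());
        }
        let result = f(self.buf.as_slice());
        self.reset();
        result
    }
}
```

Calling `put` once per batch and `flush_to` once per page keeps memory proportional to the encoded size rather than to the row count. For example, `put(&[0, 0, 0])` followed by `put(&[0, 1])` yields a single run of four zeros, not two separate runs.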
# Are these changes tested?
Yes, all tests pass.
Benchmarks show no regression. The `list_primitive` benchmarks improved by
3-5%; the `list_primitive_non_null` results are mostly within noise:
```
Benchmarking list_primitive/default: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.1s, enable flat sampling, or reduce sample count to 60.
list_primitive/default time: [1.2109 ms 1.2171 ms 1.2248 ms]
thrpt: [1.6999 GiB/s 1.7105 GiB/s 1.7194 GiB/s]
change:
time: [−3.7197% −2.8848% −2.0036%] (p = 0.00 < 0.05)
thrpt: [+2.0445% +2.9705% +3.8634%]
Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
3 (3.00%) high mild
1 (1.00%) high severe
Benchmarking list_primitive/bloom_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.5s, enable flat sampling, or reduce sample count to 50.
list_primitive/bloom_filter
time: [1.4405 ms 1.4810 ms 1.5292 ms]
thrpt: [1.3615 GiB/s 1.4058 GiB/s 1.4452 GiB/s]
change:
time: [−6.4332% −4.7568% −2.9048%] (p = 0.00 < 0.05)
thrpt: [+2.9917% +4.9944% +6.8755%]
Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
2 (2.00%) high mild
3 (3.00%) high severe
Benchmarking list_primitive/parquet_2: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.3s, enable flat sampling, or reduce sample count to 60.
list_primitive/parquet_2
time: [1.2271 ms 1.2311 ms 1.2362 ms]
thrpt: [1.6841 GiB/s 1.6911 GiB/s 1.6966 GiB/s]
change:
time: [−5.8536% −4.9672% −4.1905%] (p = 0.00 < 0.05)
thrpt: [+4.3738% +5.2269% +6.2175%]
Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
2 (2.00%) high mild
3 (3.00%) high severe
list_primitive/zstd time: [2.0056 ms 2.0148 ms 2.0262 ms]
thrpt: [1.0275 GiB/s 1.0333 GiB/s 1.0381 GiB/s]
change:
time: [−4.7073% −3.6719% −2.6698%] (p = 0.00 < 0.05)
thrpt: [+2.7431% +3.8118% +4.9398%]
Performance has improved.
Found 12 outliers among 100 measurements (12.00%)
2 (2.00%) high mild
10 (10.00%) high severe
list_primitive/zstd_parquet_2
time: [2.0455 ms 2.0730 ms 2.1120 ms]
thrpt: [1009.4 MiB/s 1.0043 GiB/s 1.0178 GiB/s]
change:
time: [−5.8626% −3.7672% −1.4196%] (p = 0.00 < 0.05)
thrpt: [+1.4401% +3.9146% +6.2277%]
Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
2 (2.00%) high mild
5 (5.00%) high severe
Benchmarking list_primitive_non_null/default: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 6.6s, enable flat sampling, or reduce sample count to 60.
list_primitive_non_null/default
time: [1.3199 ms 1.3333 ms 1.3504 ms]
thrpt: [1.5384 GiB/s 1.5581 GiB/s 1.5740 GiB/s]
change:
time: [−4.1662% −2.3491% −0.7148%] (p = 0.01 < 0.05)
thrpt: [+0.7200% +2.4056% +4.3473%]
Change within noise threshold.
Found 6 outliers among 100 measurements (6.00%)
3 (3.00%) high mild
3 (3.00%) high severe
Benchmarking list_primitive_non_null/bloom_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.4s, enable flat sampling, or reduce sample count to 50.
list_primitive_non_null/bloom_filter
time: [1.6567 ms 1.6668 ms 1.6805 ms]
thrpt: [1.2362 GiB/s 1.2464 GiB/s 1.2540 GiB/s]
change:
time: [−2.7884% −1.3493% +0.2820%] (p = 0.07 > 0.05)
thrpt: [−0.2812% +1.3677% +2.8684%]
No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
1 (1.00%) high mild
3 (3.00%) high severe
Benchmarking list_primitive_non_null/parquet_2: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 7.2s, enable flat sampling, or reduce sample count to 50.
list_primitive_non_null/parquet_2
time: [1.4279 ms 1.4409 ms 1.4551 ms]
thrpt: [1.4277 GiB/s 1.4418 GiB/s 1.4550 GiB/s]
change:
time: [−2.0598% −0.9952% −0.1318%] (p = 0.04 < 0.05)
thrpt: [+0.1319% +1.0052% +2.1032%]
Change within noise threshold.
Found 3 outliers among 100 measurements (3.00%)
2 (2.00%) high mild
1 (1.00%) high severe
list_primitive_non_null/zstd
time: [2.6966 ms 2.7358 ms 2.7994 ms]
thrpt: [759.93 MiB/s 777.60 MiB/s 788.89 MiB/s]
change:
time: [−3.8379% −2.1418% +0.0785%] (p = 0.03 < 0.05)
thrpt: [−0.0784% +2.1887% +3.9911%]
Change within noise threshold.
Found 7 outliers among 100 measurements (7.00%)
3 (3.00%) high mild
4 (4.00%) high severe
list_primitive_non_null/zstd_parquet_2
time: [2.7684 ms 2.7861 ms 2.8099 ms]
thrpt: [757.07 MiB/s 763.55 MiB/s 768.44 MiB/s]
change:
time: [−6.4460% −4.1387% −2.1474%] (p = 0.00 < 0.05)
thrpt: [+2.1946% +4.3174% +6.8901%]
Performance has improved.
```
# Are there any user-facing changes?
None. Some internal symbols are now unused. Rather than removing them, I
added `#[allow(unused)]` attributes (for example on `LevelEncoder::consume`),
since they are exposed through the experimental API and external code may
rely on them.
---------
Signed-off-by: Hippolyte Barraud
Co-authored-by: Andrew Lamb
---
parquet/src/column/reader.rs | 4 +-
parquet/src/column/writer/mod.rs | 73 +++++++++-----------------
parquet/src/encodings/levels.rs | 87 ++++++++++++++++++-------------
parquet/src/encodings/rle.rs | 17 +++++-
parquet/src/util/bit_util.rs | 7 +++
parquet/src/util/test_common/page_util.rs | 2 +-
6 files changed, 100 insertions(+), 90 deletions(-)
diff --git a/parquet/src/column/reader.rs b/parquet/src/column/reader.rs
index 29cb50185a..d1a328237f 100644
--- a/parquet/src/column/reader.rs
+++ b/parquet/src/column/reader.rs
@@ -1401,11 +1401,11 @@ mod tests {
// Helper: build a DataPage v2 for this list column.
let make_v2_page =
|rep_levels: &[i16], def_levels: &[i16], values: &[i32], num_rows: u32| -> Page {
- let mut rep_enc = LevelEncoder::v2(max_rep_level, rep_levels.len());
+ let mut rep_enc = LevelEncoder::v2_streaming(max_rep_level);
rep_enc.put(rep_levels);
let rep_bytes = rep_enc.consume();
- let mut def_enc = LevelEncoder::v2(max_def_level, def_levels.len());
+ let mut def_enc = LevelEncoder::v2_streaming(max_def_level);
def_enc.put(def_levels);
let def_bytes = def_enc.consume();
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 4c3dbabc21..cdf489f3b6 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -351,9 +351,9 @@ pub struct GenericColumnWriter<'a, E: ColumnValueEncoder> {
/// but we use a BTreeSet so that the output is deterministic
encodings: BTreeSet<Encoding>,
encoding_stats: Vec<PageEncodingStats>,
- // Reused buffers
- def_levels_sink: Vec<i16>,
- rep_levels_sink: Vec<i16>,
+ // Streaming level encoders for definition/repetition levels.
+ def_levels_encoder: LevelEncoder,
+ rep_levels_encoder: LevelEncoder,
data_pages: VecDeque<CompressedPage>,
// column index and offset index
column_index_builder: ColumnIndexBuilder,
@@ -411,6 +411,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
};
Self {
+ def_levels_encoder: Self::create_level_encoder(descr.max_def_level(), &props),
+ rep_levels_encoder: Self::create_level_encoder(descr.max_rep_level(), &props),
descr,
props,
statistics_enabled,
@@ -418,8 +420,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
codec,
compressor,
encoder,
- def_levels_sink: vec![],
- rep_levels_sink: vec![],
data_pages: VecDeque::new(),
page_metrics,
column_metrics,
@@ -647,6 +647,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
})
}
+ /// Creates a new streaming level encoder appropriate for the writer version.
+ fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
+ match props.writer_version() {
+ WriterVersion::PARQUET_1_0 => LevelEncoder::v1_streaming(Encoding::RLE, max_level),
+ WriterVersion::PARQUET_2_0 => LevelEncoder::v2_streaming(max_level),
+ }
+ }
+
/// Writes mini batch of values, definition and repetition levels.
/// This allows fine-grained processing of values and maintaining a reasonable
/// page size.
@@ -677,7 +685,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Update histogram
self.page_metrics.update_definition_level_histogram(levels);
- self.def_levels_sink.extend_from_slice(levels);
+ self.def_levels_encoder.put(levels);
values_to_write
} else {
num_levels
@@ -708,7 +716,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Update histogram
self.page_metrics.update_repetition_level_histogram(levels);
- self.rep_levels_sink.extend_from_slice(levels);
+ self.rep_levels_encoder.put(levels);
} else {
// Each value is exactly one row.
// Equals to the number of values, we count nulls as well.
@@ -1060,23 +1068,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let mut buffer = vec![];
if max_rep_level > 0 {
- buffer.extend_from_slice(
- &self.encode_levels_v1(
- Encoding::RLE,
- &self.rep_levels_sink[..],
- max_rep_level,
- )[..],
- );
+ self.rep_levels_encoder
+ .flush_to(|data| buffer.extend_from_slice(data));
}
if max_def_level > 0 {
- buffer.extend_from_slice(
- &self.encode_levels_v1(
- Encoding::RLE,
- &self.def_levels_sink[..],
- max_def_level,
- )[..],
- );
+ self.def_levels_encoder
+ .flush_to(|data| buffer.extend_from_slice(data));
}
buffer.extend_from_slice(&values_data.buf);
@@ -1106,15 +1104,15 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
let mut buffer = vec![];
if max_rep_level > 0 {
- let levels = self.encode_levels_v2(&self.rep_levels_sink[..], max_rep_level);
- rep_levels_byte_len = levels.len();
- buffer.extend_from_slice(&levels[..]);
+ self.rep_levels_encoder
+ .flush_to(|data| buffer.extend_from_slice(data));
+ rep_levels_byte_len = buffer.len();
}
if max_def_level > 0 {
- let levels = self.encode_levels_v2(&self.def_levels_sink[..], max_def_level);
- def_levels_byte_len = levels.len();
- buffer.extend_from_slice(&levels[..]);
+ self.def_levels_encoder
+ .flush_to(|data| buffer.extend_from_slice(data));
+ def_levels_byte_len = buffer.len() - rep_levels_byte_len;
}
let uncompressed_size =
@@ -1164,10 +1162,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Update total number of rows.
self.column_metrics.total_rows_written += self.page_metrics.num_buffered_rows as u64;
-
- // Reset state.
- self.rep_levels_sink.clear();
- self.def_levels_sink.clear();
self.page_metrics.new_page();
Ok(())
@@ -1244,23 +1238,6 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
Ok(metadata)
}
- /// Encodes definition or repetition levels for Data Page v1.
- #[inline]
- fn encode_levels_v1(&self, encoding: Encoding, levels: &[i16], max_level: i16) -> Vec<u8> {
- let mut encoder = LevelEncoder::v1(encoding, max_level, levels.len());
- encoder.put(levels);
- encoder.consume()
- }
-
- /// Encodes definition or repetition levels for Data Page v2.
- /// Encoding is always RLE.
- #[inline]
- fn encode_levels_v2(&self, levels: &[i16], max_level: i16) -> Vec<u8> {
- let mut encoder = LevelEncoder::v2(max_level, levels.len());
- encoder.put(levels);
- encoder.consume()
- }
-
/// Writes compressed data page into underlying sink and updates global metrics.
#[inline]
fn write_data_page(&mut self, page: CompressedPage) -> Result<()> {
diff --git a/parquet/src/encodings/levels.rs b/parquet/src/encodings/levels.rs
index 8c95e5ca51..b761a5ac5d 100644
--- a/parquet/src/encodings/levels.rs
+++ b/parquet/src/encodings/levels.rs
@@ -21,21 +21,7 @@ use super::rle::RleEncoder;
use crate::basic::Encoding;
use crate::data_type::AsBytes;
-use crate::util::bit_util::{BitWriter, ceil, num_required_bits};
-
-/// Computes max buffer size for level encoder/decoder based on encoding, max
-/// repetition/definition level and number of total buffered values (includes null
-/// values).
-#[inline]
-pub fn max_buffer_size(encoding: Encoding, max_level: i16, num_buffered_values: usize) -> usize {
- let bit_width = num_required_bits(max_level as u64);
- match encoding {
- Encoding::RLE => RleEncoder::max_buffer_size(bit_width, num_buffered_values),
- #[allow(deprecated)]
- Encoding::BIT_PACKED => ceil(num_buffered_values * bit_width as usize, 8),
- _ => panic!("Unsupported encoding type {encoding}"),
- }
-}
+use crate::util::bit_util::{BitWriter, num_required_bits};
/// Encoder for definition/repetition levels.
/// Currently only supports Rle and BitPacked (dev/null) encoding, including v2.
@@ -46,46 +32,44 @@ pub enum LevelEncoder {
}
impl LevelEncoder {
- /// Creates new level encoder based on encoding, max level and underlying byte buffer.
- /// For bit packed encoding it is assumed that buffer is already allocated with
- /// `levels::max_buffer_size` method.
- ///
- /// Used to encode levels for Data Page v1.
+ /// Creates a new streaming level encoder for Data Page v1.
///
- /// Panics, if encoding is not supported.
- pub fn v1(encoding: Encoding, max_level: i16, capacity: usize) -> Self {
- let capacity_bytes = max_buffer_size(encoding, max_level, capacity);
- let mut buffer = Vec::with_capacity(capacity_bytes);
+ /// Unlike [`v1`](Self::v1), this does not require knowing the number of values
+ /// upfront, making it suitable for incremental encoding where levels are fed in
+ /// as they arrive via [`put`](Self::put).
+ pub fn v1_streaming(encoding: Encoding, max_level: i16) -> Self {
let bit_width = num_required_bits(max_level as u64);
match encoding {
Encoding::RLE => {
// Reserve space for length header
- buffer.extend_from_slice(&[0; 4]);
+ let buffer = vec![0u8; 4];
LevelEncoder::Rle(RleEncoder::new_from_buf(bit_width, buffer))
}
#[allow(deprecated)]
Encoding::BIT_PACKED => {
- // Here we set full byte buffer without adjusting for num_buffered_values,
- // because byte buffer will already be allocated with size from
- // `max_buffer_size()` method.
- LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(buffer))
+ LevelEncoder::BitPacked(bit_width, BitWriter::new_from_buf(Vec::new()))
}
_ => panic!("Unsupported encoding type {encoding}"),
}
}
- /// Creates new level encoder based on RLE encoding. Used to encode Data Page v2
- /// repetition and definition levels.
- pub fn v2(max_level: i16, capacity: usize) -> Self {
- let capacity_bytes = max_buffer_size(Encoding::RLE, max_level, capacity);
- let buffer = Vec::with_capacity(capacity_bytes);
+ /// Creates a new streaming RLE level encoder for Data Page v2.
+ ///
+ /// Unlike [`v2`](Self::v2), this does not require knowing the number of values
+ /// upfront, making it suitable for incremental encoding where levels are fed in
+ /// as they arrive via [`put`](Self::put).
+ pub fn v2_streaming(max_level: i16) -> Self {
let bit_width = num_required_bits(max_level as u64);
- LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, buffer))
+ LevelEncoder::RleV2(RleEncoder::new_from_buf(bit_width, Vec::new()))
}
/// Put/encode levels vector into this level encoder.
/// Returns number of encoded values that are less than or equal to length of the
/// input buffer.
+ ///
+ /// This method does **not** flush the underlying encoder, so it can be called
+ /// incrementally across multiple batches without forcing run boundaries.
+ /// The encoder is flushed automatically when [`consume`](Self::consume) is called.
#[inline]
pub fn put(&mut self, buffer: &[i16]) -> usize {
let mut num_encoded = 0;
@@ -95,14 +79,12 @@ impl LevelEncoder {
encoder.put(*value as u64);
num_encoded += 1;
}
- encoder.flush();
}
LevelEncoder::BitPacked(bit_width, ref mut encoder) => {
for value in buffer {
encoder.put_value(*value as u64, bit_width as usize);
num_encoded += 1;
}
- encoder.flush();
}
}
num_encoded
@@ -111,6 +93,7 @@ impl LevelEncoder {
/// Finalizes level encoder, flush all intermediate buffers and return resulting
/// encoded buffer. Returned buffer is already truncated to encoded bytes only.
#[inline]
+ #[allow(unused)]
pub fn consume(self) -> Vec<u8> {
match self {
LevelEncoder::Rle(encoder) => {
@@ -126,4 +109,34 @@ impl LevelEncoder {
LevelEncoder::BitPacked(_, encoder) => encoder.consume(),
}
}
+
+ /// Flushes all intermediate buffers, passes the encoded data to `f`, then
+ /// resets the encoder for reuse while retaining the buffer allocation.
+ #[inline]
+ pub fn flush_to<F, R>(&mut self, f: F) -> R
+ where
+ F: FnOnce(&[u8]) -> R,
+ {
+ let result = match self {
+ LevelEncoder::Rle(encoder) => {
+ let data = encoder.flush_buffer_mut();
+ // Patch the 4-byte length header reserved at the start of the buffer
+ let encoded_len = (data.len() - mem::size_of::<i32>()) as i32;
+ data[..4].copy_from_slice(&encoded_len.to_le_bytes());
+ f(data)
+ }
+ LevelEncoder::RleV2(encoder) => f(encoder.flush_buffer()),
+ LevelEncoder::BitPacked(_, encoder) => f(encoder.flush_buffer()),
+ };
+ match self {
+ LevelEncoder::Rle(encoder) => {
+ encoder.clear();
+ // Re-reserve the 4-byte length header for the next page
+ encoder.skip(mem::size_of::<i32>());
+ }
+ LevelEncoder::RleV2(encoder) => encoder.clear(),
+ LevelEncoder::BitPacked(_, encoder) => encoder.clear(),
+ }
+ result
+ }
}
diff --git a/parquet/src/encodings/rle.rs b/parquet/src/encodings/rle.rs
index c95a46c634..2815c20dab 100644
--- a/parquet/src/encodings/rle.rs
+++ b/parquet/src/encodings/rle.rs
@@ -177,16 +177,22 @@ impl RleEncoder {
/// Borrow equivalent of the `consume` method.
/// Call `clear()` after invoking this method.
#[inline]
- #[allow(unused)]
pub fn flush_buffer(&mut self) -> &[u8] {
self.flush();
self.bit_writer.flush_buffer()
}
+ /// Like `flush_buffer`, but returns mutable access to the internal buffer.
+ /// Call `clear()` after invoking this method.
+ #[inline]
+ pub fn flush_buffer_mut(&mut self) -> &mut [u8] {
+ self.flush();
+ self.bit_writer.flush_buffer_mut()
+ }
+
/// Clears the internal state so this encoder can be reused (e.g., after becoming
/// full).
#[inline]
- #[allow(unused)]
pub fn clear(&mut self) {
self.bit_writer.clear();
self.num_buffered_values = 0;
@@ -196,6 +202,13 @@ impl RleEncoder {
self.indicator_byte_pos = -1;
}
+ /// Advances the buffer by `num_bytes` zero bytes, delegating to the
+ /// underlying [`BitWriter::skip`].
+ #[inline]
+ pub fn skip(&mut self, num_bytes: usize) {
+ self.bit_writer.skip(num_bytes);
+ }
+
/// Flushes all remaining values and return the final byte buffer maintained by the
/// internal writer.
#[inline]
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 2625648258..7d7907f6f5 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -219,6 +219,13 @@ impl BitWriter {
self.buffer()
}
+ /// Like `flush_buffer`, but returns mutable access to the buffer.
+ #[inline]
+ pub fn flush_buffer_mut(&mut self) -> &mut [u8] {
+ self.flush();
+ &mut self.buffer
+ }
+
/// Clears the internal state so the buffer can be reused.
#[inline]
pub fn clear(&mut self) {
diff --git a/parquet/src/util/test_common/page_util.rs b/parquet/src/util/test_common/page_util.rs
index 5b64eb5413..6a99beaea1 100644
--- a/parquet/src/util/test_common/page_util.rs
+++ b/parquet/src/util/test_common/page_util.rs
@@ -75,7 +75,7 @@ impl DataPageBuilderImpl {
if max_level <= 0 {
return 0;
}
- let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len());
+ let mut level_encoder = LevelEncoder::v1_streaming(Encoding::RLE, max_level);
level_encoder.put(levels);
let encoded_levels = level_encoder.consume();
// Actual encoded bytes (without length offset)