This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c6ea0a5e0c Add bloom filter folding to automatically size SBBF filters
(#9628)
c6ea0a5e0c is described below
commit c6ea0a5e0cf6c5fc9f2fc3610fe2351bb5ec25c0
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Tue Apr 7 05:54:47 2026 -0500
Add bloom filter folding to automatically size SBBF filters (#9628)
## Summary
Bloom filters now support **folding mode**: allocate a conservatively
large filter (sized for worst-case NDV), insert all values during
writing, then fold down at flush time to meet a target FPP. This
eliminates the need to guess NDV upfront and produces optimally-sized
filters automatically.
### Changes
- `BloomFilterProperties.ndv` changed from `u64` to `Option<u64>` — when
`None` (new default), the filter is sized based on
`max_row_group_row_count`; when `Some(n)`, the explicit NDV is used
- `DEFAULT_BLOOM_FILTER_NDV` redefined to
`DEFAULT_MAX_ROW_GROUP_ROW_COUNT as u64` (was hardcoded `1_000_000`)
- Added `Sbbf::fold_to_target_fpp()` and supporting methods
(`num_folds_for_target_fpp`, `fold_n`, `num_blocks`) with comprehensive
documentation
- `flush_bloom_filter()` in both `ColumnValueEncoderImpl` and
`ByteArrayEncoder` now folds the filter before returning it
- New `create_bloom_filter()` helper in `encoder.rs` centralizes bloom
filter construction logic
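As an illustration of the new `Option<u64>` semantics for `ndv`, the sizing choice can be sketched as below. This is not the code from the commit: `initial_ndv` is a hypothetical helper, and the row-count limit is taken as a plain `usize` for simplicity.

```rust
/// Hypothetical helper (not part of the parquet crate): choose the NDV used
/// to size the initial, conservatively large filter.
///
/// * `Some(n)` means the caller configured an explicit NDV, so use it.
/// * `None` means fall back to the row group row-count limit, an upper bound
///   on the number of distinct values a single row group can contain.
fn initial_ndv(configured: Option<u64>, max_row_group_row_count: usize) -> u64 {
    configured.unwrap_or(max_row_group_row_count as u64)
}

fn main() {
    // Assumed limit, for illustration only.
    let limit = 1_048_576usize;
    assert_eq!(initial_ndv(None, limit), 1_048_576);
    assert_eq!(initial_ndv(Some(3), limit), 3);
    println!("ok");
}
```

Either way the filter is folded at flush time, so an overestimate costs temporary memory rather than file size.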
### How folding works
The SBBF fold operation merges adjacent block pairs (`block[2i] |
block[2i+1]`) via bitwise OR, halving the filter size. This differs from
standard Bloom filter folding (which merges halves at distance `m/2`)
because SBBF uses multiplicative hashing for block selection:
```
block_index = ((hash >> 32) * num_blocks) >> 32
```
When `num_blocks` is halved, the new index becomes `floor(original_index
/ 2)`, so adjacent blocks map to the same position.
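The halving property can be checked with a small standalone sketch. It reimplements the formula above rather than calling into the parquet crate; `block_index` is an illustrative name and the sample hashes are arbitrary.

```rust
/// Multiply-and-shift block selection, as in the formula above.
/// The product fits in a u64 because both factors are below 2^32 here.
fn block_index(hash: u64, num_blocks: u64) -> u64 {
    ((hash >> 32) * num_blocks) >> 32
}

fn main() {
    // Arbitrary 64-bit hashes, chosen only to exercise different upper bits.
    let hashes: [u64; 4] = [
        0x0000_0000_dead_beef,
        0x1234_5678_9abc_def0,
        0x8000_0000_0000_0001,
        0xffff_ffff_ffff_ffff,
    ];
    for n in [2u64, 8, 1024, 32_768] {
        for h in hashes {
            // Halving the block count halves the index (integer division),
            // so blocks 2i and 2i+1 both land on block i after one fold.
            assert_eq!(block_index(h, n / 2), block_index(h, n) / 2);
        }
    }
    println!("index halving holds for all sampled hashes");
}
```

The identity follows from `floor(floor(x) / 2) == floor(x / 2)`, which is why merging adjacent pairs is the right fold for this hashing scheme.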
The number of safe folds is determined analytically from the average
per-block fill rate: after `k` folds, expected fill is `1 -
(1-f)^(2^k)`, giving `FPP = fill^8`. This requires only a single
popcount scan over the blocks (no scratch allocation), then O(log N)
floating-point ops to find the optimal fold count. The actual fold is
then performed in a single pass.
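A minimal standalone sketch of the estimate and the single-pass fold follows. It models a block as `[u32; 8]` and mirrors the description above; it is a simplified illustration under those assumptions, not the crate's `Sbbf` implementation.

```rust
/// Toy 256-bit block: eight 32-bit words.
type Block = [u32; 8];

/// Count how many halvings keep the estimated FPP within `target_fpp`.
fn num_folds_for_target_fpp(blocks: &[Block], target_fpp: f64) -> u32 {
    let len = blocks.len();
    if len < 2 || !len.is_power_of_two() {
        return 0;
    }
    let max_folds = len.trailing_zeros(); // log2(len)
    // Single popcount scan: average fill rate across the whole filter.
    let set_bits: u64 = blocks
        .iter()
        .flatten()
        .map(|w| u64::from(w.count_ones()))
        .sum();
    let fill = set_bits as f64 / (len as f64 * 256.0);
    let mut one_minus_fk = 1.0 - fill; // (1 - f)^(2^0)
    let mut folds = 0;
    for _ in 0..max_folds {
        one_minus_fk *= one_minus_fk; // squaring gives (1 - f)^(2^(k+1))
        let estimated_fpp = (1.0 - one_minus_fk).powi(8);
        if estimated_fpp > target_fpp {
            break;
        }
        folds += 1;
    }
    folds
}

/// Merge groups of `2^num_folds` adjacent blocks via OR, in place.
fn fold_n(blocks: &mut Vec<Block>, num_folds: u32) {
    let group = 1usize << num_folds;
    let new_len = blocks.len() / group;
    for i in 0..new_len {
        let mut merged = [0u32; 8];
        for j in 0..group {
            let b = blocks[i * group + j];
            for w in 0..8 {
                merged[w] |= b[w];
            }
        }
        blocks[i] = merged; // safe: index i is never read again by later groups
    }
    blocks.truncate(new_len);
}

fn main() {
    // 16 nearly empty blocks: the estimate allows folding down to one block.
    let mut blocks = vec![[0u32; 8]; 16];
    blocks[0][0] = 1;
    blocks[5][3] = 0b100;
    let folds = num_folds_for_target_fpp(&blocks, 0.01);
    fold_n(&mut blocks, folds);
    assert_eq!(blocks.len(), 1);
    assert_eq!((blocks[0][0], blocks[0][3]), (1, 4));
}
```

Squaring `(1 - f)^(2^k)` at each step avoids recomputing the power from scratch, which is where the O(log N) floating-point cost comes from.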
### Benchmarks
Filter sized for 1M NDV, varying actual distinct values inserted.
Measured on Apple M3 Pro.
**Fold overhead (fold_to_target_fpp only):**
| Actual NDV | Time | Throughput |
|---|---|---|
| 1,000 | 39.1 µs | 838 Melem/s |
| 10,000 | 34.2 µs | 960 Melem/s |
| 100,000 | 32.5 µs | 1.01 Gelem/s |
**End-to-end (insert + fold) vs insert-only:**
| Actual NDV | Insert only | Insert + fold | Fold overhead (share of insert + fold) |
|---|---|---|---|
| 1,000 | 14.7 µs | 49.1 µs | 34.4 µs (70%) |
| 10,000 | 30.7 µs | 58.1 µs | 27.4 µs (47%) |
| 100,000 | 162.5 µs | 189.8 µs | 27.3 µs (14%) |
The fold cost is dominated by the popcount scan over the initial (large)
filter. For the common case (100K values into a 1M-NDV filter), folding
adds only ~14% overhead to the total insert+fold time.
### References
Sailhan & Stehr, ["Folding and Unfolding Bloom
Filters"](https://hal.science/hal-01126174v1/document), IEEE iThings
2012.
Liang, ["Blocked Bloom Filters: Speeding Up Point Lookups in Tiger
Postgres' Native
Columnstore"](https://www.tigerdata.com/blog/blocked-bloom-filters-speeding-up-point-lookups-in-tiger-postgres-native-columnstore)
### Breaking changes
There are no breaking API changes.
However, default sizing changes: when bloom filters are enabled without
specifying the number of distinct values, the filters are now sized
automatically (allocated based on `max_row_group_row_count`, then folded
at flush time). Previously they were sized using the fixed default
`DEFAULT_BLOOM_FILTER_NDV` (`1_000_000`).
## Test plan
- [x] All existing bloom filter unit tests pass
- [x] All existing integration tests (sync + async reader roundtrips)
pass
- [x] New unit tests: fold correctness, no false negatives after
folding, FPP target respected, minimum size guard
- [x] New unit tests: folded filter is bit-identical to a fresh filter
of the same size (proves correctness via two lemmas about SBBF hashing)
- [x] New unit tests: multi-step folding, folded FPP matches fresh FPP
empirically, fold size matches optimal fixed-size filter
- [x] New integration test: `i32_column_bloom_filter_fixed_ndv` —
roundtrip with both overestimated and underestimated NDV
- [x] Full `cargo test -p parquet` passes
🤖 Generated with [Claude Code](https://claude.com/claude-code)
---------
Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
Co-authored-by: Matthew Kim <[email protected]>
Co-authored-by: emkornfield <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
---
parquet/Cargo.toml | 4 +
parquet/benches/bloom_filter.rs | 113 +++++
parquet/src/arrow/arrow_writer/byte_array.rs | 15 +-
parquet/src/arrow/arrow_writer/mod.rs | 47 +-
parquet/src/bloom_filter/mod.rs | 622 ++++++++++++++++++++++++++-
parquet/src/column/writer/encoder.rs | 26 +-
parquet/src/file/properties.rs | 99 ++++-
7 files changed, 876 insertions(+), 50 deletions(-)
diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index efcd1fe219..696e9b5411 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -275,5 +275,9 @@ name = "row_selection_cursor"
harness = false
required-features = ["arrow"]
+[[bench]]
+name = "bloom_filter"
+harness = false
+
[lib]
bench = false
diff --git a/parquet/benches/bloom_filter.rs b/parquet/benches/bloom_filter.rs
new file mode 100644
index 0000000000..ca4f900067
--- /dev/null
+++ b/parquet/benches/bloom_filter.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::{BenchmarkId, Criterion, Throughput, criterion_group,
criterion_main};
+use parquet::bloom_filter::Sbbf;
+
+/// Build a bloom filter sized for `initial_ndv` at `fpp`, insert `num_values`
distinct values,
+/// and return it ready for folding.
+fn build_filter(initial_ndv: u64, fpp: f64, num_values: u64) -> Sbbf {
+ let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, fpp).unwrap();
+ for i in 0..num_values {
+ sbbf.insert(&i);
+ }
+ sbbf
+}
+
+fn bench_fold_to_target_fpp(c: &mut Criterion) {
+ let mut group = c.benchmark_group("fold_to_target_fpp");
+
+ // Realistic scenario: filter sized for 1M NDV, varying actual distinct
values
+ let initial_ndv = 1_000_000u64;
+ let fpp = 0.05;
+
+ for num_values in [1_000u64, 10_000, 100_000] {
+ let filter = build_filter(initial_ndv, fpp, num_values);
+ let num_blocks = filter.num_blocks();
+ group.throughput(Throughput::Elements(num_blocks as u64));
+ group.bench_with_input(BenchmarkId::new("ndv", num_values), &filter,
|b, filter| {
+ b.iter_batched(
+ || filter.clone(),
+ |mut f| {
+ f.fold_to_target_fpp(fpp);
+ f
+ },
+ criterion::BatchSize::SmallInput,
+ );
+ });
+ }
+ group.finish();
+}
+
+fn bench_insert_and_fold(c: &mut Criterion) {
+ let mut group = c.benchmark_group("insert_and_fold");
+
+ let initial_ndv = 1_000_000u64;
+ let fpp = 0.05;
+
+ for num_values in [1_000u64, 10_000, 100_000] {
+ group.throughput(Throughput::Elements(num_values));
+ group.bench_with_input(
+ BenchmarkId::new("values", num_values),
+ &num_values,
+ |b, &num_values| {
+ b.iter(|| {
+ let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv,
fpp).unwrap();
+ for i in 0..num_values {
+ sbbf.insert(&i);
+ }
+ sbbf.fold_to_target_fpp(fpp);
+ sbbf
+ });
+ },
+ );
+ }
+ group.finish();
+}
+
+fn bench_insert_only(c: &mut Criterion) {
+ let mut group = c.benchmark_group("insert_only");
+
+ let initial_ndv = 1_000_000u64;
+ let fpp = 0.05;
+
+ for num_values in [1_000u64, 10_000, 100_000] {
+ group.throughput(Throughput::Elements(num_values));
+ group.bench_with_input(
+ BenchmarkId::new("values", num_values),
+ &num_values,
+ |b, &num_values| {
+ b.iter(|| {
+ let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv,
fpp).unwrap();
+ for i in 0..num_values {
+ sbbf.insert(&i);
+ }
+ sbbf
+ });
+ },
+ );
+ }
+ group.finish();
+}
+
+criterion_group!(
+ benches,
+ bench_fold_to_target_fpp,
+ bench_insert_and_fold,
+ bench_insert_only
+);
+criterion_main!(benches);
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs
b/parquet/src/arrow/arrow_writer/byte_array.rs
index 228d229b30..f56f9570ad 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -17,7 +17,9 @@
use crate::basic::Encoding;
use crate::bloom_filter::Sbbf;
-use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues,
DictionaryPage};
+use crate::column::writer::encoder::{
+ ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
+};
use crate::data_type::{AsBytes, ByteArray, Int32Type};
use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
use crate::encodings::rle::RleEncoder;
@@ -423,6 +425,7 @@ pub struct ByteArrayEncoder {
min_value: Option<ByteArray>,
max_value: Option<ByteArray>,
bloom_filter: Option<Sbbf>,
+ bloom_filter_target_fpp: f64,
geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
@@ -430,7 +433,9 @@ impl ColumnValueEncoder for ByteArrayEncoder {
type T = ByteArray;
type Values = dyn Array;
fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
- self.bloom_filter.take()
+ let mut sbbf = self.bloom_filter.take()?;
+ sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
+ Some(sbbf)
}
fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
@@ -443,10 +448,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
let fallback = FallbackEncoder::new(descr, props)?;
- let bloom_filter = props
- .bloom_filter_properties(descr.path())
- .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
- .transpose()?;
+ let (bloom_filter, bloom_filter_target_fpp) =
create_bloom_filter(props, descr)?;
let statistics_enabled = props.statistics_enabled(descr.path());
@@ -456,6 +458,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
fallback,
statistics_enabled,
bloom_filter,
+ bloom_filter_target_fpp,
dict_encoder: dictionary,
min_value: None,
max_value: None,
diff --git a/parquet/src/arrow/arrow_writer/mod.rs
b/parquet/src/arrow/arrow_writer/mod.rs
index 2ef71d5745..8422263b1f 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -2681,6 +2681,7 @@ mod tests {
values: ArrayRef,
schema: SchemaRef,
bloom_filter: bool,
+ bloom_filter_ndv: Option<u64>,
bloom_filter_position: BloomFilterPosition,
}
@@ -2692,6 +2693,7 @@ mod tests {
values,
schema: Arc::new(schema),
bloom_filter: false,
+ bloom_filter_ndv: None,
bloom_filter_position: BloomFilterPosition::AfterRowGroup,
}
}
@@ -2712,6 +2714,7 @@ mod tests {
values,
schema,
bloom_filter,
+ bloom_filter_ndv,
bloom_filter_position,
} = options;
@@ -2750,15 +2753,18 @@ mod tests {
for encoding in &encodings {
for version in [WriterVersion::PARQUET_1_0,
WriterVersion::PARQUET_2_0] {
for row_group_size in row_group_sizes {
- let props = WriterProperties::builder()
+ let mut builder = WriterProperties::builder()
.set_writer_version(version)
.set_max_row_group_row_count(Some(row_group_size))
.set_dictionary_enabled(dictionary_size != 0)
.set_dictionary_page_size_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.set_bloom_filter_enabled(bloom_filter)
- .set_bloom_filter_position(bloom_filter_position)
- .build();
+ .set_bloom_filter_position(bloom_filter_position);
+ if let Some(ndv) = bloom_filter_ndv {
+ builder = builder.set_bloom_filter_ndv(ndv);
+ }
+ let props = builder.build();
files.push(roundtrip_opts(&expected_batch, props))
}
@@ -3142,6 +3148,41 @@ mod tests {
);
}
+ /// Test that bloom filter folding produces correct results even when
+ /// the configured NDV differs significantly from actual NDV.
+ /// A large NDV means a larger initial filter that gets folded down;
+ /// a small NDV means a smaller initial filter.
+ #[test]
+ fn i32_column_bloom_filter_fixed_ndv() {
+ let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
+
+ // NDV much larger than actual distinct values — tests folding a large
filter down
+ let mut options = RoundTripOptions::new(array.clone(), false);
+ options.bloom_filter = true;
+ options.bloom_filter_ndv = Some(1_000_000);
+
+ let files = one_column_roundtrip_with_options(options);
+ check_bloom_filter(
+ files,
+ "col".to_string(),
+ (0..SMALL_SIZE as i32).collect(),
+ (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+ );
+
+ // NDV smaller than actual distinct values — tests the underestimate
path
+ let mut options = RoundTripOptions::new(array, false);
+ options.bloom_filter = true;
+ options.bloom_filter_ndv = Some(3);
+
+ let files = one_column_roundtrip_with_options(options);
+ check_bloom_filter(
+ files,
+ "col".to_string(),
+ (0..SMALL_SIZE as i32).collect(),
+ (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+ );
+ }
+
#[test]
fn binary_column_bloom_filter() {
let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 933b5a269f..8e89ba406b 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -68,6 +68,46 @@
//! | 1,000,000 | 0.00001 | 131,072 | 4,096 |
//! | 1,000,000 | 0.000001 | 262,144 | 8,192 |
//!
+//! # Structure: Filter → Blocks → Words → Bits
+//!
+//! An SBBF is an array of **blocks**. Each block is 256 bits (32 bytes),
+//! divided into eight 32-bit **words**. A word is just a `u32` — an array of
+//! 32 individual bits that can each be "set" (1) or "not set" (0).
+//!
+//! ```text
+//! Sbbf (the whole filter)
+//! ┌──────────┬──────────┬──────────┬─── ─── ──┬──────────┐
+//! │ Block 0 │ Block 1 │ Block 2 │ ... │ Block N-1│
+//! └──────────┴──────────┴──────────┴─── ─── ──┴──────────┘
+//! │
+//! ▼
+//! One Block = 256 bits = 8 words
+//! ┌────────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┐
+//! │ word 0 │ word 1 │ word 2 │ word 3 │ word 4 │ word 5 │ word 6 │ word 7 │
+//! │ (u32) │ (u32) │ (u32) │ (u32) │ (u32) │ (u32) │ (u32) │ (u32) │
+//! └────────┴────────┴────────┴────────┴────────┴────────┴────────┴────────┘
+//! │
+//! ▼
+//! One Word = 32 individual bits
+//! ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
+//! │0│0│1│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│ ← bit
29 is set
+//! └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
+//! ```
+//!
+//! **Inserting** a value hashes it to a 64-bit number, then:
+//! 1. The upper 32 bits pick which **block** (via
`Sbbf::hash_to_block_index`).
+//! 2. The lower 32 bits pick one bit position in each of the 8 **words**
(via `Block::mask`).
+//! So each insert sets exactly **8 bits** (one per word) in a single
block.
+//!
+//! **Checking** does the same two steps and returns `true` only if all 8 bits
+//! are already set — meaning the value was *probably* inserted (or is a false
+//! positive).
+//!
+//! # Bloom Filter Folding
+//!
+//! After inserting all values into a bloom filter it can be "folded" to minimize its size.
+//! See [`Sbbf::fold_to_target_fpp`] for details on the algorithm and its
mathematical basis.
+//!
//! [parquet-bf-spec]:
https://github.com/apache/parquet-format/blob/master/BloomFilter.md
//! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
//! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf
@@ -114,23 +154,50 @@ pub struct BloomFilterHeader {
}
);
-/// Each block is 256 bits, broken up into eight contiguous "words", each
consisting of 32 bits.
-/// Each word is thought of as an array of bits; each bit is either "set" or
"not set".
+/// A single 256-bit block, the basic unit of the Split Block Bloom Filter.
+///
+/// A block is eight contiguous 32-bit **words** (`[u32; 8]`).
+/// Each word is an independent bit-array of 32 positions:
+///
+/// ```text
+/// Block (256 bits total)
+/// ┌────────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┐
+/// │ word 0 │ word 1 │ word 2 │ word 3 │ word 4 │ word 5 │ word 6 │ word 7 │
+/// │ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│
+/// └────────┴────────┴────────┴────────┴────────┴────────┴────────┴────────┘
+/// ```
+///
+/// When a value is inserted, [`Block::mask`] picks one bit in each word
+/// (8 bits total), and those bits are OR'd in. When checking, we verify
+/// all 8 bits are set.
#[derive(Debug, Copy, Clone)]
#[repr(transparent)]
struct Block([u32; 8]);
impl Block {
const ZERO: Block = Block([0; 8]);
- /// takes as its argument a single unsigned 32-bit integer and returns a
block in which each
- /// word has exactly one bit set.
+ /// Produce a block where each of the 8 words has exactly one bit set.
+ ///
+ /// For each word `i` the bit position is derived from `x`:
+ ///
+ /// ```text
+ /// y = (x wrapping* SALT[i]) >> 27 // top 5 bits → value in 0..31
+ /// word[i] = 1 << y // exactly one bit set per word
+ /// ```
+ ///
+ /// Because only the top 5 bits survive the shift, each word picks one of
+ /// 32 possible bit positions. The eight SALT constants spread the choices
+ /// so different words usually light up different positions.
+ ///
+ /// Key property: the mask depends *only* on `x` (a u32) and the fixed
+ /// SALT constants — it is independent of the filter size. This is why
+ /// folding preserves bit patterns (see Lemma 2 in tests).
fn mask(x: u32) -> Self {
let mut result = [0_u32; 8];
for i in 0..8 {
- // wrapping instead of checking for overflow
- let y = x.wrapping_mul(SALT[i]);
- let y = y >> 27;
- result[i] = 1 << y;
+ let y = x.wrapping_mul(SALT[i]); // spread bits via multiply
+ let y = y >> 27; // keep top 5 bits → 0..31
+ result[i] = 1 << y; // set exactly that one bit
}
Self(result)
}
@@ -155,7 +222,10 @@ impl Block {
self
}
- /// setting every bit in the block that was also set in the result from
mask
+ /// OR the mask bits into this block (`block[i] |= mask[i]`).
+ ///
+ /// After insertion the 8 bits chosen by `mask(hash)` are guaranteed set;
+ /// bits previously set by other hashes are preserved.
fn insert(&mut self, hash: u32) {
let mask = Self::mask(hash);
for i in 0..8 {
@@ -163,7 +233,11 @@ impl Block {
}
}
- /// returns true when every bit that is set in the result of mask is also
set in the block.
+ /// Check membership: returns `true` when *every* bit from `mask(hash)` is
+ /// already set in this block (`block[i] & mask[i] != 0` for all 8 words).
+ ///
+ /// A `true` result means "probably present" (other inserts may have set
+ /// the same bits). A `false` is definitive — the value was never inserted.
fn check(&self, hash: u32) -> bool {
let mask = Self::mask(hash);
for i in 0..8 {
@@ -191,7 +265,55 @@ impl std::ops::IndexMut<usize> for Block {
}
}
-/// A split block Bloom filter.
+impl std::ops::BitOr for Block {
+ type Output = Self;
+
+ #[inline]
+ fn bitor(self, rhs: Self) -> Self {
+ let mut result = [0u32; 8];
+ for (i, item) in result.iter_mut().enumerate() {
+ *item = self.0[i] | rhs.0[i];
+ }
+ Self(result)
+ }
+}
+
+impl std::ops::BitOrAssign for Block {
+ #[inline]
+ fn bitor_assign(&mut self, rhs: Self) {
+ for i in 0..8 {
+ self.0[i] |= rhs.0[i];
+ }
+ }
+}
+
+impl Block {
+ /// Count the total number of set bits across all 8 words.
+ ///
+ /// Computes popcount on each word separately and sums. Keeping the
popcount
+ /// separate from the OR allows the compiler to batch SIMD popcount
instructions
+ /// (e.g., `cnt.16b` on ARM NEON) instead of interleaving them with OR
operations.
+ #[inline]
+ fn count_ones(self) -> u32 {
+ // Written as a fold over the array so the compiler sees 8 independent
+ // popcount operations it can vectorize into cnt.16b + horizontal sum.
+ self.0.iter().map(|w| w.count_ones()).sum()
+ }
+}
+
+/// A split block Bloom filter (SBBF).
+///
+/// An SBBF partitions its bit space into fixed-size 256-bit (32-byte) blocks,
each fitting in a
+/// single CPU cache line. Each block contains eight 32-bit words, aligned
with SIMD lanes for
+/// parallel bit manipulation. When checking membership, only one block is
accessed per query,
+/// eliminating the cache-miss penalty of standard Bloom filters.
+///
+/// ## Sizing and folding
+///
+/// Filters are initially sized for a maximum expected number of distinct
values (NDV) via
+/// [`Sbbf::new_with_ndv_fpp`]. After all values are inserted, the filter is
compacted by
+/// calling [`Sbbf::fold_to_target_fpp`], which folds the filter down to the
smallest size
+/// that still meets the target false positive probability.
///
/// The creation of this structure is based on the
[`crate::file::properties::BloomFilterProperties`]
/// struct set via [`crate::file::properties::WriterProperties`] and is thus
hidden by default.
@@ -395,10 +517,27 @@ impl Sbbf {
Ok(Some(Self::new(&bitset)))
}
+ /// Map a 64-bit hash to a block index in `[0, num_blocks)`.
+ ///
+ /// Uses the "multiply-and-shift" trick (a fast alternative to modulo):
+ ///
+ /// ```text
+ /// upper32 = hash >> 32 // take the top 32 bits of the hash
+ /// index = (upper32 * N) >> 32 // ∈ [0, N) where N = num_blocks
+ /// ```
+ ///
+ /// Why this matters for folding (Lemma 1): when N is a power of two and
+ /// you halve it to N/2, the index also halves:
+ ///
+ /// ```text
+ /// index_N = (upper32 * N) >> 32
+ /// index_N/2 = (upper32 * N/2) >> 32 = index_N / 2 (integer division)
+ /// ```
+ ///
+ /// So the block that held hash `h` in the big filter is at `index / 2` in
+ /// the half-sized filter — exactly where `fold` ORs it.
#[inline]
fn hash_to_block_index(&self, hash: u64) -> usize {
- // unchecked_mul is unstable, but in reality this is safe, we'd just
use saturating mul
- // but it will not saturate
(((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
}
@@ -431,6 +570,140 @@ impl Sbbf {
self.0.capacity() * std::mem::size_of::<Block>()
}
+ /// Returns the number of blocks in this bloom filter.
+ pub fn num_blocks(&self) -> usize {
+ self.0.len()
+ }
+
+ /// Fold the bloom filter down to the smallest size that still meets the target FPP
+ /// (False Positive Probability).
+ ///
+ /// Folds the filter by merging groups of adjacent blocks via bitwise OR,
where each
+ /// fold level halves the number of blocks. The fold count is chosen as
the maximum
+ /// number of folds whose estimated FPP stays within `target_fpp`. The
filter stops
+ /// at a minimum size of 1 block (32 bytes).
+ ///
+ /// ## How it works
+ ///
+ /// SBBFs use multiplicative hashing for block selection:
+ ///
+ /// ```text
+ /// block_index = ((hash >> 32) * num_blocks) >> 32
+ /// ```
+ ///
+ /// A single fold halves the block count: when `num_blocks` is halved, the
new index
+ /// becomes `floor(original_index / 2)`, so blocks `2i` and `2i+1` map to
the same
+ /// position. More generally, `k` folds reduce the block count by `2^k`,
merging
+ /// groups of `2^k` adjacent blocks in a single pass:
+ ///
+ /// ```text
+ /// folded[i] = blocks[i*2^k] | blocks[i*2^k + 1] | ... | blocks[i*2^k +
2^k - 1]
+ /// ```
+ ///
+ /// This differs from standard Bloom filter folding, which merges the two
halves
+ /// (`B[i] | B[i + m/2]`) because standard filters use modular hashing
where
+ /// `h(x) mod (m/2)` maps indices `i` and `i + m/2` to the same position.
+ ///
+ /// ## Correctness
+ ///
+ /// Folding **never introduces false negatives**. Every bit that was set
in the original
+ /// filter remains set in the folded filter (via bitwise OR). The only
effect is a controlled
+ /// increase in FPP as set bits from different blocks are merged together.
+ /// This was originally proven in [Sailhan & Stehr 2012] for standard bloom filters and is empirically
+ /// demonstrated for SBBFs in Lemma 1 and Lemma 2 of the tests.
+ ///
+ /// ## References
+ ///
+ /// [Sailhan & Stehr 2012]: https://doi.org/10.1109/GreenCom.2012.16
+ pub fn fold_to_target_fpp(&mut self, target_fpp: f64) {
+ let num_folds = self.num_folds_for_target_fpp(target_fpp);
+ if num_folds > 0 {
+ self.fold_n(num_folds);
+ }
+ }
+
+ /// Determine how many folds can be applied without exceeding `target_fpp`.
+ ///
+ /// Computes the average per-block fill rate in a single pass (no
allocation),
+ /// then analytically estimates the FPP at each fold level.
+ ///
+ /// When two blocks with independent fill rate `f` are OR'd, the expected
fill
+ /// of the merged block is `1 - (1-f)^2`. After `k` folds (merging `2^k`
blocks):
+ ///
+ /// ```text
+ /// f_k = 1 - (1 - f)^(2^k)
+ /// ```
+ ///
+ /// SBBF membership checks perform `k=8` bit checks within one 256-bit
block,
+ /// so the estimated FPP at fold level k is `f_k^8`.
+ fn num_folds_for_target_fpp(&self, target_fpp: f64) -> u32 {
+ let len = self.0.len();
+ if len < 2 {
+ return 0;
+ }
+
+ // Single pass: compute average per-block fill rate.
+ let total_set_bits: u64 = self.0.iter().map(|b|
u64::from(b.count_ones())).sum();
+ let avg_fill = total_set_bits as f64 / (len as f64 * 256.0);
+
+ // Empty filter: can fold all the way down.
+ if avg_fill == 0.0 {
+ return len.trailing_zeros();
+ }
+
+ // Find max folds where estimated FPP stays within target.
+ // f_k = 1 - (1 - avg_fill)^(2^k), FPP_k = f_k^8
+ assert!(
+ len.is_power_of_two(),
+ "Number of blocks must be a power of 2 for folding"
+ );
+ let max_folds = len.trailing_zeros(); // log2(len) since len is power
of 2
+ let one_minus_f = 1.0 - avg_fill;
+ let mut num_folds = 0u32;
+ let mut one_minus_fk = one_minus_f; // (1-f)^1 initially
+
+ for _ in 0..max_folds {
+ // After one more fold: (1-f)^(2^(k+1)) = ((1-f)^(2^k))^2
+ one_minus_fk = one_minus_fk * one_minus_fk;
+ let fk = 1.0 - one_minus_fk;
+ let estimated_fpp = fk.powi(8);
+ if estimated_fpp > target_fpp {
+ break;
+ }
+ num_folds += 1;
+ }
+
+ num_folds
+ }
+
+ /// Fold the filter `num_folds` times in a single pass.
+ ///
+ /// Merges groups of `2^num_folds` adjacent blocks via bitwise OR,
producing
+ /// `len / 2^num_folds` output blocks. The original allocation is reused.
+ ///
+ /// # Panics
+ ///
+ /// Panics if `num_folds` is 0 or would reduce the filter below 1 block.
+ fn fold_n(&mut self, num_folds: u32) {
+ assert!(num_folds > 0, "num_folds must be at least 1");
+ let len = self.0.len();
+ let group_size = 1usize << num_folds;
+ assert!(
+ group_size <= len,
+ "Cannot fold {num_folds} times: need at least {group_size} blocks,
have {len}"
+ );
+ let new_len = len / group_size;
+ for i in 0..new_len {
+ let start = i * group_size;
+ let mut merged = self.0[start];
+ for j in 1..group_size {
+ merged |= self.0[start + j];
+ }
+ self.0[i] = merged;
+ }
+ self.0.truncate(new_len);
+ }
+
/// Reads a Sbff from Thrift encoded bytes
///
/// # Examples
@@ -603,6 +876,86 @@ mod tests {
}
}
+ #[test]
+ fn test_fold_n_halves_block_count() {
+ let mut sbbf = Sbbf::new_with_num_of_bytes(1024); // 32 blocks
+ assert_eq!(sbbf.num_blocks(), 32);
+ sbbf.fold_n(1);
+ assert_eq!(sbbf.num_blocks(), 16);
+ sbbf.fold_n(1);
+ assert_eq!(sbbf.num_blocks(), 8);
+ }
+
+ #[test]
+ fn test_fold_preserves_inserted_values() {
+ // Create a large filter, insert values, fold, verify no false
negatives
+ let mut sbbf = Sbbf::new_with_num_of_bytes(32 * 1024); // 32KB = 1024
blocks
+ let values: Vec<String> = (0..1000).map(|i|
format!("value_{i}")).collect();
+ for v in &values {
+ sbbf.insert(v.as_str());
+ }
+
+ // Fold several times
+ let original_blocks = sbbf.num_blocks();
+ sbbf.fold_to_target_fpp(0.05);
+ assert!(
+ sbbf.num_blocks() < original_blocks,
+ "should have folded at least once"
+ );
+
+ // All inserted values must still be found (no false negatives)
+ for v in &values {
+ assert!(
+ sbbf.check(v.as_str()),
+ "Value '{}' missing after folding (false negative!)",
+ v
+ );
+ }
+ }
+
+ #[test]
+ fn test_fold_to_target_fpp_stops_before_exceeding_target() {
+ let mut sbbf = Sbbf::new_with_num_of_bytes(64 * 1024); // 64KB
+ // Insert enough values to set some bits
+ for i in 0..5000 {
+ sbbf.insert(&i);
+ }
+
+ let target_fpp = 0.01;
+ sbbf.fold_to_target_fpp(target_fpp);
+
+ // After folding, the estimated FPP should be at or below target
+ // (the current state should not exceed target — we stopped before
that would happen)
+ let total_bits = (sbbf.num_blocks() * 256) as f64;
+ let set_bits: u64 = sbbf
+ .0
+ .iter()
+ .flat_map(|b| b.0.iter())
+ .map(|w| w.count_ones() as u64)
+ .sum();
+ let fill = set_bits as f64 / total_bits;
+ let current_fpp = fill.powi(8);
+ assert!(
+ current_fpp <= target_fpp,
+ "FPP {current_fpp} exceeds target {target_fpp}"
+ );
+ }
+
+ #[test]
+ fn test_fold_empty_filter_folds_to_minimum() {
+ // An empty filter has fill=0, so estimated FPP is always 0 — should
fold all the way down
+ let mut sbbf = Sbbf::new_with_num_of_bytes(1024); // 32 blocks
+ sbbf.fold_to_target_fpp(0.01);
+ assert_eq!(sbbf.num_blocks(), 1);
+ }
+
+ #[test]
+ #[should_panic(expected = "Cannot fold 1 times: need at least 2 blocks,
have 1")]
+ fn test_fold_n_panics_at_minimum_size() {
+ let mut sbbf = Sbbf::new_with_num_of_bytes(32); // 1 block (minimum)
+ sbbf.fold_n(1);
+ }
+
#[test]
fn test_sbbf_write_round_trip() {
// Create a bloom filter with a 32-byte bitset (minimum size)
@@ -641,4 +994,247 @@ mod tests {
);
}
}
+
+ /// Prove that folding an SBBF by one level produces the exact same bits
+ /// as building a fresh filter at the smaller size from scratch.
+ ///
+ /// # What is folding?
+ ///
+ /// ```text
+ /// Original (N = 8 blocks):
+ /// ┌───┬───┬───┬───┬───┬───┬───┬───┐
+ /// │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │
+ /// └─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┘
+ /// │ │ │ │ │ │ │ │
+ /// └─OR┘ └─OR┘ └─OR┘ └─OR┘ pair-wise OR
+ /// │ │ │ │
+ /// ┌───┴──┬────┴──┬────┴──┬────┴──┐
+ /// │ 0|1 │ 2|3 │ 4|5 │ 6|7 │ Folded (N/2 = 4 blocks)
+ /// └──────┴───────┴───────┴───────┘
+ /// ```
+ ///
+ /// # Why folded == fresh (the two lemmas)
+ ///
+ /// An SBBF insertion does two things with a 64-bit hash `h`:
+ ///
+ /// 1. **Pick a block** — uses the upper 32 bits via
`hash_to_block_index`
+ /// 2. **Set 8 bits in that block** — uses the lower 32 bits via
`Block::mask`
+ ///
+ /// **Lemma 1 (block index halves):** `hash_to_block_index` uses
+ /// `(upper32 * N) >> 32`. When N halves, the index halves too:
+ /// `index_in(N/2) == index_in(N) / 2`. So the hash lands in the same
+ /// destination block whether you fold or build fresh.
+ ///
+ /// **Lemma 2 (mask is size-independent):** `Block::mask(h as u32)` depends
+ /// only on the lower 32 bits and the fixed SALT constants — the filter
+ /// size N is not involved. So the same 8 bits get set regardless.
+ ///
+ /// Combined: every hash sets the *same bits* in the *same destination
+ /// block* whether you fold or build fresh → filters are bit-identical.
+ #[test]
+ fn test_sbbf_folded_equals_fresh() {
+ let values = (0..5000).map(|i|
format!("elem_{i}")).collect::<Vec<_>>();
+ let hashes = values
+ .iter()
+ .map(|v| hash_as_bytes(v.as_str()))
+ .collect::<Vec<_>>();
+
+ for num_blocks in [64, 256, 1024] {
+ let half = num_blocks / 2;
+
+ // Build a filter with N blocks and insert all values.
+ let mut original = Sbbf::new_with_num_of_bytes(num_blocks * 32);
+ assert_eq!(original.num_blocks(), num_blocks);
+ for &h in &hashes {
+ original.insert_hash(h);
+ }
+
+ // --- Per-hash verification of the two lemmas ---
+ for &h in hashes.iter() {
+ // mask(h as u32) gives the 8-bit pattern that this hash sets
+ // inside whichever block it lands in. It uses only the lower
+ // 32 bits of h, so it's the same regardless of filter size.
+ let mask = Block::mask(h as u32);
+
+ // Lemma 1 check: the block index in the original N-block
+ // filter, divided by 2, should equal the block index in a
+ // fresh N/2-block filter.
+ let orig_idx = original.hash_to_block_index(h);
+ assert!(orig_idx < num_blocks);
+
+ let fresh_idx = {
+ let tmp = Sbbf(vec![Block::ZERO; half]);
+ tmp.hash_to_block_index(h)
+ };
+ let folded_idx = orig_idx / 2;
+ assert_eq!(
+ fresh_idx, folded_idx,
+ "Lemma 1 failed: fresh index {fresh_idx} != folded index {folded_idx}"
+ );
+
+ // Lemma 2 check: every bit that mask wants to set is actually
+ // present in the original block.
+ //
+ // mask.0[w] has exactly ONE bit set (see Block::mask: `1 << y`).
+ // The block at orig_idx has many bits set from many inserts, so
+ // we can't test equality — we test that the specific mask bit is
+ // *present*:
+ //
+ // block_word & mask_word != 0
+ // ⟺ "the one bit in the mask is set in the block"
+ //
+ // (Since mask_word has exactly 1 bit, `& mask != 0` is the same
+ // as `& mask == mask` — but `!= 0` reads more naturally.)
+ for w in 0..8 {
+ assert_ne!(
+ original.0[orig_idx].0[w] & mask.0[w],
+ 0,
+ "Lemma 2 failed: mask bit not set in word {w} of block {orig_idx}"
+ );
+ }
+ }
+
+ // --- Final bit-identical comparison ---
+ // Fold the original N-block filter down to N/2 blocks.
+ let mut folded = original.clone();
+ folded.fold_n(1);
+ assert_eq!(folded.num_blocks(), half);
+
+ // Build a fresh N/2-block filter with the same values.
+ let mut fresh = Sbbf::new_with_num_of_bytes(half * 32);
+ for &h in &hashes {
+ fresh.insert_hash(h);
+ }
+
+ // By lemmas 1 + 2, every block should be bit-identical.
+ for j in 0..half {
+ assert_eq!(
+ folded.0[j].0, fresh.0[j].0,
+ "Block {j} differs after fold (N={num_blocks} → {half})"
+ );
+ }
+ }
+ }
+
+ /// Inductive multi-step folding: folding k times from N blocks produces
+ /// a filter bit-identical to a fresh N/2^k-block filter.
+ ///
+ /// `test_sbbf_folded_equals_fresh` proves the base case (one fold).
+ /// This test applies folds *repeatedly*, checking after each step:
+ ///
+ /// ```text
+ /// 512 ─fold→ 256 ─fold→ 128 ─…→ 1 (9 folds total)
+ /// ```
+ ///
+ /// At each intermediate size we build a fresh filter and assert
+ /// bit-equality, confirming the lemma composes across folds.
+ #[test]
+ fn test_multi_step_fold() {
+ let values = (0..3000).map(|i| format!("x_{i}")).collect::<Vec<_>>();
+
+ // Start with a 512-block filter.
+ let mut filter = Sbbf::new_with_num_of_bytes(512 * 32);
+ for v in &values {
+ filter.insert(v.as_str());
+ }
+
+ // Fold one level at a time, comparing against a fresh filter each step.
+ for expected_blocks in [256, 128, 64, 32, 16, 8, 4, 2, 1] {
+ filter.fold_n(1);
+ assert_eq!(filter.num_blocks(), expected_blocks);
+
+ let mut fresh = Sbbf::new_with_num_of_bytes(expected_blocks * 32);
+ for v in &values {
+ fresh.insert(v.as_str());
+ }
+ for (fb, rb) in filter.0.iter().zip(fresh.0.iter()) {
+ assert_eq!(fb.0, rb.0);
+ }
+ }
+ }
+
+ /// test that the fpp estimator's overestimation doesn't cause fold_to_target_fpp
+ /// to produce significantly oversized filters
+ ///
+ /// compare the final size after folding against the theoretical optimal size
+ #[test]
+ fn test_fold_size_vs_optimal_fixed_size() {
+ for (ndv, target_fpp) in [
+ (1000, 0.05),
+ (1000, 0.01),
+ (5000, 0.05),
+ (5000, 0.01),
+ (10000, 0.05),
+ ] {
+ let values = (0..ndv).map(|i| format!("d_{i}")).collect::<Vec<_>>();
+
+ let mut folded = Sbbf::new_with_num_of_bytes(128 * 1024); // 128KB
+ for v in &values {
+ folded.insert(v.as_str());
+ }
+ folded.fold_to_target_fpp(target_fpp);
+
+ let folded_bytes = folded.num_blocks() * 32;
+
+ let optimal = Sbbf::new_with_ndv_fpp(ndv as u64, target_fpp).unwrap();
+ let optimal_bytes = optimal.num_blocks() * 32;
+
+ let ratio = folded_bytes as f64 / optimal_bytes as f64;
+
+ assert_eq!(ratio, 1.0);
+ }
+ }
+
+ /// verify that a folded sbbf has the same empirical fpp as a fresh filter of the same size
+ /// this bridges the bit-identity proof above with the FPP guarantee from the folding paper
+ /// since the bits are identical, the false-positive rate must be too
+ ///
+ /// we measure fpp empirically by probing with values that were never inserted
+ /// and counting how many are incorrectly marked as present
+ #[test]
+ fn test_folded_fpp_matches_fresh_fpp() {
+ let ndv = 2000;
+ let num_probes = 50_000;
+ let inserted = (0..ndv)
+ .map(|i| format!("ins_{i}"))
+ .collect::<Vec<String>>();
+
+ // probe values that were NOT inserted (different prefix guarantees no overlap)
+ let probes = (0..num_probes)
+ .map(|i| format!("probe_{i}"))
+ .collect::<Vec<String>>();
+
+ // build a large filter and fold it down several times
+ let mut folded = Sbbf::new_with_num_of_bytes(512 * 32); // 512 blocks
+ for v in &inserted {
+ folded.insert(v.as_str());
+ }
+
+ // check FPP at each fold level
+ for expected_blocks in [256, 128, 64, 32, 16, 8, 4, 2, 1] {
+ folded.fold_n(1);
+ assert_eq!(folded.num_blocks(), expected_blocks);
+
+ // build a fresh filter of the same size with the same values
+ let mut fresh = Sbbf::new_with_num_of_bytes(expected_blocks * 32);
+ for v in &inserted {
+ fresh.insert(v.as_str());
+ }
+
+ // measure empirical FPP on both
+ let mut folded_fp = 0u64;
+ let mut fresh_fp = 0u64;
+ for p in &probes {
+ if folded.check(p.as_str()) {
+ folded_fp += 1;
+ }
+ if fresh.check(p.as_str()) {
+ fresh_fp += 1;
+ }
+ }
+
+ // bit-identity means these must be exactly equal
+ assert_eq!(folded_fp, fresh_fp);
+ }
+ }
}
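
For readers who want to try the two lemmas outside the crate, the following is a minimal standalone sketch of the idea the tests above exercise. It is not the arrow-rs implementation: `ToyBlock`, `block_index`, `mask`, `insert`, `fold_once` and `toy_hash` are hypothetical names, the salt values are assumed to be the eight multipliers listed in the Parquet bloom filter spec (any odd constants would serve for the illustration), and a SplitMix64-style mixer stands in for the real hashing.

```rust
/// Toy stand-in for one 256-bit block: eight 32-bit words.
type ToyBlock = [u32; 8];

/// Per-word multipliers (assumed to match the Parquet SBBF salt values).
const SALT: [u32; 8] = [
    0x47b6137b, 0x44974d91, 0x8824ad5b, 0xa2b7289d,
    0x705495c7, 0x2df1424b, 0x9efc4947, 0x5c6bfb31,
];

/// Block selection from the upper 32 bits: ((hash >> 32) * num_blocks) >> 32.
fn block_index(hash: u64, num_blocks: usize) -> usize {
    (((hash >> 32) * num_blocks as u64) >> 32) as usize
}

/// One bit per word, derived only from the lower 32 bits (independent of filter size).
fn mask(low: u32) -> ToyBlock {
    let mut m = [0u32; 8];
    for (word, salt) in m.iter_mut().zip(SALT.iter()) {
        *word = 1u32 << (low.wrapping_mul(*salt) >> 27);
    }
    m
}

/// Insert a hash: pick a block, then OR the mask into it.
fn insert(filter: &mut [ToyBlock], hash: u64) {
    let idx = block_index(hash, filter.len());
    let bits = mask(hash as u32);
    for (word, bit) in filter[idx].iter_mut().zip(bits.iter()) {
        *word |= *bit;
    }
}

/// One fold: OR adjacent pairs (block[2i] | block[2i+1]), halving the size.
/// Assumes an even number of blocks.
fn fold_once(filter: &[ToyBlock]) -> Vec<ToyBlock> {
    filter
        .chunks_exact(2)
        .map(|pair| {
            let mut merged = [0u32; 8];
            for w in 0..8 {
                merged[w] = pair[0][w] | pair[1][w];
            }
            merged
        })
        .collect()
}

/// SplitMix64-style mixer, used here only to produce well-spread 64-bit test hashes.
fn toy_hash(i: u64) -> u64 {
    let mut z = i.wrapping_add(0x9E3779B97F4A7C15);
    z = (z ^ (z >> 30)).wrapping_mul(0xBF58476D1CE4E5B9);
    z = (z ^ (z >> 27)).wrapping_mul(0x94D049BB133111EB);
    z ^ (z >> 31)
}
```

Inserting the same hashes into a 64-block toy filter and a 32-block one, then calling `fold_once` on the larger, should yield equal vectors, which mirrors what `test_sbbf_folded_equals_fresh` asserts for the real `Sbbf`.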
diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs
index 11d4f3142a..ec1afca583 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -138,6 +138,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
min_value: Option<T::T>,
max_value: Option<T::T>,
bloom_filter: Option<Sbbf>,
+ bloom_filter_target_fpp: f64,
variable_length_bytes: Option<i64>,
geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
}
@@ -187,7 +188,9 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
type Values = [T::T];
fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
- self.bloom_filter.take()
+ let mut sbbf = self.bloom_filter.take()?;
+ sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
+ Some(sbbf)
}
fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self> {
@@ -205,10 +208,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
let statistics_enabled = props.statistics_enabled(descr.path());
- let bloom_filter = props
- .bloom_filter_properties(descr.path())
- .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
- .transpose()?;
+ let (bloom_filter, bloom_filter_target_fpp) = create_bloom_filter(props, descr)?;
let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
@@ -219,6 +219,7 @@ impl<T: DataType> ColumnValueEncoder for ColumnValueEncoderImpl<T> {
num_values: 0,
statistics_enabled,
bloom_filter,
+ bloom_filter_target_fpp,
min_value: None,
max_value: None,
variable_length_bytes: None,
@@ -384,6 +385,21 @@ fn replace_zero<T: ParquetValueType>(val: &T, descr: &ColumnDescriptor, replace:
}
}
+/// Creates a bloom filter sized for the column's configured NDV, returning the filter
+/// and the target FPP for folding.
+pub(crate) fn create_bloom_filter(
+ props: &WriterProperties,
+ descr: &ColumnDescPtr,
+) -> Result<(Option<Sbbf>, f64)> {
+ match props.bloom_filter_properties(descr.path()) {
+ Some(bf_props) => Ok((
+ Some(Sbbf::new_with_ndv_fpp(bf_props.ndv, bf_props.fpp)?),
+ bf_props.fpp,
+ )),
+ None => Ok((None, 0.0)),
+ }
+}
+
fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn GeoStatsAccumulator, iter: I)
where
T: ParquetValueType + 'a,
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 640a7a075d..65630cfed2 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -53,8 +53,14 @@ pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs version ", env!("CARGO_
pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);
/// Default value for [`BloomFilterProperties::fpp`]
pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
-/// Default value for [`BloomFilterProperties::ndv`]
-pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
+/// Default value for [`BloomFilterProperties::ndv`].
+///
+/// Note: this is only the fallback default used when constructing [`BloomFilterProperties`]
+/// directly. When using [`WriterPropertiesBuilder`], columns with bloom filters enabled
+/// but without an explicit NDV will have their NDV resolved at build time to
+/// [`WriterProperties::max_row_group_row_count`], which may differ from this constant
+/// if the user configured a custom row group size.
+pub const DEFAULT_BLOOM_FILTER_NDV: u64 = DEFAULT_MAX_ROW_GROUP_ROW_COUNT as u64;
/// Default values for [`WriterProperties::statistics_truncate_length`]
pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = Some(64);
/// Default value for [`WriterProperties::offset_index_disabled`]
@@ -587,6 +593,18 @@ impl Default for WriterPropertiesBuilder {
impl WriterPropertiesBuilder {
/// Finalizes the configuration and returns immutable writer properties struct.
pub fn build(self) -> WriterProperties {
+ // Resolve bloom filter NDV for columns where it wasn't explicitly set:
+ // default to max_row_group_row_count so the filter is never undersized.
+ let default_ndv = self
+ .max_row_group_row_count
+ .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT) as u64;
+ let mut default_column_properties = self.default_column_properties;
+ default_column_properties.resolve_bloom_filter_ndv(default_ndv);
+ let mut column_properties = self.column_properties;
+ for props in column_properties.values_mut() {
+ props.resolve_bloom_filter_ndv(default_ndv);
+ }
+
WriterProperties {
data_page_row_count_limit: self.data_page_row_count_limit,
write_batch_size: self.write_batch_size,
@@ -597,8 +615,8 @@ impl WriterPropertiesBuilder {
created_by: self.created_by,
offset_index_disabled: self.offset_index_disabled,
key_value_metadata: self.key_value_metadata,
- default_column_properties: self.default_column_properties,
- column_properties: self.column_properties,
+ default_column_properties,
+ column_properties,
sorting_columns: self.sorting_columns,
column_index_truncate_length: self.column_index_truncate_length,
statistics_truncate_length: self.statistics_truncate_length,
@@ -996,8 +1014,13 @@ impl WriterPropertiesBuilder {
self
}
- /// Sets default number of distinct values (ndv) for bloom filter for all
- /// columns (defaults to `1_000_000` via [`DEFAULT_BLOOM_FILTER_NDV`]).
+ /// Sets default maximum expected number of distinct values (ndv) for bloom filter
+ /// for all columns (defaults to [`DEFAULT_BLOOM_FILTER_NDV`]).
+ ///
+ /// The bloom filter is initially sized for this many distinct values at the
+ /// configured FPP, then folded down after all values are inserted to achieve
+ /// optimal size. A good heuristic is to set this to the expected number of rows
+ /// in the row group.
///
/// Implicitly enables bloom writing, as if [`set_bloom_filter_enabled`] had
/// been called.
@@ -1191,6 +1214,13 @@ impl Default for EnabledStatistics {
}
/// Controls the bloom filter to be computed by the writer.
+///
+/// The bloom filter is initially sized for `ndv` distinct values at the given `fpp`, then
+/// automatically folded down after all values are inserted to achieve optimal size while
+/// maintaining the target `fpp`. See [`Sbbf::fold_to_target_fpp`] for details on the
+/// folding algorithm.
+///
+/// [`Sbbf::fold_to_target_fpp`]: crate::bloom_filter::Sbbf::fold_to_target_fpp
#[derive(Debug, Clone, PartialEq)]
pub struct BloomFilterProperties {
/// False positive probability. This should be always between 0 and 1 exclusive. Defaults to [`DEFAULT_BLOOM_FILTER_FPP`].
@@ -1201,20 +1231,30 @@ pub struct BloomFilterProperties {
/// smaller the fpp, the more memory and disk space is required, thus setting it to a reasonable value
/// e.g. 0.1, 0.05, or 0.001 is recommended.
///
- /// Setting to a very small number diminishes the value of the filter itself, as the bitset size is
- /// even larger than just storing the whole value. You are also expected to set `ndv` if it can
- /// be known in advance to greatly reduce space usage.
+ /// This value also serves as the target FPP for bloom filter folding: after all values
+ /// are inserted, the filter is folded down to the smallest size that still meets this FPP.
pub fpp: f64,
- /// Number of distinct values, should be non-negative to be meaningful. Defaults to [`DEFAULT_BLOOM_FILTER_NDV`].
+ /// Maximum expected number of distinct values. Defaults to [`DEFAULT_BLOOM_FILTER_NDV`].
///
/// You should set this value by calling [`WriterPropertiesBuilder::set_bloom_filter_ndv`].
///
- /// Usage of bloom filter is most beneficial for columns with large cardinality, so a good heuristic
- /// is to set ndv to the number of rows. However, it can reduce disk size if you know in advance a smaller
- /// number of distinct values. For very small ndv value it is probably not worth it to use bloom filter
- /// anyway.
- ///
- /// Increasing this value (without increasing fpp) will result in an increase in disk or memory size.
+ /// When not explicitly set via the builder, this defaults to
+ /// [`max_row_group_row_count`](WriterProperties::max_row_group_row_count) (resolved at
+ /// build time). The bloom filter is initially sized for this many distinct values at the
+ /// given `fpp`, then folded down after insertion to achieve optimal size. A good heuristic
+ /// is to set this to the expected number of rows in the row group. If fewer distinct values
+ /// are actually written, the filter will be automatically compacted via folding.
+ ///
+ /// Thus the only negative side of overestimating this value is that the bloom filter
+ /// will use more memory during writing than necessary, but it will not affect the final
+ /// bloom filter size on disk.
+ ///
+ /// If you wish to reduce memory usage during writing and are able to make a reasonable estimate
+ /// of the number of distinct values in a row group, it is recommended to set this value explicitly
+ /// rather than relying on the default dynamic sizing based on `max_row_group_row_count`.
+ /// If you do set this value explicitly it is probably best to set it for each column
+ /// individually via [`WriterPropertiesBuilder::set_column_bloom_filter_ndv`] rather than globally,
+ /// since different columns may have different numbers of distinct values.
pub ndv: u64,
}
@@ -1242,6 +1282,8 @@ struct ColumnProperties {
write_page_header_statistics: Option<bool>,
/// bloom filter related properties
bloom_filter_properties: Option<BloomFilterProperties>,
+ /// Whether the bloom filter NDV was explicitly set by the user
+ bloom_filter_ndv_is_set: bool,
}
impl ColumnProperties {
@@ -1319,12 +1361,13 @@ impl ColumnProperties {
.fpp = value;
}
- /// Sets the number of distinct (unique) values for bloom filter for this column, and implicitly
- /// enables bloom filter if not previously enabled.
+ /// Sets the maximum expected number of distinct (unique) values for bloom filter for this
+ /// column, and implicitly enables bloom filter if not previously enabled.
fn set_bloom_filter_ndv(&mut self, value: u64) {
self.bloom_filter_properties
.get_or_insert_with(Default::default)
.ndv = value;
+ self.bloom_filter_ndv_is_set = true;
}
/// Returns optional encoding for this column.
@@ -1372,6 +1415,16 @@ impl ColumnProperties {
fn bloom_filter_properties(&self) -> Option<&BloomFilterProperties> {
self.bloom_filter_properties.as_ref()
}
+
+ /// If bloom filter is enabled and NDV was not explicitly set, resolve it to the
+ /// given `default_ndv` (typically derived from `max_row_group_row_count`).
+ fn resolve_bloom_filter_ndv(&mut self, default_ndv: u64) {
+ if !self.bloom_filter_ndv_is_set {
+ if let Some(ref mut bf) = self.bloom_filter_properties {
+ bf.ndv = default_ndv;
+ }
+ }
+ }
}
/// Reference counted reader properties.
@@ -1703,8 +1756,8 @@ mod tests {
assert_eq!(
props.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties {
- fpp: 0.05,
- ndv: 1_000_000_u64
+ fpp: DEFAULT_BLOOM_FILTER_FPP,
+ ndv: DEFAULT_BLOOM_FILTER_NDV,
})
);
}
@@ -1746,8 +1799,8 @@ mod tests {
.build()
.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties {
- fpp: 0.05,
- ndv: 100
+ fpp: DEFAULT_BLOOM_FILTER_FPP,
+ ndv: 100,
})
);
assert_eq!(
@@ -1757,7 +1810,7 @@ mod tests {
.bloom_filter_properties(&ColumnPath::from("col")),
Some(&BloomFilterProperties {
fpp: 0.1,
- ndv: 1_000_000_u64
+ ndv: DEFAULT_BLOOM_FILTER_NDV,
})
);
}