This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new c6ea0a5e0c Add bloom filter folding to automatically size SBBF filters 
(#9628)
c6ea0a5e0c is described below

commit c6ea0a5e0cf6c5fc9f2fc3610fe2351bb5ec25c0
Author: Adrian Garcia Badaracco <[email protected]>
AuthorDate: Tue Apr 7 05:54:47 2026 -0500

    Add bloom filter folding to automatically size SBBF filters (#9628)
    
    ## Summary
    
    Bloom filters now support **folding mode**: allocate a conservatively
    large filter (sized for worst-case NDV), insert all values during
    writing, then fold down at flush time to meet a target FPP. This
    eliminates the need to guess NDV upfront and produces optimally-sized
    filters automatically.
    
    ### Changes
    
    - `BloomFilterProperties.ndv` changed from `u64` to `Option<u64>` — when
    `None` (new default), the filter is sized based on
    `max_row_group_row_count`; when `Some(n)`, the explicit NDV is used
    - `DEFAULT_BLOOM_FILTER_NDV` redefined to
    `DEFAULT_MAX_ROW_GROUP_ROW_COUNT as u64` (was hardcoded `1_000_000`)
    - Added `Sbbf::fold_to_target_fpp()` and supporting methods
    (`num_folds_for_target_fpp`, `fold_n`, `num_blocks`) with comprehensive
    documentation
    - `flush_bloom_filter()` in both `ColumnValueEncoderImpl` and
    `ByteArrayEncoder` now folds the filter before returning it
    - New `create_bloom_filter()` helper in `encoder.rs` centralizes bloom
    filter construction logic
    
    ### How folding works
    
    The SBBF fold operation merges adjacent block pairs (`block[2i] |
    block[2i+1]`) via bitwise OR, halving the filter size. This differs from
    standard Bloom filter folding (which merges halves at distance `m/2`)
    because SBBF uses multiplicative hashing for block selection:
    
    ```
    block_index = ((hash >> 32) * num_blocks) >> 32
    ```
    
    When `num_blocks` is halved, the new index becomes `floor(original_index
    / 2)`, so adjacent blocks map to the same position.
    
    The number of safe folds is determined analytically from the average
    per-block fill rate: after `k` folds, expected fill is `1 -
    (1-f)^(2^k)`, giving `FPP = fill^8`. This requires only a single
    popcount scan over the blocks (no scratch allocation), then O(log N)
    floating-point ops to find the optimal fold count. The actual fold is
    then performed in a single pass.
    
    ### Benchmarks
    
    Filter sized for 1M NDV, varying actual distinct values inserted.
    Measured on Apple M3 Pro.
    
    **Fold overhead (fold_to_target_fpp only):**
    
    | Actual NDV | Time | Throughput |
    |---|---|---|
    | 1,000 | 39.1 µs | 838 Melem/s |
    | 10,000 | 34.2 µs | 960 Melem/s |
    | 100,000 | 32.5 µs | 1.01 Gelem/s |
    
    **End-to-end (insert + fold) vs insert-only:**
    
    | Actual NDV | Insert only | Insert + fold | Fold overhead |
    |---|---|---|---|
    | 1,000 | 14.7 µs | 49.1 µs | 34.4 µs (70%) |
    | 10,000 | 30.7 µs | 58.1 µs | 27.4 µs (47%) |
    | 100,000 | 162.5 µs | 189.8 µs | 27.3 µs (14%) |
    
    The fold cost is dominated by the popcount scan over the initial (large)
    filter. For the common case (100K values into a 1M-NDV filter), folding
    adds only ~14% overhead to the total insert+fold time.
    
    ### References
    
    Sailhan & Stehr, ["Folding and Unfolding Bloom
    Filters"](https://hal.science/hal-01126174v1/document), IEEE iThings
    2012.
    
    Liang, ["Blocked Bloom Filters: Speeding Up Point Lookups in Tiger
    Postgres' Native
    
Columnstore"](https://www.tigerdata.com/blog/blocked-bloom-filters-speeding-up-point-lookups-in-tiger-postgres-native-columnstore)
    
    ### Breaking changes
    
    There are no breaking API changes
    
    However, when bloom filters are enabled without specifying the number of
    distinct values, the bloom filters are automatically sized. Previously
    they would be sized using the default value of
    `DEFAULT_BLOOM_FILTER_NDV`
    
    ## Test plan
    
    - [x] All existing bloom filter unit tests pass
    - [x] All existing integration tests (sync + async reader roundtrips)
    pass
    - [x] New unit tests: fold correctness, no false negatives after
    folding, FPP target respected, minimum size guard
    - [x] New unit tests: folded filter is bit-identical to a fresh filter
    of the same size (proves correctness via two lemmas about SBBF hashing)
    - [x] New unit tests: multi-step folding, folded FPP matches fresh FPP
    empirically, fold size matches optimal fixed-size filter
    - [x] New integration test: `i32_column_bloom_filter_fixed_ndv` —
    roundtrip with both overestimated and underestimated NDV
    - [x] Full `cargo test -p parquet` passes
    
    
    🤖 Generated with [Claude Code](https://claude.com/claude-code)
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 (1M context) <[email protected]>
    Co-authored-by: Matthew Kim 
<[email protected]>
    Co-authored-by: emkornfield <[email protected]>
    Co-authored-by: Liang-Chi Hsieh <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 parquet/Cargo.toml                           |   4 +
 parquet/benches/bloom_filter.rs              | 113 +++++
 parquet/src/arrow/arrow_writer/byte_array.rs |  15 +-
 parquet/src/arrow/arrow_writer/mod.rs        |  47 +-
 parquet/src/bloom_filter/mod.rs              | 622 ++++++++++++++++++++++++++-
 parquet/src/column/writer/encoder.rs         |  26 +-
 parquet/src/file/properties.rs               |  99 ++++-
 7 files changed, 876 insertions(+), 50 deletions(-)

diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml
index efcd1fe219..696e9b5411 100644
--- a/parquet/Cargo.toml
+++ b/parquet/Cargo.toml
@@ -275,5 +275,9 @@ name = "row_selection_cursor"
 harness = false
 required-features = ["arrow"]
 
+[[bench]]
+name = "bloom_filter"
+harness = false
+
 [lib]
 bench = false
diff --git a/parquet/benches/bloom_filter.rs b/parquet/benches/bloom_filter.rs
new file mode 100644
index 0000000000..ca4f900067
--- /dev/null
+++ b/parquet/benches/bloom_filter.rs
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, 
criterion_main};
+use parquet::bloom_filter::Sbbf;
+
+/// Build a bloom filter sized for `initial_ndv` at `fpp`, insert `num_values` 
distinct values,
+/// and return it ready for folding.
+fn build_filter(initial_ndv: u64, fpp: f64, num_values: u64) -> Sbbf {
+    let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, fpp).unwrap();
+    for i in 0..num_values {
+        sbbf.insert(&i);
+    }
+    sbbf
+}
+
+fn bench_fold_to_target_fpp(c: &mut Criterion) {
+    let mut group = c.benchmark_group("fold_to_target_fpp");
+
+    // Realistic scenario: filter sized for 1M NDV, varying actual distinct 
values
+    let initial_ndv = 1_000_000u64;
+    let fpp = 0.05;
+
+    for num_values in [1_000u64, 10_000, 100_000] {
+        let filter = build_filter(initial_ndv, fpp, num_values);
+        let num_blocks = filter.num_blocks();
+        group.throughput(Throughput::Elements(num_blocks as u64));
+        group.bench_with_input(BenchmarkId::new("ndv", num_values), &filter, 
|b, filter| {
+            b.iter_batched(
+                || filter.clone(),
+                |mut f| {
+                    f.fold_to_target_fpp(fpp);
+                    f
+                },
+                criterion::BatchSize::SmallInput,
+            );
+        });
+    }
+    group.finish();
+}
+
+fn bench_insert_and_fold(c: &mut Criterion) {
+    let mut group = c.benchmark_group("insert_and_fold");
+
+    let initial_ndv = 1_000_000u64;
+    let fpp = 0.05;
+
+    for num_values in [1_000u64, 10_000, 100_000] {
+        group.throughput(Throughput::Elements(num_values));
+        group.bench_with_input(
+            BenchmarkId::new("values", num_values),
+            &num_values,
+            |b, &num_values| {
+                b.iter(|| {
+                    let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, 
fpp).unwrap();
+                    for i in 0..num_values {
+                        sbbf.insert(&i);
+                    }
+                    sbbf.fold_to_target_fpp(fpp);
+                    sbbf
+                });
+            },
+        );
+    }
+    group.finish();
+}
+
+fn bench_insert_only(c: &mut Criterion) {
+    let mut group = c.benchmark_group("insert_only");
+
+    let initial_ndv = 1_000_000u64;
+    let fpp = 0.05;
+
+    for num_values in [1_000u64, 10_000, 100_000] {
+        group.throughput(Throughput::Elements(num_values));
+        group.bench_with_input(
+            BenchmarkId::new("values", num_values),
+            &num_values,
+            |b, &num_values| {
+                b.iter(|| {
+                    let mut sbbf = Sbbf::new_with_ndv_fpp(initial_ndv, 
fpp).unwrap();
+                    for i in 0..num_values {
+                        sbbf.insert(&i);
+                    }
+                    sbbf
+                });
+            },
+        );
+    }
+    group.finish();
+}
+
+criterion_group!(
+    benches,
+    bench_fold_to_target_fpp,
+    bench_insert_and_fold,
+    bench_insert_only
+);
+criterion_main!(benches);
diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs 
b/parquet/src/arrow/arrow_writer/byte_array.rs
index 228d229b30..f56f9570ad 100644
--- a/parquet/src/arrow/arrow_writer/byte_array.rs
+++ b/parquet/src/arrow/arrow_writer/byte_array.rs
@@ -17,7 +17,9 @@
 
 use crate::basic::Encoding;
 use crate::bloom_filter::Sbbf;
-use crate::column::writer::encoder::{ColumnValueEncoder, DataPageValues, 
DictionaryPage};
+use crate::column::writer::encoder::{
+    ColumnValueEncoder, DataPageValues, DictionaryPage, create_bloom_filter,
+};
 use crate::data_type::{AsBytes, ByteArray, Int32Type};
 use crate::encodings::encoding::{DeltaBitPackEncoder, Encoder};
 use crate::encodings::rle::RleEncoder;
@@ -423,6 +425,7 @@ pub struct ByteArrayEncoder {
     min_value: Option<ByteArray>,
     max_value: Option<ByteArray>,
     bloom_filter: Option<Sbbf>,
+    bloom_filter_target_fpp: f64,
     geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
 }
 
@@ -430,7 +433,9 @@ impl ColumnValueEncoder for ByteArrayEncoder {
     type T = ByteArray;
     type Values = dyn Array;
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
-        self.bloom_filter.take()
+        let mut sbbf = self.bloom_filter.take()?;
+        sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
+        Some(sbbf)
     }
 
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> Result<Self>
@@ -443,10 +448,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
 
         let fallback = FallbackEncoder::new(descr, props)?;
 
-        let bloom_filter = props
-            .bloom_filter_properties(descr.path())
-            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
-            .transpose()?;
+        let (bloom_filter, bloom_filter_target_fpp) = 
create_bloom_filter(props, descr)?;
 
         let statistics_enabled = props.statistics_enabled(descr.path());
 
@@ -456,6 +458,7 @@ impl ColumnValueEncoder for ByteArrayEncoder {
             fallback,
             statistics_enabled,
             bloom_filter,
+            bloom_filter_target_fpp,
             dict_encoder: dictionary,
             min_value: None,
             max_value: None,
diff --git a/parquet/src/arrow/arrow_writer/mod.rs 
b/parquet/src/arrow/arrow_writer/mod.rs
index 2ef71d5745..8422263b1f 100644
--- a/parquet/src/arrow/arrow_writer/mod.rs
+++ b/parquet/src/arrow/arrow_writer/mod.rs
@@ -2681,6 +2681,7 @@ mod tests {
         values: ArrayRef,
         schema: SchemaRef,
         bloom_filter: bool,
+        bloom_filter_ndv: Option<u64>,
         bloom_filter_position: BloomFilterPosition,
     }
 
@@ -2692,6 +2693,7 @@ mod tests {
                 values,
                 schema: Arc::new(schema),
                 bloom_filter: false,
+                bloom_filter_ndv: None,
                 bloom_filter_position: BloomFilterPosition::AfterRowGroup,
             }
         }
@@ -2712,6 +2714,7 @@ mod tests {
             values,
             schema,
             bloom_filter,
+            bloom_filter_ndv,
             bloom_filter_position,
         } = options;
 
@@ -2750,15 +2753,18 @@ mod tests {
             for encoding in &encodings {
                 for version in [WriterVersion::PARQUET_1_0, 
WriterVersion::PARQUET_2_0] {
                     for row_group_size in row_group_sizes {
-                        let props = WriterProperties::builder()
+                        let mut builder = WriterProperties::builder()
                             .set_writer_version(version)
                             .set_max_row_group_row_count(Some(row_group_size))
                             .set_dictionary_enabled(dictionary_size != 0)
                             
.set_dictionary_page_size_limit(dictionary_size.max(1))
                             .set_encoding(*encoding)
                             .set_bloom_filter_enabled(bloom_filter)
-                            .set_bloom_filter_position(bloom_filter_position)
-                            .build();
+                            .set_bloom_filter_position(bloom_filter_position);
+                        if let Some(ndv) = bloom_filter_ndv {
+                            builder = builder.set_bloom_filter_ndv(ndv);
+                        }
+                        let props = builder.build();
 
                         files.push(roundtrip_opts(&expected_batch, props))
                     }
@@ -3142,6 +3148,41 @@ mod tests {
         );
     }
 
+    /// Test that bloom filter folding produces correct results even when
+    /// the configured NDV differs significantly from actual NDV.
+    /// A large NDV means a larger initial filter that gets folded down;
+    /// a small NDV means a smaller initial filter.
+    #[test]
+    fn i32_column_bloom_filter_fixed_ndv() {
+        let array = Arc::new(Int32Array::from_iter(0..SMALL_SIZE as i32));
+
+        // NDV much larger than actual distinct values — tests folding a large 
filter down
+        let mut options = RoundTripOptions::new(array.clone(), false);
+        options.bloom_filter = true;
+        options.bloom_filter_ndv = Some(1_000_000);
+
+        let files = one_column_roundtrip_with_options(options);
+        check_bloom_filter(
+            files,
+            "col".to_string(),
+            (0..SMALL_SIZE as i32).collect(),
+            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+        );
+
+        // NDV smaller than actual distinct values — tests the underestimate 
path
+        let mut options = RoundTripOptions::new(array, false);
+        options.bloom_filter = true;
+        options.bloom_filter_ndv = Some(3);
+
+        let files = one_column_roundtrip_with_options(options);
+        check_bloom_filter(
+            files,
+            "col".to_string(),
+            (0..SMALL_SIZE as i32).collect(),
+            (SMALL_SIZE as i32 + 1..SMALL_SIZE as i32 + 10).collect(),
+        );
+    }
+
     #[test]
     fn binary_column_bloom_filter() {
         let one_vec: Vec<u8> = (0..SMALL_SIZE as u8).collect();
diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs
index 933b5a269f..8e89ba406b 100644
--- a/parquet/src/bloom_filter/mod.rs
+++ b/parquet/src/bloom_filter/mod.rs
@@ -68,6 +68,46 @@
 //! | 1,000,000 | 0.00001   | 131,072 | 4,096     |
 //! | 1,000,000 | 0.000001  | 262,144 | 8,192     |
 //!
+//! # Structure: Filter → Blocks → Words → Bits
+//!
+//! An SBBF is an array of **blocks**. Each block is 256 bits (32 bytes),
+//! divided into eight 32-bit **words**. A word is just a `u32` — an array of
+//! 32 individual bits that can each be "set" (1) or "not set" (0).
+//!
+//! ```text
+//!   Sbbf (the whole filter)
+//!   ┌──────────┬──────────┬──────────┬─── ─── ──┬──────────┐
+//!   │ Block 0  │ Block 1  │ Block 2  │   ...    │ Block N-1│
+//!   └──────────┴──────────┴──────────┴─── ─── ──┴──────────┘
+//!        │
+//!        ▼
+//!   One Block = 256 bits = 8 words
+//!   ┌────────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┐
+//!   │ word 0 │ word 1 │ word 2 │ word 3 │ word 4 │ word 5 │ word 6 │ word 7 │
+//!   │ (u32)  │ (u32)  │ (u32)  │ (u32)  │ (u32)  │ (u32)  │ (u32)  │ (u32)  │
+//!   └────────┴────────┴────────┴────────┴────────┴────────┴────────┴────────┘
+//!        │
+//!        ▼
+//!   One Word = 32 individual bits
+//!   ┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
+//!   │0│0│1│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│0│  ← bit 
29 is set
+//!   └─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
+//! ```
+//!
+//! **Inserting** a value hashes it to a 64-bit number, then:
+//!  1. The upper 32 bits pick which **block** (via 
`Sbbf::hash_to_block_index`).
+//!  2. The lower 32 bits pick one bit position in each of the 8 **words** 
(via `Block::mask`).
+//!     So each insert sets exactly **8 bits** (one per word) in a single 
block.
+//!
+//! **Checking** does the same two steps and returns `true` only if all 8 bits
+//! are already set — meaning the value was *probably* inserted (or is a false
+//! positive).
+//!
+//! # Bloom Filter Folding
+//!
+//! After inserting all values into a bloom filter it can be "folded" to 
minimize it's size.
+//! See [`Sbbf::fold_to_target_fpp`] for details  on the algorithm and its 
mathematical basis.
+//!
 //! [parquet-bf-spec]: 
https://github.com/apache/parquet-format/blob/master/BloomFilter.md
 //! [sbbf-paper]: https://arxiv.org/pdf/2101.01719
 //! [bf-formulae]: http://tfk.mit.edu/pdf/bloom.pdf
@@ -114,23 +154,50 @@ pub struct BloomFilterHeader {
 }
 );
 
-/// Each block is 256 bits, broken up into eight contiguous "words", each 
consisting of 32 bits.
-/// Each word is thought of as an array of bits; each bit is either "set" or 
"not set".
+/// A single 256-bit block, the basic unit of the Split Block Bloom Filter.
+///
+/// A block is eight contiguous 32-bit **words** (`[u32; 8]`).
+/// Each word is an independent bit-array of 32 positions:
+///
+/// ```text
+///   Block (256 bits total)
+///   ┌────────┬────────┬────────┬────────┬────────┬────────┬────────┬────────┐
+///   │ word 0 │ word 1 │ word 2 │ word 3 │ word 4 │ word 5 │ word 6 │ word 7 │
+///   │ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│ 32 bits│
+///   └────────┴────────┴────────┴────────┴────────┴────────┴────────┴────────┘
+/// ```
+///
+/// When a value is inserted, [`Block::mask`] picks one bit in each word
+/// (8 bits total), and those bits are OR'd in. When checking, we verify
+/// all 8 bits are set.
 #[derive(Debug, Copy, Clone)]
 #[repr(transparent)]
 struct Block([u32; 8]);
 impl Block {
     const ZERO: Block = Block([0; 8]);
 
-    /// takes as its argument a single unsigned 32-bit integer and returns a 
block in which each
-    /// word has exactly one bit set.
+    /// Produce a block where each of the 8 words has exactly one bit set.
+    ///
+    /// For each word `i` the bit position is derived from `x`:
+    ///
+    /// ```text
+    ///   y = (x wrapping* SALT[i]) >> 27   // top 5 bits → value in 0..31
+    ///   word[i] = 1 << y                  // exactly one bit set per word
+    /// ```
+    ///
+    /// Because only the top 5 bits survive the shift, each word picks one of
+    /// 32 possible bit positions. The eight SALT constants spread the choices
+    /// so different words usually light up different positions.
+    ///
+    /// Key property: the mask depends *only* on `x` (a u32) and the fixed
+    /// SALT constants — it is independent of the filter size. This is why
+    /// folding preserves bit patterns (see Lemma 2 in tests).
     fn mask(x: u32) -> Self {
         let mut result = [0_u32; 8];
         for i in 0..8 {
-            // wrapping instead of checking for overflow
-            let y = x.wrapping_mul(SALT[i]);
-            let y = y >> 27;
-            result[i] = 1 << y;
+            let y = x.wrapping_mul(SALT[i]); // spread bits via multiply
+            let y = y >> 27; // keep top 5 bits → 0..31
+            result[i] = 1 << y; // set exactly that one bit
         }
         Self(result)
     }
@@ -155,7 +222,10 @@ impl Block {
         self
     }
 
-    /// setting every bit in the block that was also set in the result from 
mask
+    /// OR the mask bits into this block (`block[i] |= mask[i]`).
+    ///
+    /// After insertion the 8 bits chosen by `mask(hash)` are guaranteed set;
+    /// bits previously set by other hashes are preserved.
     fn insert(&mut self, hash: u32) {
         let mask = Self::mask(hash);
         for i in 0..8 {
@@ -163,7 +233,11 @@ impl Block {
         }
     }
 
-    /// returns true when every bit that is set in the result of mask is also 
set in the block.
+    /// Check membership: returns `true` when *every* bit from `mask(hash)` is
+    /// already set in this block (`block[i] & mask[i] != 0` for all 8 words).
+    ///
+    /// A `true` result means "probably present" (other inserts may have set
+    /// the same bits). A `false` is definitive — the value was never inserted.
     fn check(&self, hash: u32) -> bool {
         let mask = Self::mask(hash);
         for i in 0..8 {
@@ -191,7 +265,55 @@ impl std::ops::IndexMut<usize> for Block {
     }
 }
 
-/// A split block Bloom filter.
+impl std::ops::BitOr for Block {
+    type Output = Self;
+
+    #[inline]
+    fn bitor(self, rhs: Self) -> Self {
+        let mut result = [0u32; 8];
+        for (i, item) in result.iter_mut().enumerate() {
+            *item = self.0[i] | rhs.0[i];
+        }
+        Self(result)
+    }
+}
+
+impl std::ops::BitOrAssign for Block {
+    #[inline]
+    fn bitor_assign(&mut self, rhs: Self) {
+        for i in 0..8 {
+            self.0[i] |= rhs.0[i];
+        }
+    }
+}
+
+impl Block {
+    /// Count the total number of set bits across all 8 words.
+    ///
+    /// Computes popcount on each word separately and sums. Keeping the 
popcount
+    /// separate from the OR allows the compiler to batch SIMD popcount 
instructions
+    /// (e.g., `cnt.16b` on ARM NEON) instead of interleaving them with OR 
operations.
+    #[inline]
+    fn count_ones(self) -> u32 {
+        // Written as a fold over the array so the compiler sees 8 independent
+        // popcount operations it can vectorize into cnt.16b + horizontal sum.
+        self.0.iter().map(|w| w.count_ones()).sum()
+    }
+}
+
+/// A split block Bloom filter (SBBF).
+///
+/// An SBBF partitions its bit space into fixed-size 256-bit (32-byte) blocks, 
each fitting in a
+/// single CPU cache line. Each block contains eight 32-bit words, aligned 
with SIMD lanes for
+/// parallel bit manipulation. When checking membership, only one block is 
accessed per query,
+/// eliminating the cache-miss penalty of standard Bloom filters.
+///
+/// ## Sizing and folding
+///
+/// Filters are initially sized for a maximum expected number of distinct 
values (NDV) via
+/// [`Sbbf::new_with_ndv_fpp`]. After all values are inserted, the filter is 
compacted by
+/// calling [`Sbbf::fold_to_target_fpp`], which folds the filter down to the 
smallest size
+/// that still meets the target false positive probability.
 ///
 /// The creation of this structure is based on the 
[`crate::file::properties::BloomFilterProperties`]
 /// struct set via [`crate::file::properties::WriterProperties`] and is thus 
hidden by default.
@@ -395,10 +517,27 @@ impl Sbbf {
         Ok(Some(Self::new(&bitset)))
     }
 
+    /// Map a 64-bit hash to a block index in `[0, num_blocks)`.
+    ///
+    /// Uses the "multiply-and-shift" trick (a fast alternative to modulo):
+    ///
+    /// ```text
+    ///   upper32 = hash >> 32           // take the top 32 bits of the hash
+    ///   index   = (upper32 * N) >> 32  // ∈ [0, N)  where N = num_blocks
+    /// ```
+    ///
+    /// Why this matters for folding (Lemma 1): when N is a power of two and
+    /// you halve it to N/2, the index also halves:
+    ///
+    /// ```text
+    ///   index_N   = (upper32 * N)   >> 32
+    ///   index_N/2 = (upper32 * N/2) >> 32 = index_N / 2  (integer division)
+    /// ```
+    ///
+    /// So the block that held hash `h` in the big filter is at `index / 2` in
+    /// the half-sized filter — exactly where `fold` ORs it.
     #[inline]
     fn hash_to_block_index(&self, hash: u64) -> usize {
-        // unchecked_mul is unstable, but in reality this is safe, we'd just 
use saturating mul
-        // but it will not saturate
         (((hash >> 32).saturating_mul(self.0.len() as u64)) >> 32) as usize
     }
 
@@ -431,6 +570,140 @@ impl Sbbf {
         self.0.capacity() * std::mem::size_of::<Block>()
     }
 
+    /// Returns the number of blocks in this bloom filter.
+    pub fn num_blocks(&self) -> usize {
+        self.0.len()
+    }
+
+    /// Fold the bloom filter down to the smallest size that still meets the 
target FPP
+    /// (False Positive Percentage).
+    ///
+    /// Folds the filter by merging groups of adjacent blocks via bitwise OR, 
where each
+    /// fold level halves the number of blocks. The fold count is chosen as 
the maximum
+    /// number of folds whose estimated FPP stays within `target_fpp`. The 
filter stops
+    /// at a minimum size of 1 block (32 bytes).
+    ///
+    /// ## How it works
+    ///
+    /// SBBFs use multiplicative hashing for block selection:
+    ///
+    /// ```text
+    /// block_index = ((hash >> 32) * num_blocks) >> 32
+    /// ```
+    ///
+    /// A single fold halves the block count: when `num_blocks` is halved, the 
new index
+    /// becomes `floor(original_index / 2)`, so blocks `2i` and `2i+1` map to 
the same
+    /// position. More generally, `k` folds reduce the block count by `2^k`, 
merging
+    /// groups of `2^k` adjacent blocks in a single pass:
+    ///
+    /// ```text
+    /// folded[i] = blocks[i*2^k] | blocks[i*2^k + 1] | ... | blocks[i*2^k + 
2^k - 1]
+    /// ```
+    ///
+    /// This differs from standard Bloom filter folding, which merges the two 
halves
+    /// (`B[i] | B[i + m/2]`) because standard filters use modular hashing 
where
+    /// `h(x) mod (m/2)` maps indices `i` and `i + m/2` to the same position.
+    ///
+    /// ## Correctness
+    ///
+    /// Folding **never introduces false negatives**. Every bit that was set 
in the original
+    /// filter remains set in the folded filter (via bitwise OR). The only 
effect is a controlled
+    /// increase in FPP as set bits from different blocks are merged together.
+    /// This is was originally proven in [Sailhan & Stehr 2012] for standard 
bloom filters and is empirically
+    /// demonstrated for SBBFs in Lemma 1 and Lemma 2 of the tests.
+    ///
+    /// ## References
+    ///
+    /// [Sailhan & Stehr 2012]: https://doi.org/10.1109/GreenCom.2012.16
+    pub fn fold_to_target_fpp(&mut self, target_fpp: f64) {
+        let num_folds = self.num_folds_for_target_fpp(target_fpp);
+        if num_folds > 0 {
+            self.fold_n(num_folds);
+        }
+    }
+
+    /// Determine how many folds can be applied without exceeding `target_fpp`.
+    ///
+    /// Computes the average per-block fill rate in a single pass (no 
allocation),
+    /// then analytically estimates the FPP at each fold level.
+    ///
+    /// When two blocks with independent fill rate `f` are OR'd, the expected 
fill
+    /// of the merged block is `1 - (1-f)^2`. After `k` folds (merging `2^k` 
blocks):
+    ///
+    /// ```text
+    /// f_k = 1 - (1 - f)^(2^k)
+    /// ```
+    ///
+    /// SBBF membership checks perform `k=8` bit checks within one 256-bit 
block,
+    /// so the estimated FPP at fold level k is `f_k^8`.
+    fn num_folds_for_target_fpp(&self, target_fpp: f64) -> u32 {
+        let len = self.0.len();
+        if len < 2 {
+            return 0;
+        }
+
+        // Single pass: compute average per-block fill rate.
+        let total_set_bits: u64 = self.0.iter().map(|b| 
u64::from(b.count_ones())).sum();
+        let avg_fill = total_set_bits as f64 / (len as f64 * 256.0);
+
+        // Empty filter: can fold all the way down.
+        if avg_fill == 0.0 {
+            return len.trailing_zeros();
+        }
+
+        // Find max folds where estimated FPP stays within target.
+        // f_k = 1 - (1 - avg_fill)^(2^k), FPP_k = f_k^8
+        assert!(
+            len.is_power_of_two(),
+            "Number of blocks must be a power of 2 for folding"
+        );
+        let max_folds = len.trailing_zeros(); // log2(len) since len is power 
of 2
+        let one_minus_f = 1.0 - avg_fill;
+        let mut num_folds = 0u32;
+        let mut one_minus_fk = one_minus_f; // (1-f)^1 initially
+
+        for _ in 0..max_folds {
+            // After one more fold: (1-f)^(2^(k+1)) = ((1-f)^(2^k))^2
+            one_minus_fk = one_minus_fk * one_minus_fk;
+            let fk = 1.0 - one_minus_fk;
+            let estimated_fpp = fk.powi(8);
+            if estimated_fpp > target_fpp {
+                break;
+            }
+            num_folds += 1;
+        }
+
+        num_folds
+    }
+
+    /// Fold the filter `num_folds` times in a single pass.
+    ///
+    /// Merges groups of `2^num_folds` adjacent blocks via bitwise OR, 
producing
+    /// `len / 2^num_folds` output blocks. The original allocation is reused.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `num_folds` is 0 or would reduce the filter below 1 block.
+    fn fold_n(&mut self, num_folds: u32) {
+        assert!(num_folds > 0, "num_folds must be at least 1");
+        let len = self.0.len();
+        let group_size = 1usize << num_folds;
+        assert!(
+            group_size <= len,
+            "Cannot fold {num_folds} times: need at least {group_size} blocks, 
have {len}"
+        );
+        let new_len = len / group_size;
+        for i in 0..new_len {
+            let start = i * group_size;
+            let mut merged = self.0[start];
+            for j in 1..group_size {
+                merged |= self.0[start + j];
+            }
+            self.0[i] = merged;
+        }
+        self.0.truncate(new_len);
+    }
+
     /// Reads a Sbff from Thrift encoded bytes
     ///
     /// # Examples
@@ -603,6 +876,86 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_fold_n_halves_block_count() {
+        let mut sbbf = Sbbf::new_with_num_of_bytes(1024); // 32 blocks
+        assert_eq!(sbbf.num_blocks(), 32);
+        sbbf.fold_n(1);
+        assert_eq!(sbbf.num_blocks(), 16);
+        sbbf.fold_n(1);
+        assert_eq!(sbbf.num_blocks(), 8);
+    }
+
+    #[test]
+    fn test_fold_preserves_inserted_values() {
+        // Create a large filter, insert values, fold, verify no false 
negatives
+        let mut sbbf = Sbbf::new_with_num_of_bytes(32 * 1024); // 32KB = 1024 
blocks
+        let values: Vec<String> = (0..1000).map(|i| 
format!("value_{i}")).collect();
+        for v in &values {
+            sbbf.insert(v.as_str());
+        }
+
+        // Fold several times
+        let original_blocks = sbbf.num_blocks();
+        sbbf.fold_to_target_fpp(0.05);
+        assert!(
+            sbbf.num_blocks() < original_blocks,
+            "should have folded at least once"
+        );
+
+        // All inserted values must still be found (no false negatives)
+        for v in &values {
+            assert!(
+                sbbf.check(v.as_str()),
+                "Value '{}' missing after folding (false negative!)",
+                v
+            );
+        }
+    }
+
+    #[test]
+    fn test_fold_to_target_fpp_stops_before_exceeding_target() {
+        let mut sbbf = Sbbf::new_with_num_of_bytes(64 * 1024); // 64KB
+        // Insert enough values to set some bits
+        for i in 0..5000 {
+            sbbf.insert(&i);
+        }
+
+        let target_fpp = 0.01;
+        sbbf.fold_to_target_fpp(target_fpp);
+
+        // After folding, the estimated FPP should be at or below target
+        // (the current state should not exceed target — we stopped before 
that would happen)
+        let total_bits = (sbbf.num_blocks() * 256) as f64;
+        let set_bits: u64 = sbbf
+            .0
+            .iter()
+            .flat_map(|b| b.0.iter())
+            .map(|w| w.count_ones() as u64)
+            .sum();
+        let fill = set_bits as f64 / total_bits;
+        let current_fpp = fill.powi(8);
+        assert!(
+            current_fpp <= target_fpp,
+            "FPP {current_fpp} exceeds target {target_fpp}"
+        );
+    }
+
+    #[test]
+    fn test_fold_empty_filter_folds_to_minimum() {
+        // An empty filter has fill=0, so estimated FPP is always 0 — should 
fold all the way down
+        let mut sbbf = Sbbf::new_with_num_of_bytes(1024); // 32 blocks
+        sbbf.fold_to_target_fpp(0.01);
+        assert_eq!(sbbf.num_blocks(), 1);
+    }
+
+    #[test]
+    #[should_panic(expected = "Cannot fold 1 times: need at least 2 blocks, 
have 1")]
+    fn test_fold_n_panics_at_minimum_size() {
+        let mut sbbf = Sbbf::new_with_num_of_bytes(32); // 1 block (minimum)
+        sbbf.fold_n(1);
+    }
+
     #[test]
     fn test_sbbf_write_round_trip() {
         // Create a bloom filter with a 32-byte bitset (minimum size)
@@ -641,4 +994,247 @@ mod tests {
             );
         }
     }
+
+    /// Prove that folding an SBBF by one level produces the exact same bits
+    /// as building a fresh filter at the smaller size from scratch.
+    ///
+    /// # What is folding?
+    ///
+    /// ```text
+    ///   Original (N = 8 blocks):
+    ///   ┌───┬───┬───┬───┬───┬───┬───┬───┐
+    ///   │ 0 │ 1 │ 2 │ 3 │ 4 │ 5 │ 6 │ 7 │
+    ///   └─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┴─┬─┘
+    ///     │   │   │   │   │   │   │   │
+    ///     └─OR┘   └─OR┘   └─OR┘   └─OR┘    pair-wise OR
+    ///       │       │       │       │
+    ///   ┌───┴──┬────┴──┬────┴──┬────┴──┐
+    ///   │ 0|1  │ 2|3   │ 4|5   │ 6|7   │   Folded (N/2 = 4 blocks)
+    ///   └──────┴───────┴───────┴───────┘
+    /// ```
+    ///
+    /// # Why folded == fresh (the two lemmas)
+    ///
+    /// An SBBF insertion does two things with a 64-bit hash `h`:
+    ///
+    ///   1. **Pick a block** — uses the upper 32 bits via 
`hash_to_block_index`
+    ///   2. **Set 8 bits in that block** — uses the lower 32 bits via 
`Block::mask`
+    ///
+    /// **Lemma 1 (block index halves):** `hash_to_block_index` uses
+    /// `(upper32 * N) >> 32`. When N halves, the index halves too:
+    /// `index_in(N/2) == index_in(N) / 2`. So the hash lands in the same
+    /// destination block whether you fold or build fresh.
+    ///
+    /// **Lemma 2 (mask is size-independent):** `Block::mask(h as u32)` depends
+    /// only on the lower 32 bits and the fixed SALT constants — the filter
+    /// size N is not involved. So the same 8 bits get set regardless.
+    ///
+    /// Combined: every hash sets the *same bits* in the *same destination
+    /// block* whether you fold or build fresh → filters are bit-identical.
+    #[test]
+    fn test_sbbf_folded_equals_fresh() {
+        let values = (0..5000).map(|i| 
format!("elem_{i}")).collect::<Vec<_>>();
+        let hashes = values
+            .iter()
+            .map(|v| hash_as_bytes(v.as_str()))
+            .collect::<Vec<_>>();
+
+        for num_blocks in [64, 256, 1024] {
+            let half = num_blocks / 2;
+
+            // Build a filter with N blocks and insert all values.
+            let mut original = Sbbf::new_with_num_of_bytes(num_blocks * 32);
+            assert_eq!(original.num_blocks(), num_blocks);
+            for &h in &hashes {
+                original.insert_hash(h);
+            }
+
+            // --- Per-hash verification of the two lemmas ---
+            for &h in hashes.iter() {
+                // mask(h as u32) gives the 8-bit pattern that this hash sets
+                // inside whichever block it lands in. It uses only the lower
+                // 32 bits of h, so it's the same regardless of filter size.
+                let mask = Block::mask(h as u32);
+
+                // Lemma 1 check: the block index in the original N-block
+                // filter, divided by 2, should equal the block index in a
+                // fresh N/2-block filter.
+                let orig_idx = original.hash_to_block_index(h);
+                assert!(orig_idx < num_blocks);
+
+                let fresh_idx = {
+                    let tmp = Sbbf(vec![Block::ZERO; half]);
+                    tmp.hash_to_block_index(h)
+                };
+                let folded_idx = orig_idx / 2;
+                assert_eq!(
+                    fresh_idx, folded_idx,
+                    "Lemma 1 failed: fresh index {fresh_idx} != folded index 
{folded_idx}"
+                );
+
+                // Lemma 2 check: every bit that mask wants to set is actually
+                // present in the original block.
+                //
+                // mask.0[w] has exactly ONE bit set (see Block::mask: `1 << 
y`).
+                // The block at orig_idx has many bits set from many inserts, 
so
+                // we can't test equality — we test that the specific mask bit 
is
+                // *present*:
+                //
+                //   block_word & mask_word != 0
+                //     ⟺  "the one bit in the mask is set in the block"
+                //
+                // (Since mask_word has exactly 1 bit, `& mask != 0` is the 
same
+                //  as `& mask == mask` — but `!= 0` reads more naturally.)
+                for w in 0..8 {
+                    assert_ne!(
+                        original.0[orig_idx].0[w] & mask.0[w],
+                        0,
+                        "Lemma 2 failed: mask bit not set in word {w} of block 
{orig_idx}"
+                    );
+                }
+            }
+
+            // --- Final bit-identical comparison ---
+            // Fold the original N-block filter down to N/2 blocks.
+            let mut folded = original.clone();
+            folded.fold_n(1);
+            assert_eq!(folded.num_blocks(), half);
+
+            // Build a fresh N/2-block filter with the same values.
+            let mut fresh = Sbbf::new_with_num_of_bytes(half * 32);
+            for &h in &hashes {
+                fresh.insert_hash(h);
+            }
+
+            // By lemmas 1 + 2, every block should be bit-identical.
+            for j in 0..half {
+                assert_eq!(
+                    folded.0[j].0, fresh.0[j].0,
+                    "Block {j} differs after fold (N={num_blocks} → {half})"
+                );
+            }
+        }
+    }
+
+    /// Inductive multi-step folding: folding k times from N blocks produces
+    /// a filter bit-identical to a fresh N/2^k-block filter.
+    ///
+    /// `test_sbbf_folded_equals_fresh` proves the base case (one fold).
+    /// This test applies folds *repeatedly*, checking after each step:
+    ///
+    /// ```text
+    ///   512 ─fold→ 256 ─fold→ 128 ─…→ 1  (9 folds total)
+    /// ```
+    ///
+    /// At each intermediate size we build a fresh filter and assert
+    /// bit-equality, confirming the lemma composes across folds.
+    #[test]
+    fn test_multi_step_fold() {
+        let values = (0..3000).map(|i| format!("x_{i}")).collect::<Vec<_>>();
+
+        // Start with a 512-block filter.
+        let mut filter = Sbbf::new_with_num_of_bytes(512 * 32);
+        for v in &values {
+            filter.insert(v.as_str());
+        }
+
+        // Fold one level at a time, comparing against a fresh filter each 
step.
+        for expected_blocks in [256, 128, 64, 32, 16, 8, 4, 2, 1] {
+            filter.fold_n(1);
+            assert_eq!(filter.num_blocks(), expected_blocks);
+
+            let mut fresh = Sbbf::new_with_num_of_bytes(expected_blocks * 32);
+            for v in &values {
+                fresh.insert(v.as_str());
+            }
+            for (fb, rb) in filter.0.iter().zip(fresh.0.iter()) {
+                assert_eq!(fb.0, rb.0);
+            }
+        }
+    }
+
+    /// test that the fpp estimator's overestimation doesn't cause 
fold_to_target_fpp
+    /// to produce significantly oversized filters
+    ///
+    /// compare the final size after folding against the theoretical optimal 
size
+    #[test]
+    fn test_fold_size_vs_optimal_fixed_size() {
+        for (ndv, target_fpp) in [
+            (1000, 0.05),
+            (1000, 0.01),
+            (5000, 0.05),
+            (5000, 0.01),
+            (10000, 0.05),
+        ] {
+            let values = (0..ndv).map(|i| 
format!("d_{i}")).collect::<Vec<_>>();
+
+            let mut folded = Sbbf::new_with_num_of_bytes(128 * 1024); // 128KB
+            for v in &values {
+                folded.insert(v.as_str());
+            }
+            folded.fold_to_target_fpp(target_fpp);
+
+            let folded_bytes = folded.num_blocks() * 32;
+
+            let optimal = Sbbf::new_with_ndv_fpp(ndv as u64, 
target_fpp).unwrap();
+            let optimal_bytes = optimal.num_blocks() * 32;
+
+            let ratio = folded_bytes as f64 / optimal_bytes as f64;
+
+            assert_eq!(ratio, 1.0);
+        }
+    }
+
+    /// verify that a folded sbbf has the same empirical fpp as a fresh filter 
of the same size
+    /// this bridges the bit-identity proof above with the FPP guarantee from 
the folding paper
+    ///     since the bits are identical, the false-positive rate must be too
+    ///
+    /// we measure fpp empirically by probing with values that were never 
inserted
+    /// and counting how many are incorrectly marked as present
+    #[test]
+    fn test_folded_fpp_matches_fresh_fpp() {
+        let ndv = 2000;
+        let num_probes = 50_000;
+        let inserted = (0..ndv)
+            .map(|i| format!("ins_{i}"))
+            .collect::<Vec<String>>();
+
+        // probe values that were NOT inserted (different prefix guarantees no 
overlap)
+        let probes = (0..num_probes)
+            .map(|i| format!("probe_{i}"))
+            .collect::<Vec<String>>();
+
+        // build a large filter and fold it down several times
+        let mut folded = Sbbf::new_with_num_of_bytes(512 * 32); // 512 blocks
+        for v in &inserted {
+            folded.insert(v.as_str());
+        }
+
+        // check FPP at each fold level
+        for expected_blocks in [256, 128, 64, 32, 16, 8, 4, 2, 1] {
+            folded.fold_n(1);
+            assert_eq!(folded.num_blocks(), expected_blocks);
+
+            // build a fresh filter of the same size with the same values
+            let mut fresh = Sbbf::new_with_num_of_bytes(expected_blocks * 32);
+            for v in &inserted {
+                fresh.insert(v.as_str());
+            }
+
+            // measure empirical FPP on both
+            let mut folded_fp = 0u64;
+            let mut fresh_fp = 0u64;
+            for p in &probes {
+                if folded.check(p.as_str()) {
+                    folded_fp += 1;
+                }
+                if fresh.check(p.as_str()) {
+                    fresh_fp += 1;
+                }
+            }
+
+            // bit-identity means these must be exactly equal
+            assert_eq!(folded_fp, fresh_fp);
+        }
+    }
 }
diff --git a/parquet/src/column/writer/encoder.rs 
b/parquet/src/column/writer/encoder.rs
index 11d4f3142a..ec1afca583 100644
--- a/parquet/src/column/writer/encoder.rs
+++ b/parquet/src/column/writer/encoder.rs
@@ -138,6 +138,7 @@ pub struct ColumnValueEncoderImpl<T: DataType> {
     min_value: Option<T::T>,
     max_value: Option<T::T>,
     bloom_filter: Option<Sbbf>,
+    bloom_filter_target_fpp: f64,
     variable_length_bytes: Option<i64>,
     geo_stats_accumulator: Option<Box<dyn GeoStatsAccumulator>>,
 }
@@ -187,7 +188,9 @@ impl<T: DataType> ColumnValueEncoder for 
ColumnValueEncoderImpl<T> {
     type Values = [T::T];
 
     fn flush_bloom_filter(&mut self) -> Option<Sbbf> {
-        self.bloom_filter.take()
+        let mut sbbf = self.bloom_filter.take()?;
+        sbbf.fold_to_target_fpp(self.bloom_filter_target_fpp);
+        Some(sbbf)
     }
 
     fn try_new(descr: &ColumnDescPtr, props: &WriterProperties) -> 
Result<Self> {
@@ -205,10 +208,7 @@ impl<T: DataType> ColumnValueEncoder for 
ColumnValueEncoderImpl<T> {
 
         let statistics_enabled = props.statistics_enabled(descr.path());
 
-        let bloom_filter = props
-            .bloom_filter_properties(descr.path())
-            .map(|props| Sbbf::new_with_ndv_fpp(props.ndv, props.fpp))
-            .transpose()?;
+        let (bloom_filter, bloom_filter_target_fpp) = 
create_bloom_filter(props, descr)?;
 
         let geo_stats_accumulator = try_new_geo_stats_accumulator(descr);
 
@@ -219,6 +219,7 @@ impl<T: DataType> ColumnValueEncoder for 
ColumnValueEncoderImpl<T> {
             num_values: 0,
             statistics_enabled,
             bloom_filter,
+            bloom_filter_target_fpp,
             min_value: None,
             max_value: None,
             variable_length_bytes: None,
@@ -384,6 +385,21 @@ fn replace_zero<T: ParquetValueType>(val: &T, descr: 
&ColumnDescriptor, replace:
     }
 }
 
+/// Creates a bloom filter sized for the column's configured NDV, returning 
the filter
+/// and the target FPP for folding.
+pub(crate) fn create_bloom_filter(
+    props: &WriterProperties,
+    descr: &ColumnDescPtr,
+) -> Result<(Option<Sbbf>, f64)> {
+    match props.bloom_filter_properties(descr.path()) {
+        Some(bf_props) => Ok((
+            Some(Sbbf::new_with_ndv_fpp(bf_props.ndv, bf_props.fpp)?),
+            bf_props.fpp,
+        )),
+        None => Ok((None, 0.0)),
+    }
+}
+
 fn update_geo_stats_accumulator<'a, T, I>(bounder: &mut dyn 
GeoStatsAccumulator, iter: I)
 where
     T: ParquetValueType + 'a,
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index 640a7a075d..65630cfed2 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -53,8 +53,14 @@ pub const DEFAULT_CREATED_BY: &str = concat!("parquet-rs 
version ", env!("CARGO_
 pub const DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH: Option<usize> = Some(64);
 /// Default value for [`BloomFilterProperties::fpp`]
 pub const DEFAULT_BLOOM_FILTER_FPP: f64 = 0.05;
-/// Default value for [`BloomFilterProperties::ndv`]
-pub const DEFAULT_BLOOM_FILTER_NDV: u64 = 1_000_000_u64;
+/// Default value for [`BloomFilterProperties::ndv`].
+///
+/// Note: this is only the fallback default used when constructing 
[`BloomFilterProperties`]
+/// directly. When using [`WriterPropertiesBuilder`], columns with bloom 
filters enabled
+/// but without an explicit NDV will have their NDV resolved at build time to
+/// [`WriterProperties::max_row_group_row_count`], which may differ from this 
constant
+/// if the user configured a custom row group size.
+pub const DEFAULT_BLOOM_FILTER_NDV: u64 = DEFAULT_MAX_ROW_GROUP_ROW_COUNT as 
u64;
 /// Default values for [`WriterProperties::statistics_truncate_length`]
 pub const DEFAULT_STATISTICS_TRUNCATE_LENGTH: Option<usize> = Some(64);
 /// Default value for [`WriterProperties::offset_index_disabled`]
@@ -587,6 +593,18 @@ impl Default for WriterPropertiesBuilder {
 impl WriterPropertiesBuilder {
     /// Finalizes the configuration and returns immutable writer properties 
struct.
     pub fn build(self) -> WriterProperties {
+        // Resolve bloom filter NDV for columns where it wasn't explicitly set:
+        // default to max_row_group_row_count so the filter is never 
undersized.
+        let default_ndv = self
+            .max_row_group_row_count
+            .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT) as u64;
+        let mut default_column_properties = self.default_column_properties;
+        default_column_properties.resolve_bloom_filter_ndv(default_ndv);
+        let mut column_properties = self.column_properties;
+        for props in column_properties.values_mut() {
+            props.resolve_bloom_filter_ndv(default_ndv);
+        }
+
         WriterProperties {
             data_page_row_count_limit: self.data_page_row_count_limit,
             write_batch_size: self.write_batch_size,
@@ -597,8 +615,8 @@ impl WriterPropertiesBuilder {
             created_by: self.created_by,
             offset_index_disabled: self.offset_index_disabled,
             key_value_metadata: self.key_value_metadata,
-            default_column_properties: self.default_column_properties,
-            column_properties: self.column_properties,
+            default_column_properties,
+            column_properties,
             sorting_columns: self.sorting_columns,
             column_index_truncate_length: self.column_index_truncate_length,
             statistics_truncate_length: self.statistics_truncate_length,
@@ -996,8 +1014,13 @@ impl WriterPropertiesBuilder {
         self
     }
 
-    /// Sets default number of distinct values (ndv) for bloom filter for all
-    /// columns (defaults to `1_000_000` via [`DEFAULT_BLOOM_FILTER_NDV`]).
+    /// Sets default maximum expected number of distinct values (ndv) for 
bloom filter
+    /// for all columns (defaults to [`DEFAULT_BLOOM_FILTER_NDV`]).
+    ///
+    /// The bloom filter is initially sized for this many distinct values at 
the
+    /// configured FPP, then folded down after all values are inserted to 
achieve
+    /// optimal size. A good heuristic is to set this to the expected number 
of rows
+    /// in the row group.
     ///
     /// Implicitly enables bloom writing, as if [`set_bloom_filter_enabled`] 
had
     /// been called.
@@ -1191,6 +1214,13 @@ impl Default for EnabledStatistics {
 }
 
 /// Controls the bloom filter to be computed by the writer.
+///
+/// The bloom filter is initially sized for `ndv` distinct values at the given 
`fpp`, then
+/// automatically folded down after all values are inserted to achieve optimal 
size while
+/// maintaining the target `fpp`. See [`Sbbf::fold_to_target_fpp`] for details 
on the
+/// folding algorithm.
+///
+/// [`Sbbf::fold_to_target_fpp`]: crate::bloom_filter::Sbbf::fold_to_target_fpp
 #[derive(Debug, Clone, PartialEq)]
 pub struct BloomFilterProperties {
     /// False positive probability. This should be always between 0 and 1 
exclusive. Defaults to [`DEFAULT_BLOOM_FILTER_FPP`].
@@ -1201,20 +1231,30 @@ pub struct BloomFilterProperties {
     /// smaller the fpp, the more memory and disk space is required, thus 
setting it to a reasonable value
     /// e.g. 0.1, 0.05, or 0.001 is recommended.
     ///
-    /// Setting to a very small number diminishes the value of the filter 
itself, as the bitset size is
-    /// even larger than just storing the whole value. You are also expected 
to set `ndv` if it can
-    /// be known in advance to greatly reduce space usage.
+    /// This value also serves as the target FPP for bloom filter folding: 
after all values
+    /// are inserted, the filter is folded down to the smallest size that 
still meets this FPP.
     pub fpp: f64,
-    /// Number of distinct values, should be non-negative to be meaningful. 
Defaults to [`DEFAULT_BLOOM_FILTER_NDV`].
+    /// Maximum expected number of distinct values. Defaults to 
[`DEFAULT_BLOOM_FILTER_NDV`].
     ///
     /// You should set this value by calling 
[`WriterPropertiesBuilder::set_bloom_filter_ndv`].
     ///
-    /// Usage of bloom filter is most beneficial for columns with large 
cardinality, so a good heuristic
-    /// is to set ndv to the number of rows. However, it can reduce disk size 
if you know in advance a smaller
-    /// number of distinct values. For very small ndv value it is probably not 
worth it to use bloom filter
-    /// anyway.
-    ///
-    /// Increasing this value (without increasing fpp) will result in an 
increase in disk or memory size.
+    /// When not explicitly set via the builder, this defaults to
+    /// [`max_row_group_row_count`](WriterProperties::max_row_group_row_count) 
(resolved at
+    /// build time). The bloom filter is initially sized for this many 
distinct values at the
+    /// given `fpp`, then folded down after insertion to achieve optimal size. 
A good heuristic
+    /// is to set this to the expected number of rows in the row group. If 
fewer distinct values
+    /// are actually written, the filter will be automatically compacted via 
folding.
+    ///
+    /// Thus the only negative side of overestimating this value is that the 
bloom filter
+    /// will use more memory during writing than necessary, but it will not 
affect the final
+    /// bloom filter size on disk.
+    ///
+    /// If you wish to reduce memory usage during writing and are able to make 
a reasonable estimate
+    /// of the number of distinct values in a row group, it is recommended to 
set this value explicitly
+    /// rather than relying on the default dynamic sizing based on 
`max_row_group_row_count`.
+    /// If you do set this value explicitly it is probably best to set it for 
each column
+    /// individually via 
[`WriterPropertiesBuilder::set_column_bloom_filter_ndv`] rather than globally,
+    /// since different columns may have different numbers of distinct values.
     pub ndv: u64,
 }
 
@@ -1242,6 +1282,8 @@ struct ColumnProperties {
     write_page_header_statistics: Option<bool>,
     /// bloom filter related properties
     bloom_filter_properties: Option<BloomFilterProperties>,
+    /// Whether the bloom filter NDV was explicitly set by the user
+    bloom_filter_ndv_is_set: bool,
 }
 
 impl ColumnProperties {
@@ -1319,12 +1361,13 @@ impl ColumnProperties {
             .fpp = value;
     }
 
-    /// Sets the number of distinct (unique) values for bloom filter for this 
column, and implicitly
-    /// enables bloom filter if not previously enabled.
+    /// Sets the maximum expected number of distinct (unique) values for bloom 
filter for this
+    /// column, and implicitly enables bloom filter if not previously enabled.
     fn set_bloom_filter_ndv(&mut self, value: u64) {
         self.bloom_filter_properties
             .get_or_insert_with(Default::default)
             .ndv = value;
+        self.bloom_filter_ndv_is_set = true;
     }
 
     /// Returns optional encoding for this column.
@@ -1372,6 +1415,16 @@ impl ColumnProperties {
     fn bloom_filter_properties(&self) -> Option<&BloomFilterProperties> {
         self.bloom_filter_properties.as_ref()
     }
+
+    /// If bloom filter is enabled and NDV was not explicitly set, resolve it 
to the
+    /// given `default_ndv` (typically derived from `max_row_group_row_count`).
+    fn resolve_bloom_filter_ndv(&mut self, default_ndv: u64) {
+        if !self.bloom_filter_ndv_is_set {
+            if let Some(ref mut bf) = self.bloom_filter_properties {
+                bf.ndv = default_ndv;
+            }
+        }
+    }
 }
 
 /// Reference counted reader properties.
@@ -1703,8 +1756,8 @@ mod tests {
         assert_eq!(
             props.bloom_filter_properties(&ColumnPath::from("col")),
             Some(&BloomFilterProperties {
-                fpp: 0.05,
-                ndv: 1_000_000_u64
+                fpp: DEFAULT_BLOOM_FILTER_FPP,
+                ndv: DEFAULT_BLOOM_FILTER_NDV,
             })
         );
     }
@@ -1746,8 +1799,8 @@ mod tests {
                 .build()
                 .bloom_filter_properties(&ColumnPath::from("col")),
             Some(&BloomFilterProperties {
-                fpp: 0.05,
-                ndv: 100
+                fpp: DEFAULT_BLOOM_FILTER_FPP,
+                ndv: 100,
             })
         );
         assert_eq!(
@@ -1757,7 +1810,7 @@ mod tests {
                 .bloom_filter_properties(&ColumnPath::from("col")),
             Some(&BloomFilterProperties {
                 fpp: 0.1,
-                ndv: 1_000_000_u64
+                ndv: DEFAULT_BLOOM_FILTER_NDV,
             })
         );
     }

Reply via email to