This is an automated email from the ASF dual-hosted git repository.

pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 108240fd60 GH-50007: [C++][Parquet] Add bloom filter folding to 
automatically size SBBF filters (#50008)
108240fd60 is described below

commit 108240fd609e1fcf2422b318fcbf79a85e5ef646
Author: Zehua Zou <[email protected]>
AuthorDate: Thu Jun 11 16:42:14 2026 +0800

    GH-50007: [C++][Parquet] Add bloom filter folding to automatically size 
SBBF filters (#50008)
    
    ### Rationale for this change
    
    This PR follows https://github.com/apache/arrow-rs/pull/9628. It reduces 
the disk usage of Bloom filters, so that specifying an ndv value larger than 
the actual number of distinct values does not increase disk usage.
    
    > Bloom filters now support folding mode: allocate a conservatively large 
filter (sized for worst-case NDV), insert all values during writing, then fold 
down at flush time to meet a target FPP. This eliminates the need to guess NDV 
upfront and produces optimally-sized filters automatically.
    
    ### What changes are included in this PR?
    
    `BloomFilterBuilder` will try to fold the bloom filter before writing it to 
the output stream.
    
    ### Are these changes tested?
    
    Yes.
    
    ### Are there any user-facing changes?
    
    Yes.
    
    The type of `ndv` in `BloomFilterOptions` is changed from `int32_t` to 
`std::optional<int64_t>`, and the type of the `ndv` argument of 
`OptimalNumOfBytes` and `OptimalNumOfBits` in `BlockSplitBloomFilter` is 
changed from `uint32_t` to `uint64_t`.
    
    A new field `fold` is added to `BloomFilterOptions`; its default value is `true`.
    
    * GitHub Issue: #50007
    
    Lead-authored-by: Zehua Zou <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/bloom_filter.cc                    |  93 +++++++++++++++-
 cpp/src/parquet/bloom_filter.h                     |  13 ++-
 cpp/src/parquet/bloom_filter_reader_writer_test.cc | 115 +++++++++++++++++++-
 cpp/src/parquet/bloom_filter_writer.cc             |  35 ++++--
 cpp/src/parquet/properties.h                       |  47 ++++++--
 cpp/src/parquet/properties_test.cc                 | 119 +++++++++++++++++++++
 6 files changed, 395 insertions(+), 27 deletions(-)

diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc
index 577d26fe00..5959817ea2 100644
--- a/cpp/src/parquet/bloom_filter.cc
+++ b/cpp/src/parquet/bloom_filter.cc
@@ -15,13 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
+#include <bit>
+#include <cmath>
 #include <cstdint>
 #include <cstring>
 #include <limits>
 #include <memory>
 
 #include "arrow/io/memory.h"
-#include "arrow/result.h"
+#include "arrow/util/bitmap_ops.h"
 #include "arrow/util/logging_internal.h"
 #include "arrow/util/macros.h"
 
@@ -345,9 +348,90 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream* 
sink) const {
   PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
 }
 
+void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {  // Shrinks the filter in place as far as the estimated FPP allows for target_fpp.
+  const auto num_bits = static_cast<int64_t>(num_bytes_) * 8;
+  const auto total_set_bits =
+      ::arrow::internal::CountSetBits(data_->data(), /*bit_offset=*/0, 
num_bits);
+  if (total_set_bits == 0) {  // No bit is set, so nothing needs to be ORed together.
+    num_bytes_ = kMinimumBloomFilterBytes;  // Only the reported size changes; the buffer is not touched.
+    return;
+  }
+
+  const double avg_fill = static_cast<double>(total_set_bits) / num_bits;  // Fraction of bits currently set.
+  const uint32_t num_folds = NumFoldsForTargetFpp(target_fpp, avg_fill);
+  if (num_folds > 0) {  // Zero folds means the filter is left at its current size.
+    Fold(num_folds);
+  }
+}
+
+uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp,
+                                                     double avg_fill) const {  // Returns how many times the filter can be halved while the estimated FPP stays <= target_fpp.
+  const uint32_t num_blocks = NumBlocks();
+  if (num_blocks < 2) {  // A filter with fewer than two blocks cannot be folded.
+    return 0;
+  }
+  // Number of blocks is a power of two
+  DCHECK_EQ(num_blocks & (num_blocks - 1), 0);
+
+  // Estimate the fill rate after folding from the current average fill rate.
+  // Folding ORs block groups together, so each fold changes the estimated 
fill rate
+  // from f to 1 - (1 - f)^2. A membership check tests kBitsSetPerBlock bits, 
making
+  // the estimated FPP equal to std::pow(folded_fill_rate, kBitsSetPerBlock).
+  //
+  // See also: Sailhan and Stehr, "Folding and Unfolding Bloom Filters", 2012:
+  // https://hal.science/hal-01126174v1
+  const auto max_folds = static_cast<uint32_t>(std::countr_zero(num_blocks));  // log2(num_blocks): the number of folds that leaves a single block.
+
+  uint32_t num_folds = 0;
+  double unset_probability_after_folds = 1.0 - avg_fill;
+  for (uint32_t i = 0; i < max_folds; ++i) {
+    unset_probability_after_folds *= unset_probability_after_folds;  // One more fold squares the probability that a bit is unset.
+    const double folded_fill_rate = 1.0 - unset_probability_after_folds;
+    const double estimated_fpp = std::pow(folded_fill_rate, kBitsSetPerBlock);
+    if (estimated_fpp > target_fpp) {  // This fold would exceed the target, so it is not counted.
+      break;
+    }
+    ++num_folds;
+  }
+  return num_folds;
+}
+
+void BlockSplitBloomFilter::Fold(uint32_t num_folds) {  // ORs each run of (1 << num_folds) consecutive blocks into one block, in place, then shrinks num_bytes_.
+  DCHECK_GT(num_folds, 0);
+
+  const uint32_t num_blocks = NumBlocks();
+  // A fold group is a consecutive run of blocks ORed into one output block.
+  // Keeping the group size as (1 << num_folds) preserves a power-of-two bitset
+  // size. Folding by this power-of-two group size keeps the old-to-new bucket
+  // remapping aligned with bucket lookup and avoids false negatives.
+  const uint32_t group_size = UINT32_C(1) << num_folds;
+  DCHECK_LE(group_size, num_blocks);
+
+  const uint32_t new_num_blocks = num_blocks / group_size;
+  auto* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());  // The bitset is addressed as 32-bit words below.
+
+  for (uint32_t dst_block = 0; dst_block < new_num_blocks; ++dst_block) {
+    uint32_t* dst = bitset32 + dst_block * kBitsSetPerBlock;  // NOTE(review): kBitsSetPerBlock is used as the 32-bit word count per block - confirm it equals kBytesPerFilterBlock / 4.
+
+    const uint32_t src_block = dst_block * group_size;  // First block of the group that folds into dst_block.
+    const uint32_t* src = bitset32 + src_block * kBitsSetPerBlock;
+    if (dst != src) {  // For dst_block == 0 the first source block is already in place.
+      std::copy_n(src, kBitsSetPerBlock, dst);
+    }
+
+    for (uint32_t fold_block = 1; fold_block < group_size; ++fold_block) {  // OR the remaining blocks of the group into dst.
+      src = bitset32 + (src_block + fold_block) * kBitsSetPerBlock;
+      for (int word = 0; word < kBitsSetPerBlock; ++word) {
+        dst[word] |= src[word];
+      }
+    }
+  }
+
+  num_bytes_ = new_num_blocks * kBytesPerFilterBlock;  // The folded blocks occupy the front of the buffer; only the recorded size shrinks.
+}
+
 bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
-  const uint32_t bucket_index =
-      static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / 
kBytesPerFilterBlock)) >> 32);
+  const uint32_t bucket_index = static_cast<uint32_t>(((hash >> 32) * 
NumBlocks()) >> 32);
   const uint32_t key = static_cast<uint32_t>(hash);
   const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
 
@@ -363,8 +447,7 @@ bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
 }
 
 void BlockSplitBloomFilter::InsertHashImpl(uint64_t hash) {
-  const uint32_t bucket_index =
-      static_cast<uint32_t>(((hash >> 32) * (num_bytes_ / 
kBytesPerFilterBlock)) >> 32);
+  const uint32_t bucket_index = static_cast<uint32_t>(((hash >> 32) * 
NumBlocks()) >> 32);
   const uint32_t key = static_cast<uint32_t>(hash);
   uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
 
diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h
index cabcd5b4a5..0b4c8df708 100644
--- a/cpp/src/parquet/bloom_filter.h
+++ b/cpp/src/parquet/bloom_filter.h
@@ -230,7 +230,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public 
BloomFilter {
   /// @param fpp The false positive probability.
   /// @return it always return a value between kMinimumBloomFilterBytes and
   /// kMaximumBloomFilterBytes, and the return value is always a power of 2
-  static uint32_t OptimalNumOfBytes(uint32_t ndv, double fpp) {
+  static uint32_t OptimalNumOfBytes(uint64_t ndv, double fpp) {
     uint32_t optimal_num_of_bits = OptimalNumOfBits(ndv, fpp);
     ARROW_DCHECK(::arrow::bit_util::IsMultipleOf8(optimal_num_of_bits));
     return optimal_num_of_bits >> 3;
@@ -243,7 +243,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public 
BloomFilter {
   /// @param fpp The false positive probability.
   /// @return it always return a value between kMinimumBloomFilterBytes * 8 and
   /// kMaximumBloomFilterBytes * 8, and the return value is always a power of 
16
-  static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
+  static uint32_t OptimalNumOfBits(uint64_t ndv, double fpp) {
     ARROW_DCHECK(fpp > 0.0 && fpp < 1.0);
     const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
     uint32_t num_bits;
@@ -276,6 +276,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public 
BloomFilter {
   bool FindHash(uint64_t hash) const override;
   void InsertHash(uint64_t hash) override;
   void InsertHashes(const uint64_t* hashes, int num_values) override;
+  /// Fold the bloom filter down to the smallest size that still meets the 
target FPP
+  /// (False Positive Probability).
+  void FoldToTargetFpp(double target_fpp);
   void WriteTo(ArrowOutputStream* sink) const override;
   uint32_t GetBitsetSize() const override { return num_bytes_; }
 
@@ -350,6 +353,12 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public 
BloomFilter {
 
  private:
   inline void InsertHashImpl(uint64_t hash);
+  uint32_t NumBlocks() const {  // Number of kBytesPerFilterBlock-sized blocks in the current bitset.
+    ARROW_DCHECK_EQ(num_bytes_ % kBytesPerFilterBlock, 0);  // The bitset size is expected to be a whole number of blocks.
+    return num_bytes_ / kBytesPerFilterBlock;
+  }
+  uint32_t NumFoldsForTargetFpp(double target_fpp, double avg_fill) const;
+  void Fold(uint32_t num_folds);
 
   // Bytes in a tiny Bloom filter block.
   static constexpr int kBytesPerFilterBlock = 32;
diff --git a/cpp/src/parquet/bloom_filter_reader_writer_test.cc 
b/cpp/src/parquet/bloom_filter_reader_writer_test.cc
index d646a2b8fc..2750fde029 100644
--- a/cpp/src/parquet/bloom_filter_reader_writer_test.cc
+++ b/cpp/src/parquet/bloom_filter_reader_writer_test.cc
@@ -91,9 +91,9 @@ TEST(BloomFilterBuilder, BasicRoundTrip) {
       "schema", Repetition::REPEATED, {schema::ByteArray("c1"), 
schema::ByteArray("c2")});
   schema.Init(root);
 
-  BloomFilterOptions bloom_filter_options{100, 0.05};
+  BloomFilterOptions bloom_filter_options{.ndv = 100, .fpp = 0.05};
   const auto bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes(
-      bloom_filter_options.ndv, bloom_filter_options.fpp);
+      bloom_filter_options.ndv.value(), bloom_filter_options.fpp);
   WriterProperties::Builder properties_builder;
   properties_builder.enable_bloom_filter("c1", bloom_filter_options);
   auto writer_properties = properties_builder.build();
@@ -150,6 +150,115 @@ TEST(BloomFilterBuilder, BasicRoundTrip) {
   }
 }
 
+namespace {
+
+struct BloomFilterBuilderFoldingTestCase {  // One parameter set for BloomFilterBuilderFoldingTest.
+  int64_t ndv;  // ndv passed to BloomFilterOptions; determines the initial bitset size.
+  bool fold;  // Value of BloomFilterOptions::fold.
+  int32_t inserted_count;  // Number of distinct values inserted before writing.
+  int64_t expected_bitset_ndv;  // The written bitset size must equal OptimalNumOfBytes(expected_bitset_ndv, fpp).
+};
+
+class BloomFilterBuilderFoldingTest
+    : public ::testing::TestWithParam<BloomFilterBuilderFoldingTestCase> {};  // Fixture with no state; it only carries the parameter.
+
+}  // namespace
+
+TEST_P(BloomFilterBuilderFoldingTest, RespectsOption) {  // Writes one filter through BloomFilterBuilder and checks its serialized size, membership and sampled FPP.
+  const auto& test_case = GetParam();
+
+  SchemaDescriptor schema;
+  schema::NodePtr root =
+      schema::GroupNode::Make("schema", Repetition::REPEATED, 
{schema::ByteArray("c1")});
+  schema.Init(root);
+
+  constexpr double kFpp = 0.05;
+  BloomFilterOptions bloom_filter_options{
+      .ndv = test_case.ndv, .fpp = kFpp, .fold = test_case.fold};
+  const auto initial_bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes(
+      bloom_filter_options.ndv.value(), bloom_filter_options.fpp);  // Size the filter is expected to have before any folding.
+  WriterProperties::Builder properties_builder;
+  properties_builder.enable_bloom_filter("c1", bloom_filter_options);
+  auto writer_properties = properties_builder.build();
+  auto bloom_filter_builder = BloomFilterBuilder::Make(&schema, 
writer_properties.get());
+
+  bloom_filter_builder->AppendRowGroup();
+  auto bloom_filter = 
bloom_filter_builder->CreateBloomFilter(/*column_ordinal=*/0);
+  ASSERT_NE(bloom_filter, nullptr);
+  ASSERT_EQ(initial_bitset_size, bloom_filter->GetBitsetSize());
+
+  std::vector<uint64_t> hashes;  // Hashes of the inserted values, re-checked after the round trip.
+  hashes.reserve(test_case.inserted_count);
+  for (int32_t i = 0; i < test_case.inserted_count; ++i) {
+    const auto hash = bloom_filter->Hash(i);
+    hashes.push_back(hash);
+    bloom_filter->InsertHash(hash);
+  }
+
+  auto sink = CreateOutputStream();
+  auto locations = bloom_filter_builder->WriteTo(sink.get());  // Folding, when enabled, is applied during this call.
+  ASSERT_EQ(locations.size(), 1);
+  ASSERT_OK_AND_ASSIGN(auto buffer, sink->Finish());
+
+  const auto& location = locations.front().second;
+  ReaderProperties reader_properties;
+  ::arrow::io::BufferReader reader(
+      ::arrow::SliceBuffer(buffer, location.offset, location.length));
+  auto filter = parquet::BlockSplitBloomFilter::Deserialize(reader_properties, 
&reader);  // Read the filter back from the written bytes.
+
+  const auto actual_bitset_size = filter.GetBitsetSize();
+  
EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBytes(test_case.expected_bitset_ndv,
 kFpp),
+            actual_bitset_size);
+
+  if (test_case.fold) {  // Folding may only shrink the filter; without folding the size must be unchanged.
+    EXPECT_LE(actual_bitset_size, initial_bitset_size);
+  } else {
+    EXPECT_EQ(actual_bitset_size, initial_bitset_size);
+  }
+
+  for (uint64_t hash : hashes) {  // Every inserted hash must still be found, i.e. no false negatives.
+    EXPECT_TRUE(filter.FindHash(hash));
+  }
+
+  int32_t false_positives = 0;  // Counts hits among values that were never inserted.
+  constexpr int32_t kNonInsertedCount = 10'000;
+  for (int32_t i = test_case.inserted_count;
+       i < test_case.inserted_count + kNonInsertedCount; ++i) {
+    false_positives += filter.FindHash(filter.Hash(i));
+  }
+  const auto sample_fpp = static_cast<double>(false_positives) / 
kNonInsertedCount;
+  EXPECT_LT(sample_fpp, kFpp);
+
+  if (test_case.fold && test_case.inserted_count > 0) {
+    // If the actual fpp, as computed on this sample, is significantly below 
kFpp / 2,
+    // then we could have folded the bloom filter at least once more.
+    EXPECT_GT(sample_fpp, kFpp / 2.1);
+  }
+}
+
+INSTANTIATE_TEST_SUITE_P(
+    BloomFilterBuilder, BloomFilterBuilderFoldingTest,
+    ::testing::Values(BloomFilterBuilderFoldingTestCase{.ndv = 1'000'000,
+                                                        .fold = true,
+                                                        .inserted_count = 1000,
+                                                        .expected_bitset_ndv = 
1000},  // Oversized ndv with folding: the written size matches the size for 1000 values.
+                      BloomFilterBuilderFoldingTestCase{.ndv = 1'000'000,
+                                                        .fold = false,
+                                                        .inserted_count = 1000,
+                                                        .expected_bitset_ndv = 
1'000'000},  // Oversized ndv without folding: the written size stays at the initial size.
+                      BloomFilterBuilderFoldingTestCase{.ndv = 1024,
+                                                        .fold = true,
+                                                        .inserted_count = 1024,
+                                                        .expected_bitset_ndv = 
1024},  // ndv equals the inserted count: the size is expected to stay the same.
+                      BloomFilterBuilderFoldingTestCase{.ndv = 1024,
+                                                        .fold = true,
+                                                        .inserted_count = 0,
+                                                        .expected_bitset_ndv = 
0},  // Empty filter with folding: the written size matches the size for ndv = 0.
+                      BloomFilterBuilderFoldingTestCase{.ndv = 1024,
+                                                        .fold = false,
+                                                        .inserted_count = 0,
+                                                        .expected_bitset_ndv = 
1024}));  // Empty filter without folding: the written size stays at the initial size.
+
 TEST(BloomFilterBuilder, InvalidOperations) {
   SchemaDescriptor schema;
   schema::NodePtr root = schema::GroupNode::Make(
@@ -158,7 +267,7 @@ TEST(BloomFilterBuilder, InvalidOperations) {
   schema.Init(root);
 
   WriterProperties::Builder properties_builder;
-  BloomFilterOptions bloom_filter_options{100, 0.05};
+  BloomFilterOptions bloom_filter_options{.ndv = 100, .fpp = 0.05};
   properties_builder.enable_bloom_filter("c1", bloom_filter_options);
   properties_builder.enable_bloom_filter("c2", bloom_filter_options);
   auto properties = properties_builder.build();
diff --git a/cpp/src/parquet/bloom_filter_writer.cc 
b/cpp/src/parquet/bloom_filter_writer.cc
index f06b866c30..a52ca25163 100644
--- a/cpp/src/parquet/bloom_filter_writer.cc
+++ b/cpp/src/parquet/bloom_filter_writer.cc
@@ -185,8 +185,16 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
   const WriterProperties* properties_;
   bool finished_ = false;
 
-  using RowGroupBloomFilters =
-      std::map</*column_id=*/int32_t, std::shared_ptr<BloomFilter>>;
+  struct RowGroupBloomFilters {  // Bloom filters created for one row group, keyed by column ordinal.
+    struct BloomFilterEntry {
+      std::shared_ptr<BlockSplitBloomFilter> filter;  // Concrete type so that FoldToTargetFpp can be called at write time.
+      double target_fpp;  // fpp from the column options, passed to FoldToTargetFpp.
+      bool try_fold;  // fold flag from the column options; folding is skipped when false.
+    };
+
+    std::map</*column_id=*/int32_t, BloomFilterEntry> entries;
+  };
+
   std::vector<RowGroupBloomFilters> bloom_filters_;  // indexed by row group 
ordinal
 };
 
@@ -206,7 +214,7 @@ BloomFilter* 
BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) {
 
   CheckState(column_ordinal);
 
-  auto& curr_rg_bfs = *bloom_filters_.rbegin();
+  auto& curr_rg_bfs = bloom_filters_.back().entries;
   if (curr_rg_bfs.find(column_ordinal) != curr_rg_bfs.cend()) {
     std::stringstream ss;
     ss << "Bloom filter already exists for column: " << column_ordinal
@@ -214,9 +222,15 @@ BloomFilter* 
BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) {
     throw ParquetException(ss.str());
   }
 
-  auto bf = 
std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
-  bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv, opts->fpp));
-  return curr_rg_bfs.emplace(column_ordinal, 
std::move(bf)).first->second.get();
+  ARROW_DCHECK(opts->ndv.has_value());
+  auto bf = 
std::make_shared<BlockSplitBloomFilter>(properties_->memory_pool());
+  bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv.value(), 
opts->fpp));
+  return curr_rg_bfs
+      .emplace(
+          column_ordinal,
+          RowGroupBloomFilters::BloomFilterEntry{
+              .filter = std::move(bf), .target_fpp = opts->fpp, .try_fold = 
opts->fold})
+      .first->second.filter.get();
 }
 
 IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* 
sink) {
@@ -228,11 +242,14 @@ IndexLocations 
BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
   IndexLocations locations;
 
   for (size_t i = 0; i != bloom_filters_.size(); ++i) {
-    auto& row_group_bloom_filters = bloom_filters_[i];
-    for (const auto& [column_id, filter] : row_group_bloom_filters) {
+    auto& row_group_bloom_filters = bloom_filters_[i].entries;
+    for (auto& [column_id, entry] : row_group_bloom_filters) {
       // TODO(GH-43138): Determine the quality of bloom filter before writing 
it.
       PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
-      filter->WriteTo(sink);
+      if (entry.try_fold) {
+        entry.filter->FoldToTargetFpp(entry.target_fpp);
+      }
+      entry.filter->WriteTo(sink);
       PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());
 
       if (pos - offset > std::numeric_limits<int32_t>::max()) {
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 6634bac4f6..7eb37c3d52 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <memory>
+#include <optional>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -173,12 +174,13 @@ static constexpr SizeStatisticsLevel 
DEFAULT_SIZE_STATISTICS_LEVEL =
 struct PARQUET_EXPORT BloomFilterOptions {
   /// Expected number of distinct values (NDV) in the bloom filter.
   ///
-  /// Bloom filters are most effective for high-cardinality columns. A good 
default
-  /// is to set ndv equal to the number of rows. Lower values reduce disk 
usage but
-  /// may not be worthwhile for very small NDVs.
+  /// Bloom filters are most effective for high-cardinality columns. If unset, 
the
+  /// writer resolves ndv to the max row group row count. Lower values reduce 
disk
+  /// usage but may not be worthwhile for very small NDVs.
   ///
-  /// Increasing ndv (without increasing fpp) increases disk and memory usage.
-  int32_t ndv = 1 << 20;
+  /// Increasing ndv (without increasing fpp) increases memory usage. Folding 
only
+  /// shrinks a filter before serialization; it will not grow an undersized 
filter.
+  std::optional<int64_t> ndv = std::nullopt;
 
   /// False-positive probability (FPP) of the bloom filter.
   ///
@@ -202,6 +204,23 @@ struct PARQUET_EXPORT BloomFilterOptions {
   /// | 10,000,000 | 0.05  | 13.4     | 16384 KiB |
   /// | 10,000,000 | 0.01  | 13.4     | 16384 KiB |
   double fpp = 0.05;
+
+  /// Whether to fold the bloom filter before writing it.
+  ///
+  /// If true, the writer may fold the filter before serialization to reduce 
disk
+  /// usage while preserving the target fpp estimate. Highly skewed block 
occupancy
+  /// can make this estimate optimistic; disable folding to preserve the 
initial
+  /// filter size.
+  ///
+  /// The writer resolves ndv and fold behavior as follows:
+  ///
+  /// | fold  | ndv       | Resolved ndv             | Write behavior |
+  /// |:------|:----------|:-------------------------|:---------------|
+  /// | true  | unset     | max row group row count  | try to fold    |
+  /// | true  | specified | specified ndv            | try to fold    |
+  /// | false | unset     | max row group row count  | do not fold    |
+  /// | false | specified | specified ndv            | do not fold    |
+  bool fold = true;
 };
 
 class PARQUET_EXPORT ColumnProperties {
@@ -251,11 +270,15 @@ class PARQUET_EXPORT ColumnProperties {
   }
 
   void set_bloom_filter_options(const BloomFilterOptions& 
bloom_filter_options) {
-    if (bloom_filter_options.fpp >= 1.0 || bloom_filter_options.fpp <= 0.0) {
+    if (!(bloom_filter_options.fpp > 0.0 && bloom_filter_options.fpp < 1.0)) {  // The negated range check also rejects NaN, which fails both comparisons.
       throw ParquetException(
           "Bloom filter false positive probability must be in (0.0, 1.0), got 
" +
           std::to_string(bloom_filter_options.fpp));
     }
+    if (bloom_filter_options.ndv.has_value() && 
bloom_filter_options.ndv.value() < 0) {  // An unset ndv and ndv == 0 are accepted; only a negative value is rejected.
+      throw ParquetException("Bloom filter number of distinct values must be 
>= 0, got " +
+                             std::to_string(bloom_filter_options.ndv.value()));
+    }
     bloom_filter_options_ = bloom_filter_options;
   }
 
@@ -863,8 +886,16 @@ class PARQUET_EXPORT WriterProperties {
         get(item.first).set_statistics_enabled(item.second);
       for (const auto& item : page_index_enabled_)
         get(item.first).set_page_index_enabled(item.second);
-      for (const auto& item : bloom_filter_options_)
-        get(item.first).set_bloom_filter_options(item.second);
+      for (const auto& item : bloom_filter_options_) {
+        const auto& bloom_filter_options = item.second;
+        if (bloom_filter_options.ndv.has_value()) {
+          get(item.first).set_bloom_filter_options(bloom_filter_options);
+        } else {
+          auto resolved_options = bloom_filter_options;
+          resolved_options.ndv = max_row_group_length_;
+          get(item.first).set_bloom_filter_options(resolved_options);
+        }
+      }
 
       return std::shared_ptr<WriterProperties>(new WriterProperties(
           pool_, dictionary_pagesize_limit_, write_batch_size_, 
max_row_group_length_,
diff --git a/cpp/src/parquet/properties_test.cc 
b/cpp/src/parquet/properties_test.cc
index 0743b7ad4d..324ea2026a 100644
--- a/cpp/src/parquet/properties_test.cc
+++ b/cpp/src/parquet/properties_test.cc
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
+#include <limits>
+#include <optional>
 #include <string>
 
 #include "arrow/buffer.h"
@@ -24,6 +27,7 @@
 
 #include "parquet/file_reader.h"
 #include "parquet/properties.h"
+#include "parquet/test_util.h"
 
 namespace parquet {
 
@@ -115,6 +119,104 @@ TEST(TestWriterProperties, SetCodecOptions) {
                     ->window_bits);
 }
 
+TEST(TestWriterProperties, BloomFilterNdvDefaults) {  // Default options: ndv is unset and resolves to max_row_group_length; fold stays true.
+  BloomFilterOptions options;
+  ASSERT_FALSE(options.ndv.has_value());
+  ASSERT_TRUE(options.fold);
+  options.fpp = 0.05;
+
+  auto props = WriterProperties::Builder()
+                   .max_row_group_length(12345)
+                   ->enable_bloom_filter("a", options)
+                   ->build();
+
+  const auto resolved = 
props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(resolved.has_value());
+  ASSERT_TRUE(resolved->ndv.has_value());
+  ASSERT_EQ(12345, resolved->ndv.value());  // Matches the max_row_group_length set on the builder.
+  ASSERT_EQ(options.fpp, resolved->fpp);
+  ASSERT_TRUE(resolved->fold);
+}
+
+TEST(TestWriterProperties, BloomFilterExplicitNdv) {  // An explicitly specified ndv is kept and not replaced by max_row_group_length.
+  BloomFilterOptions options{.ndv = 777, .fpp = 0.05};
+
+  auto props = WriterProperties::Builder()
+                   .max_row_group_length(12345)
+                   ->enable_bloom_filter("a", options)
+                   ->build();
+
+  const auto resolved = 
props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(resolved.has_value());
+  ASSERT_TRUE(resolved->ndv.has_value());
+  ASSERT_EQ(777, resolved->ndv.value());
+}
+
+TEST(TestWriterProperties, BloomFilterNdvDefaultWithoutFolding) {  // With fold = false an unset ndv still resolves to max_row_group_length.
+  BloomFilterOptions options{.ndv = std::nullopt, .fpp = 0.05, .fold = false};
+
+  auto props = WriterProperties::Builder()
+                   .max_row_group_length(12345)
+                   ->enable_bloom_filter("a", options)
+                   ->build();
+
+  const auto resolved = 
props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(resolved.has_value());
+  ASSERT_TRUE(resolved->ndv.has_value());
+  ASSERT_EQ(12345, resolved->ndv.value());
+  ASSERT_EQ(options.fpp, resolved->fpp);
+  ASSERT_FALSE(resolved->fold);  // The fold flag is carried through unchanged.
+}
+
+TEST(TestWriterProperties, BloomFilterExplicitNdvWithoutFolding) {  // An explicit ndv and fold = false are both preserved by the builder.
+  BloomFilterOptions options{.ndv = 777, .fpp = 0.05, .fold = false};
+
+  auto props = WriterProperties::Builder()
+                   .max_row_group_length(12345)
+                   ->enable_bloom_filter("a", options)
+                   ->build();
+
+  const auto resolved = 
props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(resolved.has_value());
+  ASSERT_TRUE(resolved->ndv.has_value());
+  ASSERT_EQ(777, resolved->ndv.value());
+  ASSERT_FALSE(resolved->fold);
+}
+
+TEST(TestWriterProperties, BloomFilterRejectsNegativeNdv) {  // Enabling a bloom filter with ndv = -1 and building must throw ParquetException.
+  BloomFilterOptions options{.ndv = -1, .fpp = 0.05};
+
+  EXPECT_THROW_THAT(
+      [&]() { WriterProperties::Builder().enable_bloom_filter("a", 
options)->build(); },
+      ParquetException,
+      ::testing::Property(
+          &ParquetException::what,
+          ::testing::HasSubstr("Bloom filter number of distinct values must be 
>= 0")));
+}
+
+TEST(TestWriterProperties, BloomFilterRejectsNanFpp) {  // A NaN fpp must be rejected with the fpp range error.
+  BloomFilterOptions options{.ndv = 777, .fpp = 
std::numeric_limits<double>::quiet_NaN()};
+
+  EXPECT_THROW_THAT(
+      [&]() { WriterProperties::Builder().enable_bloom_filter("a", 
options)->build(); },
+      ParquetException,
+      ::testing::Property(
+          &ParquetException::what,
+          ::testing::HasSubstr(
+              "Bloom filter false positive probability must be in (0.0, 
1.0)")));
+}
+
+TEST(TestWriterProperties, BloomFilterAllowsZeroNdv) {  // ndv = 0 is a valid explicit value and is kept as-is.
+  BloomFilterOptions options{.ndv = 0, .fpp = 0.05};
+
+  auto props = WriterProperties::Builder().enable_bloom_filter("a", 
options)->build();
+
+  const auto resolved = 
props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(resolved.has_value());
+  ASSERT_TRUE(resolved->ndv.has_value());
+  ASSERT_EQ(0, resolved->ndv.value());
+}
+
 TEST(TestWriterProperties, ContentDefinedChunkingSettings) {
   WriterProperties::Builder builder;
   std::shared_ptr<WriterProperties> props = builder.build();
@@ -218,6 +320,16 @@ TEST_P(WriterPropertiesTest, RoundTripThroughBuilder) {
               column_properties.page_index_enabled());
     ASSERT_EQ(round_tripped_col.statistics_enabled(),
               column_properties.statistics_enabled());
+    const auto round_tripped_bloom_filter_options =
+        round_tripped_col.bloom_filter_options();
+    const auto bloom_filter_options = column_properties.bloom_filter_options();
+    ASSERT_EQ(round_tripped_bloom_filter_options.has_value(),
+              bloom_filter_options.has_value());
+    if (bloom_filter_options.has_value()) {
+      ASSERT_EQ(round_tripped_bloom_filter_options->ndv, 
bloom_filter_options->ndv);
+      ASSERT_EQ(round_tripped_bloom_filter_options->fpp, 
bloom_filter_options->fpp);
+      ASSERT_EQ(round_tripped_bloom_filter_options->fold, 
bloom_filter_options->fold);
+    }
   }
 }
 
@@ -285,6 +397,13 @@ std::vector<WriterPropertiesTestCase> 
writer_properties_test_cases() {
     builder.enable_write_page_index(column_a);
     test_cases.emplace_back(builder.build(), "page_index_column_override");
   }
+  {
+    WriterProperties::Builder builder;
+    builder.max_row_group_length(12345);
+    builder.enable_bloom_filter(
+        column_a, BloomFilterOptions{.ndv = std::nullopt, .fpp = 0.05, .fold = 
false});
+    test_cases.emplace_back(builder.build(), "bloom_filter_column_override");
+  }
 
   return test_cases;
 }

Reply via email to