This is an automated email from the ASF dual-hosted git repository.
pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 108240fd60 GH-50007: [C++][Parquet] Add bloom filter folding to
automatically size SBBF filters (#50008)
108240fd60 is described below
commit 108240fd609e1fcf2422b318fcbf79a85e5ef646
Author: Zehua Zou <[email protected]>
AuthorDate: Thu Jun 11 16:42:14 2026 +0800
GH-50007: [C++][Parquet] Add bloom filter folding to automatically size
SBBF filters (#50008)
### Rationale for this change
This PR follows https://github.com/apache/arrow-rs/pull/9628. It reduces the
on-disk size of Bloom filters: with folding enabled (the default), specifying an
ndv larger than the actual number of distinct values no longer inflates disk usage.
> Bloom filters now support folding mode: allocate a conservatively large
filter (sized for worst-case NDV), insert all values during writing, then fold
down at flush time to meet a target FPP. This eliminates the need to guess NDV
upfront and produces optimally-sized filters automatically.
### What changes are included in this PR?
`BloomFilterBuilder` now tries to fold each bloom filter before writing it to
the output stream, unless folding is disabled via `BloomFilterOptions::fold`.
### Are these changes tested?
Yes.
### Are there any user-facing changes?
Yes.
The type of `ndv` in `BloomFilterOptions` has changed from `int32_t` to
`std::optional<int64_t>`; when it is unset, the writer uses the maximum row group
length. The `ndv` parameter of `OptimalNumOfBytes` and `OptimalNumOfBits` in
`BlockSplitBloomFilter` has changed from `uint32_t` to `uint64_t`.
A new field `fold` has been added to `BloomFilterOptions`; it defaults to `true`.
* GitHub Issue: #50007
Lead-authored-by: Zehua Zou <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/parquet/bloom_filter.cc | 93 +++++++++++++++-
cpp/src/parquet/bloom_filter.h | 13 ++-
cpp/src/parquet/bloom_filter_reader_writer_test.cc | 115 +++++++++++++++++++-
cpp/src/parquet/bloom_filter_writer.cc | 35 ++++--
cpp/src/parquet/properties.h | 47 ++++++--
cpp/src/parquet/properties_test.cc | 119 +++++++++++++++++++++
6 files changed, 395 insertions(+), 27 deletions(-)
diff --git a/cpp/src/parquet/bloom_filter.cc b/cpp/src/parquet/bloom_filter.cc
index 577d26fe00..5959817ea2 100644
--- a/cpp/src/parquet/bloom_filter.cc
+++ b/cpp/src/parquet/bloom_filter.cc
@@ -15,13 +15,16 @@
// specific language governing permissions and limitations
// under the License.
+#include <algorithm>
+#include <bit>
+#include <cmath>
#include <cstdint>
#include <cstring>
#include <limits>
#include <memory>
#include "arrow/io/memory.h"
-#include "arrow/result.h"
+#include "arrow/util/bitmap_ops.h"
#include "arrow/util/logging_internal.h"
#include "arrow/util/macros.h"
@@ -345,9 +348,90 @@ void BlockSplitBloomFilter::WriteTo(ArrowOutputStream*
sink) const {
PARQUET_THROW_NOT_OK(sink->Write(data_->data(), num_bytes_));
}
+// Shrinks the bitset in place before serialization. The number of folds is
+// derived from the average fraction of set bits and the requested target FPP.
+void BlockSplitBloomFilter::FoldToTargetFpp(double target_fpp) {
+  const int64_t bit_count = static_cast<int64_t>(num_bytes_) * 8;
+  const int64_t set_bit_count =
+      ::arrow::internal::CountSetBits(data_->data(), /*bit_offset=*/0, bit_count);
+
+  if (set_bit_count != 0) {
+    const double fill_rate = static_cast<double>(set_bit_count) / bit_count;
+    const uint32_t folds = NumFoldsForTargetFpp(target_fpp, fill_rate);
+    if (folds != 0) {
+      Fold(folds);
+    }
+    return;
+  }
+
+  // All-zero bitset: shrink straight to the minimum size. The retained prefix
+  // of the buffer is all zeros as well, so lookups are unaffected.
+  num_bytes_ = kMinimumBloomFilterBytes;
+}
+
+// Returns how many times the filter can be halved (folded) while the estimated
+// FPP stays at or below target_fpp. avg_fill is the current fraction of set
+// bits in the whole bitset.
+uint32_t BlockSplitBloomFilter::NumFoldsForTargetFpp(double target_fpp,
+                                                     double avg_fill) const {
+  const uint32_t num_blocks = NumBlocks();
+  if (num_blocks < 2) {
+    return 0;
+  }
+  // Folding by power-of-two groups only keeps the old-to-new bucket mapping
+  // aligned (no false negatives) when the number of blocks is itself a power
+  // of two. Besides the debug check, refuse to fold in release builds rather
+  // than silently corrupt lookups.
+  const bool power_of_two_blocks = (num_blocks & (num_blocks - 1)) == 0;
+  DCHECK(power_of_two_blocks);
+  if (!power_of_two_blocks) {
+    return 0;
+  }
+
+  // Estimate the fill rate after folding from the current average fill rate.
+  // Folding ORs block groups together, so each fold changes the estimated fill
+  // rate from f to 1 - (1 - f)^2. A membership check tests kBitsSetPerBlock
+  // bits, making the estimated FPP equal to
+  // std::pow(folded_fill_rate, kBitsSetPerBlock).
+  //
+  // See also: Sailhan and Stehr, "Folding and Unfolding Bloom Filters", 2012:
+  // https://hal.science/hal-01126174v1
+  const auto max_folds = static_cast<uint32_t>(std::countr_zero(num_blocks));
+
+  uint32_t num_folds = 0;
+  double unset_probability_after_folds = 1.0 - avg_fill;
+  for (uint32_t i = 0; i < max_folds; ++i) {
+    unset_probability_after_folds *= unset_probability_after_folds;
+    const double folded_fill_rate = 1.0 - unset_probability_after_folds;
+    const double estimated_fpp = std::pow(folded_fill_rate, kBitsSetPerBlock);
+    // Negated comparison on purpose: with a NaN target_fpp,
+    // `estimated_fpp > target_fpp` is always false and the filter would be
+    // folded down to a single block. `!(estimated_fpp <= target_fpp)` stops
+    // folding instead.
+    if (!(estimated_fpp <= target_fpp)) {
+      break;
+    }
+    ++num_folds;
+  }
+  return num_folds;
+}
+
+// Folds the bitset in place by OR-ing each run of (1 << num_folds) consecutive
+// blocks into one block, then shrinks num_bytes_ accordingly. The underlying
+// buffer is not reallocated; only its first num_bytes_ bytes stay meaningful.
+void BlockSplitBloomFilter::Fold(uint32_t num_folds) {
+ DCHECK_GT(num_folds, 0);
+
+ const uint32_t num_blocks = NumBlocks();
+ // A fold group is a consecutive run of blocks ORed into one output block.
+ // Keeping the group size as (1 << num_folds) preserves a power-of-two bitset
+ // size. Folding by this power-of-two group size keeps the old-to-new bucket
+ // remapping aligned with bucket lookup and avoids false negatives.
+ const uint32_t group_size = UINT32_C(1) << num_folds;
+ DCHECK_LE(group_size, num_blocks);
+
+ const uint32_t new_num_blocks = num_blocks / group_size;
+ auto* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
+
+ // In-place compaction is safe. Output block dst_block belongs to input group
+ // dst_block / group_size, which is < dst_block for dst_block >= 1, so that
+ // group was already consumed in an earlier iteration. Later groups only read
+ // blocks >= (dst_block + 1) * group_size. Only dst_block == 0 aliases its own
+ // source, hence the dst != src check.
+ // NOTE(review): kBitsSetPerBlock is used below as the number of 32-bit words
+ // per block; this presumes kBitsSetPerBlock * sizeof(uint32_t) ==
+ // kBytesPerFilterBlock (32) — confirm.
+ for (uint32_t dst_block = 0; dst_block < new_num_blocks; ++dst_block) {
+ uint32_t* dst = bitset32 + dst_block * kBitsSetPerBlock;
+
+ const uint32_t src_block = dst_block * group_size;
+ const uint32_t* src = bitset32 + src_block * kBitsSetPerBlock;
+ if (dst != src) {
+ std::copy_n(src, kBitsSetPerBlock, dst);
+ }
+
+ // OR the remaining blocks of the group into the output block.
+ for (uint32_t fold_block = 1; fold_block < group_size; ++fold_block) {
+ src = bitset32 + (src_block + fold_block) * kBitsSetPerBlock;
+ for (int word = 0; word < kBitsSetPerBlock; ++word) {
+ dst[word] |= src[word];
+ }
+ }
+ }
+
+ num_bytes_ = new_num_blocks * kBytesPerFilterBlock;
+}
+
bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
- const uint32_t bucket_index =
- static_cast<uint32_t>(((hash >> 32) * (num_bytes_ /
kBytesPerFilterBlock)) >> 32);
+ const uint32_t bucket_index = static_cast<uint32_t>(((hash >> 32) *
NumBlocks()) >> 32);
const uint32_t key = static_cast<uint32_t>(hash);
const uint32_t* bitset32 = reinterpret_cast<const uint32_t*>(data_->data());
@@ -363,8 +447,7 @@ bool BlockSplitBloomFilter::FindHash(uint64_t hash) const {
}
void BlockSplitBloomFilter::InsertHashImpl(uint64_t hash) {
- const uint32_t bucket_index =
- static_cast<uint32_t>(((hash >> 32) * (num_bytes_ /
kBytesPerFilterBlock)) >> 32);
+ const uint32_t bucket_index = static_cast<uint32_t>(((hash >> 32) *
NumBlocks()) >> 32);
const uint32_t key = static_cast<uint32_t>(hash);
uint32_t* bitset32 = reinterpret_cast<uint32_t*>(data_->mutable_data());
diff --git a/cpp/src/parquet/bloom_filter.h b/cpp/src/parquet/bloom_filter.h
index cabcd5b4a5..0b4c8df708 100644
--- a/cpp/src/parquet/bloom_filter.h
+++ b/cpp/src/parquet/bloom_filter.h
@@ -230,7 +230,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public
BloomFilter {
/// @param fpp The false positive probability.
/// @return it always return a value between kMinimumBloomFilterBytes and
/// kMaximumBloomFilterBytes, and the return value is always a power of 2
- static uint32_t OptimalNumOfBytes(uint32_t ndv, double fpp) {
+ static uint32_t OptimalNumOfBytes(uint64_t ndv, double fpp) {
uint32_t optimal_num_of_bits = OptimalNumOfBits(ndv, fpp);
ARROW_DCHECK(::arrow::bit_util::IsMultipleOf8(optimal_num_of_bits));
return optimal_num_of_bits >> 3;
@@ -243,7 +243,7 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public
BloomFilter {
/// @param fpp The false positive probability.
/// @return it always return a value between kMinimumBloomFilterBytes * 8 and
/// kMaximumBloomFilterBytes * 8, and the return value is always a power of
16
- static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
+ static uint32_t OptimalNumOfBits(uint64_t ndv, double fpp) {
ARROW_DCHECK(fpp > 0.0 && fpp < 1.0);
const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
uint32_t num_bits;
@@ -276,6 +276,9 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public
BloomFilter {
bool FindHash(uint64_t hash) const override;
void InsertHash(uint64_t hash) override;
void InsertHashes(const uint64_t* hashes, int num_values) override;
+ /// Fold the bloom filter down to the smallest size that still meets the
target FPP
+ /// (False Positive Probability).
+ void FoldToTargetFpp(double target_fpp);
void WriteTo(ArrowOutputStream* sink) const override;
uint32_t GetBitsetSize() const override { return num_bytes_; }
@@ -350,6 +353,12 @@ class PARQUET_EXPORT BlockSplitBloomFilter : public
BloomFilter {
private:
inline void InsertHashImpl(uint64_t hash);
+  /// Number of kBytesPerFilterBlock-sized blocks currently in the bitset.
+  uint32_t NumBlocks() const {
+    ARROW_DCHECK_EQ(num_bytes_ % kBytesPerFilterBlock, 0);
+    const uint32_t block_count = num_bytes_ / kBytesPerFilterBlock;
+    return block_count;
+  }
+ uint32_t NumFoldsForTargetFpp(double target_fpp, double avg_fill) const;
+ void Fold(uint32_t num_folds);
// Bytes in a tiny Bloom filter block.
static constexpr int kBytesPerFilterBlock = 32;
diff --git a/cpp/src/parquet/bloom_filter_reader_writer_test.cc
b/cpp/src/parquet/bloom_filter_reader_writer_test.cc
index d646a2b8fc..2750fde029 100644
--- a/cpp/src/parquet/bloom_filter_reader_writer_test.cc
+++ b/cpp/src/parquet/bloom_filter_reader_writer_test.cc
@@ -91,9 +91,9 @@ TEST(BloomFilterBuilder, BasicRoundTrip) {
"schema", Repetition::REPEATED, {schema::ByteArray("c1"),
schema::ByteArray("c2")});
schema.Init(root);
- BloomFilterOptions bloom_filter_options{100, 0.05};
+ BloomFilterOptions bloom_filter_options{.ndv = 100, .fpp = 0.05};
const auto bitset_size = BlockSplitBloomFilter::OptimalNumOfBytes(
- bloom_filter_options.ndv, bloom_filter_options.fpp);
+ bloom_filter_options.ndv.value(), bloom_filter_options.fpp);
WriterProperties::Builder properties_builder;
properties_builder.enable_bloom_filter("c1", bloom_filter_options);
auto writer_properties = properties_builder.build();
@@ -150,6 +150,115 @@ TEST(BloomFilterBuilder, BasicRoundTrip) {
}
}
+namespace {
+
+/// Parameters for BloomFilterBuilderFoldingTest. Field order matters: the
+/// cases in INSTANTIATE_TEST_SUITE_P are built with designated initializers.
+struct BloomFilterBuilderFoldingTestCase {
+ // BloomFilterOptions::ndv, used to size the initial filter.
+ int64_t ndv;
+ // BloomFilterOptions::fold.
+ bool fold;
+ // How many values (0, 1, ..., inserted_count - 1) are hashed and inserted.
+ int32_t inserted_count;
+ // ndv for which OptimalNumOfBytes() yields the expected serialized size.
+ int64_t expected_bitset_ndv;
+};
+
+class BloomFilterBuilderFoldingTest
+ : public ::testing::TestWithParam<BloomFilterBuilderFoldingTestCase> {};
+
+} // namespace
+
+TEST_P(BloomFilterBuilderFoldingTest, RespectsOption) {
+  // Builds a single-column bloom filter with the parameterized options, inserts
+  // `inserted_count` values, writes it out and reads it back. It then checks
+  // the serialized size, the absence of false negatives and the observed FPP.
+  const BloomFilterBuilderFoldingTestCase& param = GetParam();
+  constexpr double kFpp = 0.05;
+  constexpr int32_t kNonInsertedCount = 10'000;
+
+  SchemaDescriptor schema;
+  schema::NodePtr schema_root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1")});
+  schema.Init(schema_root);
+
+  BloomFilterOptions options{.ndv = param.ndv, .fpp = kFpp, .fold = param.fold};
+  const uint32_t initial_size =
+      BlockSplitBloomFilter::OptimalNumOfBytes(options.ndv.value(), options.fpp);
+
+  WriterProperties::Builder props_builder;
+  props_builder.enable_bloom_filter("c1", options);
+  const auto props = props_builder.build();
+  auto builder = BloomFilterBuilder::Make(&schema, props.get());
+  builder->AppendRowGroup();
+
+  auto* created = builder->CreateBloomFilter(/*column_ordinal=*/0);
+  ASSERT_NE(created, nullptr);
+  ASSERT_EQ(initial_size, created->GetBitsetSize());
+
+  std::vector<uint64_t> inserted_hashes;
+  inserted_hashes.reserve(param.inserted_count);
+  for (int32_t value = 0; value < param.inserted_count; ++value) {
+    inserted_hashes.push_back(created->Hash(value));
+    created->InsertHash(inserted_hashes.back());
+  }
+
+  auto out = CreateOutputStream();
+  auto locations = builder->WriteTo(out.get());
+  ASSERT_EQ(locations.size(), 1);
+  ASSERT_OK_AND_ASSIGN(auto written, out->Finish());
+
+  const auto& written_location = locations.front().second;
+  ReaderProperties reader_props;
+  ::arrow::io::BufferReader reader(::arrow::SliceBuffer(
+      written, written_location.offset, written_location.length));
+  auto roundtripped = BlockSplitBloomFilter::Deserialize(reader_props, &reader);
+
+  // Serialized size: folded down (or not) exactly as the test case predicts.
+  const uint32_t final_size = roundtripped.GetBitsetSize();
+  EXPECT_EQ(BlockSplitBloomFilter::OptimalNumOfBytes(param.expected_bitset_ndv, kFpp),
+            final_size);
+  if (!param.fold) {
+    EXPECT_EQ(final_size, initial_size);
+  } else {
+    EXPECT_LE(final_size, initial_size);
+  }
+
+  // Folding must never introduce false negatives.
+  for (const uint64_t inserted : inserted_hashes) {
+    EXPECT_TRUE(roundtripped.FindHash(inserted));
+  }
+
+  // Sample values that were never inserted to estimate the actual FPP.
+  int32_t hits = 0;
+  const int32_t first_absent = param.inserted_count;
+  for (int32_t value = first_absent; value < first_absent + kNonInsertedCount;
+       ++value) {
+    if (roundtripped.FindHash(roundtripped.Hash(value))) {
+      ++hits;
+    }
+  }
+  const double observed_fpp = static_cast<double>(hits) / kNonInsertedCount;
+  EXPECT_LT(observed_fpp, kFpp);
+
+  if (param.fold && param.inserted_count > 0) {
+    // An observed FPP well below kFpp / 2 would mean the filter could have
+    // been folded at least once more.
+    EXPECT_GT(observed_fpp, kFpp / 2.1);
+  }
+}
+
+INSTANTIATE_TEST_SUITE_P(
+    BloomFilterBuilder, BloomFilterBuilderFoldingTest,
+    ::testing::Values(
+        // Oversized ndv, folding enabled: the written filter is sized for the
+        // 1000 values actually inserted.
+        BloomFilterBuilderFoldingTestCase{.ndv = 1'000'000,
+                                          .fold = true,
+                                          .inserted_count = 1000,
+                                          .expected_bitset_ndv = 1000},
+        // Oversized ndv, folding disabled: the initial size is kept.
+        BloomFilterBuilderFoldingTestCase{.ndv = 1'000'000,
+                                          .fold = false,
+                                          .inserted_count = 1000,
+                                          .expected_bitset_ndv = 1'000'000},
+        // Accurate ndv, folding enabled: the filter keeps its initial size.
+        BloomFilterBuilderFoldingTestCase{.ndv = 1024,
+                                          .fold = true,
+                                          .inserted_count = 1024,
+                                          .expected_bitset_ndv = 1024},
+        // Empty filter, folding enabled: shrinks to the size for ndv == 0.
+        BloomFilterBuilderFoldingTestCase{.ndv = 1024,
+                                          .fold = true,
+                                          .inserted_count = 0,
+                                          .expected_bitset_ndv = 0},
+        // Empty filter, folding disabled: the initial size is kept.
+        BloomFilterBuilderFoldingTestCase{.ndv = 1024,
+                                          .fold = false,
+                                          .inserted_count = 0,
+                                          .expected_bitset_ndv = 1024}));
+
TEST(BloomFilterBuilder, InvalidOperations) {
SchemaDescriptor schema;
schema::NodePtr root = schema::GroupNode::Make(
@@ -158,7 +267,7 @@ TEST(BloomFilterBuilder, InvalidOperations) {
schema.Init(root);
WriterProperties::Builder properties_builder;
- BloomFilterOptions bloom_filter_options{100, 0.05};
+ BloomFilterOptions bloom_filter_options{.ndv = 100, .fpp = 0.05};
properties_builder.enable_bloom_filter("c1", bloom_filter_options);
properties_builder.enable_bloom_filter("c2", bloom_filter_options);
auto properties = properties_builder.build();
diff --git a/cpp/src/parquet/bloom_filter_writer.cc
b/cpp/src/parquet/bloom_filter_writer.cc
index f06b866c30..a52ca25163 100644
--- a/cpp/src/parquet/bloom_filter_writer.cc
+++ b/cpp/src/parquet/bloom_filter_writer.cc
@@ -185,8 +185,16 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
const WriterProperties* properties_;
bool finished_ = false;
- using RowGroupBloomFilters =
- std::map</*column_id=*/int32_t, std::shared_ptr<BloomFilter>>;
+ /// Bloom filters of a single row group, keyed by column ordinal.
+ struct RowGroupBloomFilters {
+ /// One column's filter plus the settings WriteTo() applies before writing it.
+ /// Field order matters: CreateBloomFilter() uses designated initializers.
+ struct BloomFilterEntry {
+ std::shared_ptr<BlockSplitBloomFilter> filter;
+ // Target FPP handed to BlockSplitBloomFilter::FoldToTargetFpp().
+ double target_fpp;
+ // Copied from BloomFilterOptions::fold; when true, fold before writing.
+ bool try_fold;
+ };
+
+ std::map</*column_id=*/int32_t, BloomFilterEntry> entries;
+ };
+
std::vector<RowGroupBloomFilters> bloom_filters_; // indexed by row group
ordinal
};
@@ -206,7 +214,7 @@ BloomFilter*
BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) {
CheckState(column_ordinal);
- auto& curr_rg_bfs = *bloom_filters_.rbegin();
+ auto& curr_rg_bfs = bloom_filters_.back().entries;
if (curr_rg_bfs.find(column_ordinal) != curr_rg_bfs.cend()) {
std::stringstream ss;
ss << "Bloom filter already exists for column: " << column_ordinal
@@ -214,9 +222,15 @@ BloomFilter*
BloomFilterBuilderImpl::CreateBloomFilter(int32_t column_ordinal) {
throw ParquetException(ss.str());
}
- auto bf =
std::make_unique<BlockSplitBloomFilter>(properties_->memory_pool());
- bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv, opts->fpp));
- return curr_rg_bfs.emplace(column_ordinal,
std::move(bf)).first->second.get();
+ ARROW_DCHECK(opts->ndv.has_value());
+ auto bf =
std::make_shared<BlockSplitBloomFilter>(properties_->memory_pool());
+ bf->Init(BlockSplitBloomFilter::OptimalNumOfBytes(opts->ndv.value(),
opts->fpp));
+ return curr_rg_bfs
+ .emplace(
+ column_ordinal,
+ RowGroupBloomFilters::BloomFilterEntry{
+ .filter = std::move(bf), .target_fpp = opts->fpp, .try_fold =
opts->fold})
+ .first->second.filter.get();
}
IndexLocations BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream*
sink) {
@@ -228,11 +242,14 @@ IndexLocations
BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink)
IndexLocations locations;
for (size_t i = 0; i != bloom_filters_.size(); ++i) {
- auto& row_group_bloom_filters = bloom_filters_[i];
- for (const auto& [column_id, filter] : row_group_bloom_filters) {
+ auto& row_group_bloom_filters = bloom_filters_[i].entries;
+ for (auto& [column_id, entry] : row_group_bloom_filters) {
// TODO(GH-43138): Determine the quality of bloom filter before writing
it.
PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
- filter->WriteTo(sink);
+ if (entry.try_fold) {
+ entry.filter->FoldToTargetFpp(entry.target_fpp);
+ }
+ entry.filter->WriteTo(sink);
PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());
if (pos - offset > std::numeric_limits<int32_t>::max()) {
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 6634bac4f6..7eb37c3d52 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -18,6 +18,7 @@
#pragma once
#include <memory>
+#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -173,12 +174,13 @@ static constexpr SizeStatisticsLevel
DEFAULT_SIZE_STATISTICS_LEVEL =
struct PARQUET_EXPORT BloomFilterOptions {
/// Expected number of distinct values (NDV) in the bloom filter.
///
- /// Bloom filters are most effective for high-cardinality columns. A good
default
- /// is to set ndv equal to the number of rows. Lower values reduce disk
usage but
- /// may not be worthwhile for very small NDVs.
+ /// Bloom filters are most effective for high-cardinality columns. If unset,
the
+ /// writer resolves ndv to the max row group row count. Lower values reduce
disk
+ /// usage but may not be worthwhile for very small NDVs.
///
- /// Increasing ndv (without increasing fpp) increases disk and memory usage.
- int32_t ndv = 1 << 20;
+ /// Increasing ndv (without increasing fpp) increases memory usage. Folding
only
+ /// shrinks a filter before serialization; it will not grow an undersized
filter.
+ std::optional<int64_t> ndv = std::nullopt;
/// False-positive probability (FPP) of the bloom filter.
///
@@ -202,6 +204,23 @@ struct PARQUET_EXPORT BloomFilterOptions {
/// | 10,000,000 | 0.05 | 13.4 | 16384 KiB |
/// | 10,000,000 | 0.01 | 13.4 | 16384 KiB |
double fpp = 0.05;
+
+ /// Whether to fold the bloom filter before writing it.
+ ///
+ /// If true, the writer may fold the filter before serialization to reduce
disk
+ /// usage while preserving the target fpp estimate. Highly skewed block
occupancy
+ /// can make this estimate optimistic; disable folding to preserve the
initial
+ /// filter size.
+ ///
+ /// The writer resolves ndv and fold behavior as follows:
+ ///
+ /// | fold | ndv | Resolved ndv | Write behavior |
+ /// |:------|:----------|:-------------------------|:---------------|
+ /// | true | unset | max row group row count | try to fold |
+ /// | true | specified | specified ndv | try to fold |
+ /// | false | unset | max row group row count | do not fold |
+ /// | false | specified | specified ndv | do not fold |
+ bool fold = true;
};
class PARQUET_EXPORT ColumnProperties {
@@ -251,11 +270,15 @@ class PARQUET_EXPORT ColumnProperties {
}
+ /// Validates and stores the bloom filter options for this column.
+ /// Throws ParquetException if fpp is not strictly inside (0.0, 1.0), NaN
+ /// included, or if ndv is set to a negative value. An unset ndv is accepted.
void set_bloom_filter_options(const BloomFilterOptions&
bloom_filter_options) {
- if (bloom_filter_options.fpp >= 1.0 || bloom_filter_options.fpp <= 0.0) {
+ // Negated range check: NaN compares false against both bounds, so this
+ // form rejects it too.
+ if (!(bloom_filter_options.fpp > 0.0 && bloom_filter_options.fpp < 1.0)) {
throw ParquetException(
"Bloom filter false positive probability must be in (0.0, 1.0), got
" +
std::to_string(bloom_filter_options.fpp));
}
+ // Only a present, negative ndv is invalid; 0 is allowed.
+ if (bloom_filter_options.ndv.has_value() &&
bloom_filter_options.ndv.value() < 0) {
+ throw ParquetException("Bloom filter number of distinct values must be
>= 0, got " +
+ std::to_string(bloom_filter_options.ndv.value()));
+ }
bloom_filter_options_ = bloom_filter_options;
}
@@ -863,8 +886,16 @@ class PARQUET_EXPORT WriterProperties {
get(item.first).set_statistics_enabled(item.second);
for (const auto& item : page_index_enabled_)
get(item.first).set_page_index_enabled(item.second);
- for (const auto& item : bloom_filter_options_)
- get(item.first).set_bloom_filter_options(item.second);
+ for (const auto& item : bloom_filter_options_) {
+ const auto& bloom_filter_options = item.second;
+ if (bloom_filter_options.ndv.has_value()) {
+ get(item.first).set_bloom_filter_options(bloom_filter_options);
+ } else {
+ auto resolved_options = bloom_filter_options;
+ resolved_options.ndv = max_row_group_length_;
+ get(item.first).set_bloom_filter_options(resolved_options);
+ }
+ }
return std::shared_ptr<WriterProperties>(new WriterProperties(
pool_, dictionary_pagesize_limit_, write_batch_size_,
max_row_group_length_,
diff --git a/cpp/src/parquet/properties_test.cc
b/cpp/src/parquet/properties_test.cc
index 0743b7ad4d..324ea2026a 100644
--- a/cpp/src/parquet/properties_test.cc
+++ b/cpp/src/parquet/properties_test.cc
@@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+#include <gmock/gmock.h>
#include <gtest/gtest.h>
+#include <limits>
+#include <optional>
#include <string>
#include "arrow/buffer.h"
@@ -24,6 +27,7 @@
#include "parquet/file_reader.h"
#include "parquet/properties.h"
+#include "parquet/test_util.h"
namespace parquet {
@@ -115,6 +119,104 @@ TEST(TestWriterProperties, SetCodecOptions) {
->window_bits);
}
+TEST(TestWriterProperties, BloomFilterNdvDefaults) {
+  // A default-constructed options object leaves ndv unset and enables folding.
+  BloomFilterOptions opts;
+  ASSERT_FALSE(opts.ndv.has_value());
+  ASSERT_TRUE(opts.fold);
+  opts.fpp = 0.05;
+
+  // With ndv unset, build() substitutes the max row group length.
+  WriterProperties::Builder builder;
+  builder.max_row_group_length(12345);
+  builder.enable_bloom_filter("a", opts);
+  const auto props = builder.build();
+
+  const auto column_opts = props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(column_opts.has_value());
+  ASSERT_TRUE(column_opts->ndv.has_value());
+  ASSERT_EQ(12345, column_opts->ndv.value());
+  ASSERT_EQ(opts.fpp, column_opts->fpp);
+  ASSERT_TRUE(column_opts->fold);
+}
+
+TEST(TestWriterProperties, BloomFilterExplicitNdv) {
+  // An explicitly specified ndv is kept as-is, not replaced by the row group length.
+  BloomFilterOptions opts{.ndv = 777, .fpp = 0.05};
+
+  WriterProperties::Builder builder;
+  builder.max_row_group_length(12345);
+  builder.enable_bloom_filter("a", opts);
+  const auto props = builder.build();
+
+  const auto column_opts = props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(column_opts.has_value());
+  ASSERT_TRUE(column_opts->ndv.has_value());
+  ASSERT_EQ(777, column_opts->ndv.value());
+}
+
+TEST(TestWriterProperties, BloomFilterNdvDefaultWithoutFolding) {
+  // Disabling folding does not affect how an unset ndv is resolved.
+  BloomFilterOptions opts{.ndv = std::nullopt, .fpp = 0.05, .fold = false};
+
+  WriterProperties::Builder builder;
+  builder.max_row_group_length(12345);
+  builder.enable_bloom_filter("a", opts);
+  const auto props = builder.build();
+
+  const auto column_opts = props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(column_opts.has_value());
+  ASSERT_TRUE(column_opts->ndv.has_value());
+  ASSERT_EQ(12345, column_opts->ndv.value());
+  ASSERT_EQ(opts.fpp, column_opts->fpp);
+  ASSERT_FALSE(column_opts->fold);
+}
+
+TEST(TestWriterProperties, BloomFilterExplicitNdvWithoutFolding) {
+  // Both an explicit ndv and fold == false survive build() unchanged.
+  BloomFilterOptions opts{.ndv = 777, .fpp = 0.05, .fold = false};
+
+  WriterProperties::Builder builder;
+  builder.max_row_group_length(12345);
+  builder.enable_bloom_filter("a", opts);
+  const auto props = builder.build();
+
+  const auto column_opts = props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(column_opts.has_value());
+  ASSERT_TRUE(column_opts->ndv.has_value());
+  ASSERT_EQ(777, column_opts->ndv.value());
+  ASSERT_FALSE(column_opts->fold);
+}
+
+TEST(TestWriterProperties, BloomFilterRejectsNegativeNdv) {
+  // A negative ndv is rejected when the properties are built.
+  BloomFilterOptions opts{.ndv = -1, .fpp = 0.05};
+  auto build_props = [&]() {
+    WriterProperties::Builder().enable_bloom_filter("a", opts)->build();
+  };
+
+  EXPECT_THROW_THAT(
+      build_props, ParquetException,
+      ::testing::Property(
+          &ParquetException::what,
+          ::testing::HasSubstr("Bloom filter number of distinct values must be >= 0")));
+}
+
+TEST(TestWriterProperties, BloomFilterRejectsNanFpp) {
+  // The fpp range check must also reject NaN, which fails every comparison.
+  BloomFilterOptions opts{.ndv = 777,
+                          .fpp = std::numeric_limits<double>::quiet_NaN()};
+  auto build_props = [&]() {
+    WriterProperties::Builder().enable_bloom_filter("a", opts)->build();
+  };
+
+  EXPECT_THROW_THAT(
+      build_props, ParquetException,
+      ::testing::Property(
+          &ParquetException::what,
+          ::testing::HasSubstr(
+              "Bloom filter false positive probability must be in (0.0, 1.0)")));
+}
+
+TEST(TestWriterProperties, BloomFilterAllowsZeroNdv) {
+  // ndv == 0 is the inclusive lower bound of the accepted range.
+  BloomFilterOptions opts{.ndv = 0, .fpp = 0.05};
+
+  WriterProperties::Builder builder;
+  builder.enable_bloom_filter("a", opts);
+  const auto props = builder.build();
+
+  const auto column_opts = props->bloom_filter_options(ColumnPath::FromDotString("a"));
+  ASSERT_TRUE(column_opts.has_value());
+  ASSERT_TRUE(column_opts->ndv.has_value());
+  ASSERT_EQ(0, column_opts->ndv.value());
+}
+
TEST(TestWriterProperties, ContentDefinedChunkingSettings) {
WriterProperties::Builder builder;
std::shared_ptr<WriterProperties> props = builder.build();
@@ -218,6 +320,16 @@ TEST_P(WriterPropertiesTest, RoundTripThroughBuilder) {
column_properties.page_index_enabled());
ASSERT_EQ(round_tripped_col.statistics_enabled(),
column_properties.statistics_enabled());
+ const auto round_tripped_bloom_filter_options =
+ round_tripped_col.bloom_filter_options();
+ const auto bloom_filter_options = column_properties.bloom_filter_options();
+ ASSERT_EQ(round_tripped_bloom_filter_options.has_value(),
+ bloom_filter_options.has_value());
+ if (bloom_filter_options.has_value()) {
+ ASSERT_EQ(round_tripped_bloom_filter_options->ndv,
bloom_filter_options->ndv);
+ ASSERT_EQ(round_tripped_bloom_filter_options->fpp,
bloom_filter_options->fpp);
+ ASSERT_EQ(round_tripped_bloom_filter_options->fold,
bloom_filter_options->fold);
+ }
}
}
@@ -285,6 +397,13 @@ std::vector<WriterPropertiesTestCase>
writer_properties_test_cases() {
builder.enable_write_page_index(column_a);
test_cases.emplace_back(builder.build(), "page_index_column_override");
}
+ {
+ WriterProperties::Builder builder;
+ builder.max_row_group_length(12345);
+ builder.enable_bloom_filter(
+ column_a, BloomFilterOptions{.ndv = std::nullopt, .fpp = 0.05, .fold =
false});
+ test_cases.emplace_back(builder.build(), "bloom_filter_column_override");
+ }
return test_cases;
}