This is an automated email from the ASF dual-hosted git repository.
maplefu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 5fd30d3dd0 GH-49376: [Python][Parquet] Add ability to write Bloom
filters from pyarrow (#49377)
5fd30d3dd0 is described below
commit 5fd30d3dd00baa826d9c24a7c8caf28428c023c9
Author: Ed Seidl <[email protected]>
AuthorDate: Sun Apr 5 21:15:35 2026 -0700
GH-49376: [Python][Parquet] Add ability to write Bloom filters from pyarrow
(#49377)
Fixes #49376
### Rationale for this change
Adds ability to enable the writing of Parquet Bloom filters via pyarrow.
### What changes are included in this PR?
Adds `bloom_filter_options` to `parquet.write_table`.
### Are these changes tested?
Yes, new tests are added.
### Are there any user-facing changes?
Adds an option (defaults to `None`) to `parquet.write_table`.
* GitHub Issue: #49376
Authored-by: seidl <[email protected]>
Signed-off-by: mwish <[email protected]>
---
python/pyarrow/_parquet.pxd | 3 +-
python/pyarrow/_parquet.pyx | 66 +++++++++++++++++++++++++++--
python/pyarrow/includes/libparquet.pxd | 7 +++
python/pyarrow/parquet/core.py | 28 ++++++++++++
python/pyarrow/tests/parquet/test_basic.py | 68 ++++++++++++++++++++++++++++++
5 files changed, 168 insertions(+), 4 deletions(-)
diff --git a/python/pyarrow/_parquet.pxd b/python/pyarrow/_parquet.pxd
index ef9ed57657..36fc2ccf2f 100644
--- a/python/pyarrow/_parquet.pxd
+++ b/python/pyarrow/_parquet.pxd
@@ -56,7 +56,8 @@ cdef shared_ptr[WriterProperties] _create_writer_properties(
write_page_checksum=*,
sorting_columns=*,
store_decimal_as_integer=*,
- use_content_defined_chunking=*
+ use_content_defined_chunking=*,
+ bloom_filter_options=*
) except *
diff --git a/python/pyarrow/_parquet.pyx b/python/pyarrow/_parquet.pyx
index fa89b6812e..afd85da1ef 100644
--- a/python/pyarrow/_parquet.pyx
+++ b/python/pyarrow/_parquet.pyx
@@ -1973,6 +1973,60 @@ cdef vector[CSortingColumn]
_convert_sorting_columns(sorting_columns) except *:
return c_sorting_columns
+cdef void _set_bloom_opts_for_column(
+ WriterProperties.Builder* props,
+ column,
+ column_bloom_opts) except *:
+ """Set Bloom filter options for a single column"""
+ cdef:
+ BloomFilterOptions bloom_opts
+
+ if isinstance(column_bloom_opts, dict):
+ if "ndv" in column_bloom_opts:
+ ndv = column_bloom_opts["ndv"]
+ if isinstance(ndv, int):
+ if ndv <= 0:
+ raise ValueError(
+ f"'bloom_filter_options:ndv' for column '{column}'
must be greater than zero, got {ndv}")
+ bloom_opts.ndv = ndv
+ else:
+ raise TypeError(
+ f"'bloom_filter_options:ndv' for column '{column}' must be
an int")
+ if "fpp" in column_bloom_opts:
+ fpp = column_bloom_opts["fpp"]
+ if isinstance(fpp, float):
+ if fpp <= 0.0 or fpp >= 1.0:
+ raise ValueError(
+ f"'bloom_filter_options:fpp' for column '{column}'
must be in (0.0, 1.0), got {fpp}")
+ bloom_opts.fpp = fpp
+ else:
+ raise TypeError(
+ f"'bloom_filter_options:fpp' for column '{column}' must be
a float")
+ elif isinstance(column_bloom_opts, bool):
+ # if True then enable with the untouched defaults of bloom_opts declared above, if False then disable
+ if not column_bloom_opts:
+ props.disable_bloom_filter(tobytes(column))
+ return
+ else:
+ raise TypeError(
+ f"'bloom_filter_options:{column}' must be a boolean or a
dictionary")
+
+ props.enable_bloom_filter(tobytes(column), bloom_opts)
+
+
+cdef void _set_bloom_filter_opts(
+ WriterProperties.Builder* props,
+ bloom_filter_options) except *:
+ """Set Bloom filter options for all columns"""
+ if bloom_filter_options is not None:
+ if isinstance(bloom_filter_options, dict):
+ # for each entry in bloom_filter_options, {"path": {"ndv": ndv,
"fpp": fpp}}
+ # convert (ndv,fpp) to BloomFilterOptions struct and pass to props
+ for column, _bloom_opts in bloom_filter_options.items():
+ _set_bloom_opts_for_column(props, column, _bloom_opts)
+ else:
+ raise TypeError("'bloom_filter_options' must be a dictionary")
+
cdef shared_ptr[WriterProperties] _create_writer_properties(
use_dictionary=None,
@@ -1992,7 +2046,8 @@ cdef shared_ptr[WriterProperties]
_create_writer_properties(
write_page_checksum=False,
sorting_columns=None,
store_decimal_as_integer=False,
- use_content_defined_chunking=False) except *:
+ use_content_defined_chunking=False,
+ bloom_filter_options=None) except *:
"""General writer properties"""
cdef:
@@ -2122,6 +2177,9 @@ cdef shared_ptr[WriterProperties]
_create_writer_properties(
raise TypeError(
"'column_encoding' should be a dictionary or a string")
+ # bloom filters
+ _set_bloom_filter_opts(&props, bloom_filter_options)
+
# size limits
if data_page_size is not None:
props.data_pagesize(data_page_size)
@@ -2317,7 +2375,8 @@ cdef class ParquetWriter(_Weakrefable):
sorting_columns=None,
store_decimal_as_integer=False,
use_content_defined_chunking=False,
- write_time_adjusted_to_utc=False):
+ write_time_adjusted_to_utc=False,
+ bloom_filter_options=None):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
@@ -2353,7 +2412,8 @@ cdef class ParquetWriter(_Weakrefable):
write_page_checksum=write_page_checksum,
sorting_columns=sorting_columns,
store_decimal_as_integer=store_decimal_as_integer,
- use_content_defined_chunking=use_content_defined_chunking
+ use_content_defined_chunking=use_content_defined_chunking,
+ bloom_filter_options=bloom_filter_options
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
diff --git a/python/pyarrow/includes/libparquet.pxd
b/python/pyarrow/includes/libparquet.pxd
index f82ddd4197..bbbac67c02 100644
--- a/python/pyarrow/includes/libparquet.pxd
+++ b/python/pyarrow/includes/libparquet.pxd
@@ -464,6 +464,10 @@ cdef extern from "parquet/api/reader.h" namespace
"parquet" nogil:
cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
+ cdef cppclass BloomFilterOptions:
+ int32_t ndv
+ double fpp
+
cdef cppclass CdcOptions:
int64_t min_chunk_size
int64_t max_chunk_size
@@ -506,6 +510,9 @@ cdef extern from "parquet/api/writer.h" namespace "parquet"
nogil:
Builder* enable_content_defined_chunking()
Builder* disable_content_defined_chunking()
Builder* content_defined_chunking_options(CdcOptions options)
+ Builder* disable_bloom_filter(const c_string& path)
+ Builder* enable_bloom_filter(const c_string& path,
+ BloomFilterOptions
bloom_filter_options)
shared_ptr[WriterProperties] build()
cdef cppclass ArrowWriterProperties:
diff --git a/python/pyarrow/parquet/core.py b/python/pyarrow/parquet/core.py
index 60c9f5ac88..19d8250d51 100644
--- a/python/pyarrow/parquet/core.py
+++ b/python/pyarrow/parquet/core.py
@@ -956,6 +956,32 @@ write_time_adjusted_to_utc : bool, default False
are expressed in reference to midnight in the UTC timezone.
If False (the default), the TIME columns are assumed to be expressed
in reference to midnight in an unknown, presumably local, timezone.
+bloom_filter_options : dict, default None
+ Create Bloom filters for the columns specified by the provided `dict`.
+
+ Bloom filters can be configured with two parameters: number of distinct
values
+ (NDV), and false-positive probability (FPP).
+
+ Bloom filters are most effective for high-cardinality columns. A good
default
+ is to set NDV equal to the number of rows. Lower values reduce disk usage
but
+ may not be worthwhile for very small NDVs. Increasing NDV (without
increasing FPP)
+ increases disk and memory usage.
+
+ Lower FPP values require more disk and memory space. For a fixed NDV, the
+ space requirement grows roughly proportional to log(1/FPP). Recommended
+ values are 0.1, 0.05, or 0.01. Very small values are counterproductive as
+ the bitset may exceed the size of the actual data. Set NDV appropriately
+ to minimize space usage.
+
+ The keys of the `dict` are column paths. For each path, the value can be
either:
+
+ - A dictionary, with keys `ndv` and `fpp`. The value for `ndv` must be a
positive
+ integer. If the `ndv` key is not present, the default value of `1048576`
will be
+ used. The value for `fpp` must be a float strictly between 0.0 and 1.0. If the
`fpp` key
+ is not present, the default value of `0.05` will be used.
+ - A boolean, with ``True`` indicating that a Bloom filter should be
produced with
+ the above-mentioned default values of `ndv=1048576` and `fpp=0.05` (this
is
+ equivalent to passing an empty dict), and ``False`` disabling the Bloom filter for that column.
"""
_parquet_writer_example_doc = """\
@@ -1985,6 +2011,7 @@ def write_table(table, where, row_group_size=None,
version='2.6',
store_decimal_as_integer=False,
write_time_adjusted_to_utc=False,
max_rows_per_page=None,
+ bloom_filter_options=None,
**kwargs):
# Implementor's note: when adding keywords here / updating defaults, also
# update it in write_to_dataset and _dataset_parquet.pyx
ParquetFileWriteOptions
@@ -2018,6 +2045,7 @@ def write_table(table, where, row_group_size=None,
version='2.6',
store_decimal_as_integer=store_decimal_as_integer,
write_time_adjusted_to_utc=write_time_adjusted_to_utc,
max_rows_per_page=max_rows_per_page,
+ bloom_filter_options=bloom_filter_options,
**kwargs) as writer:
writer.write_table(table, row_group_size=row_group_size)
except Exception:
diff --git a/python/pyarrow/tests/parquet/test_basic.py
b/python/pyarrow/tests/parquet/test_basic.py
index 03fcf2defe..4f26f22b10 100644
--- a/python/pyarrow/tests/parquet/test_basic.py
+++ b/python/pyarrow/tests/parquet/test_basic.py
@@ -19,6 +19,7 @@ import os
import sys
from collections import OrderedDict
import io
+import re
import warnings
from shutil import copytree
from decimal import Decimal
@@ -620,6 +621,73 @@ def test_lz4_raw_compression_alias():
_check_roundtrip(table, expected=table, compression="LZ4_RAW")
+def test_bloom_filter_options():
+ arr_int = pa.array(list(map(int, range(100))))
+ arr_bin = pa.array([str(x) for x in range(100)], type=pa.binary())
+ data = [arr_int, arr_bin]
+ table = pa.Table.from_arrays(data, names=['a', 'b'])
+
+ # bloom filter for one column
+ _check_roundtrip(table, expected=table, bloom_filter_options={
+ 'a': {'ndv': 100, 'fpp': 0.05}})
+
+ # bloom filter for two columns
+ _check_roundtrip(table, expected=table, bloom_filter_options={
+ 'a': {'ndv': 100, 'fpp': 0.05}, 'b': {'ndv': 10, 'fpp':
0.1}})
+
+ # bloom filter for one column with default ndv
+ _check_roundtrip(table, expected=table, bloom_filter_options={
+ 'a': {'fpp': 0.05}})
+
+ # bloom filter for one column with default fpp
+ _check_roundtrip(table, expected=table, bloom_filter_options={
+ 'a': {'ndv': 100}})
+
+ # bloom filter for one column with default ndv and fpp
+ _check_roundtrip(table, expected=table, bloom_filter_options={
+ 'a': {}})
+ _check_roundtrip(table, expected=table, bloom_filter_options={
+ 'a': True})
+
+ # should remain disabled
+ _check_roundtrip(table, expected=table, bloom_filter_options={
+ 'a': False})
+
+ # wrong type for ndv
+ buf = io.BytesIO()
+ expect = "'bloom_filter_options:ndv' for column 'a' must be an int"
+ with pytest.raises(TypeError, match=expect):
+ _write_table(table, buf, bloom_filter_options={
+ 'a': {'ndv': '100', 'fpp': 0.05}})
+
+ # wrong type for fpp
+ expect = "'bloom_filter_options:fpp' for column 'a' must be a float"
+ with pytest.raises(TypeError, match=expect):
+ _write_table(table, buf, bloom_filter_options={
+ 'a': {'ndv': 100, 'fpp': '0.05'}})
+
+ # wrong type for options
+ with pytest.raises(TypeError, match="'bloom_filter_options' must be a
dictionary"):
+ _write_table(table, buf, bloom_filter_options=True)
+
+ # invalid ndv value
+ expect = \
+ "'bloom_filter_options:ndv' for column 'a' must be greater than zero,
got -10"
+ with pytest.raises(ValueError, match=expect):
+ _write_table(table, buf, bloom_filter_options={
+ 'a': {'ndv': -10}})
+
+ # invalid fpp values
+ expect = "'bloom_filter_options:fpp' for column 'a' must be in (0.0, 1.0),
got 2.0"
+ with pytest.raises(ValueError, match=re.escape(expect)):
+ _write_table(table, buf, bloom_filter_options={
+ 'a': {'fpp': 2.0}})
+ expect = "'bloom_filter_options:fpp' for column 'a' must be in (0.0, 1.0),
got -0.5"
+ with pytest.raises(ValueError, match=re.escape(expect)):
+ _write_table(table, buf, bloom_filter_options={
+ 'a': {'fpp': -0.5}})
+
+
def test_sanitized_spark_field_names():
a0 = pa.array([0, 1, 2, 3, 4])
name = 'prohib; ,\t{}'