ShyamalaGowri commented on code in PR #15311: URL: https://github.com/apache/iceberg/pull/15311#discussion_r2983218040
########## spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/BloomFilterIndexUtil.java: ########## @@ -0,0 +1,695 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.BitSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.PartitionScanTask; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Snapshot; +import org.apache.iceberg.StatisticsFile; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.puffin.Blob; +import org.apache.iceberg.puffin.BlobMetadata; +import org.apache.iceberg.puffin.FileMetadata; +import org.apache.iceberg.puffin.Puffin; +import org.apache.iceberg.puffin.PuffinCompressionCodec; +import org.apache.iceberg.puffin.PuffinReader; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.SparkTableUtil; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.functions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Minimal utilities for building and consulting a Bloom-filter-based file-skipping index stored in + * Puffin statistics files. + * + * <p>This is a proof-of-concept implementation intended to demonstrate that Bloom filters can + * improve query performance by pruning data files. It is intentionally limited in scope: + * + * <ul> + * <li>Only equality predicates on a single column are supported. + * <li>Only un-nested, primitive columns are supported. + * <li>Bloom filters are built per data file using Spark and stored in a single statistics file + * per snapshot. + * <li>The index is best-effort: if anything looks inconsistent or unsupported, callers should + * ignore it and fall back to normal planning. + * </ul> + */ +public class BloomFilterIndexUtil { + + private static final Logger LOG = LoggerFactory.getLogger(BloomFilterIndexUtil.class); + + // Blob type used for Bloom filters inside Puffin statistics files. + // Kept package-private so both actions and scan-side code can share it. 
+ static final String BLOOM_FILTER_BLOB_TYPE = "bloom-filter-v1"; + + // Property keys on each Bloom blob + static final String PROP_DATA_FILE = "data-file"; + static final String PROP_COLUMN_NAME = "column-name"; + static final String PROP_FPP = "fpp"; + static final String PROP_NUM_VALUES = "num-values"; + static final String PROP_NUM_BITS = "num-bits"; + static final String PROP_NUM_HASHES = "num-hashes"; + + // Heuristic Bloom filter sizing for the POC + private static final double DEFAULT_FPP = 0.01; + private static final long DEFAULT_EXPECTED_VALUES_PER_FILE = 100_000L; + + private BloomFilterIndexUtil() {} + + /** + * Build Bloom-filter blobs for a single column of a snapshot and return them in memory. + * + * <p>This uses Spark to read the table at a given snapshot, groups by input file, and collects + * values for the target column on the driver to build per-file Bloom filters. It is suitable for + * small/medium demo tables, not for production-scale index building. + */ + static List<Blob> buildBloomBlobsForColumn( + SparkSession spark, Table table, Snapshot snapshot, String columnName) { + Preconditions.checkNotNull(snapshot, "snapshot must not be null"); + Preconditions.checkArgument( + columnName != null && !columnName.isEmpty(), "columnName must not be null/empty"); + + Dataset<Row> df = SparkTableUtil.loadTable(spark, table, snapshot.snapshotId()); + + // Attach per-row file path using Spark's built-in function. This relies on the underlying + // reader exposing file information, which is already true for Iceberg's Spark integration. 
+ Column fileCol = functions.input_file_name().alias("_file"); + Dataset<Row> fileAndValue = + df.select(functions.col(columnName), fileCol).na().drop(); // drop nulls for simplicity + + Dataset<Row> perFileValues = + fileAndValue.groupBy("_file").agg(functions.collect_list(columnName).alias("values")); + + List<Row> rows = perFileValues.collectAsList(); + LOG.info( + "Building Bloom-filter blobs for column {} in table {} (snapshot {}, {} file group(s))", + columnName, + table.name(), + snapshot.snapshotId(), + rows.size()); + + Schema schema = table.schemas().get(snapshot.schemaId()); + Types.NestedField field = schema.findField(columnName); + Preconditions.checkArgument( + field != null, "Cannot find column %s in schema %s", columnName, schema); + Preconditions.checkArgument( + supportedFieldType(field.type()), + "Unsupported Bloom index column type %s for column %s (supported: string, int, long, uuid)", + field.type(), + columnName); + + List<Blob> blobs = Lists.newArrayListWithExpectedSize(rows.size()); + + for (Row row : rows) { + String filePath = row.getString(0); + @SuppressWarnings("unchecked") + List<Object> values = row.getList(1); + + if (values == null || values.isEmpty()) { + continue; + } + + SimpleBloomFilter bloom = + SimpleBloomFilter.create( + (int) Math.max(DEFAULT_EXPECTED_VALUES_PER_FILE, values.size()), DEFAULT_FPP); + + long nonNullCount = 0L; + for (Object value : values) { + if (value != null) { + byte[] canonicalBytes = canonicalBytes(value); + Preconditions.checkArgument( + canonicalBytes != null, + "Unsupported Bloom index value type %s for column %s", + value.getClass().getName(), + columnName); + bloom.put(canonicalBytes); + nonNullCount++; + } + } + + if (nonNullCount == 0L) { + continue; + } + + ByteBuffer serialized = serializeBloomBits(bloom); + + Map<String, String> properties = + ImmutableMap.of( + PROP_DATA_FILE, + filePath, + PROP_COLUMN_NAME, + columnName, + PROP_FPP, + String.valueOf(DEFAULT_FPP), + PROP_NUM_VALUES, + 
String.valueOf(nonNullCount), + PROP_NUM_BITS, + String.valueOf(bloom.numBits()), + PROP_NUM_HASHES, + String.valueOf(bloom.numHashFunctions())); + + Blob blob = + new Blob( + BLOOM_FILTER_BLOB_TYPE, + ImmutableList.of(field.fieldId()), + snapshot.snapshotId(), + snapshot.sequenceNumber(), + serialized, + PuffinCompressionCodec.ZSTD, + properties); + + blobs.add(blob); + } + + return blobs; + } + + private static ByteBuffer serializeBloomBits(SimpleBloomFilter bloom) { + return ByteBuffer.wrap(bloom.toBitsetBytes()); + } + + private static byte[] toByteArray(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + + /** + * Use Bloom-filter blobs stored in statistics files to prune data files for an equality predicate + * on a single column. + * + * <p>The caller is responsible for: + * + * <ul> + * <li>Ensuring the predicate is an equality of a supported literal type. + * <li>Passing the snapshot used for planning (so we can ignore stale stats). + * <li>Falling back cleanly if this method returns the original tasks. 
+ * </ul> + * + * @param table the Iceberg table + * @param snapshot the snapshot being scanned (may be null) + * @param tasksSupplier supplier that returns the already planned tasks (typically calls + * super.tasks()) + * @param columnName the column name used in the equality predicate + * @param literalValue the literal value used in the equality predicate + * @return either the original tasks or a filtered subset if Bloom pruning was applied + */ + public static <T extends PartitionScanTask> List<T> pruneTasksWithBloomIndex( + Table table, + Snapshot snapshot, + Supplier<List<T>> tasksSupplier, + String columnName, + Object literalValue) { + + if (snapshot == null) { + return tasksSupplier.get(); + } + + List<StatisticsFile> statsFilesForSnapshot = + table.statisticsFiles().stream() + .filter(sf -> sf.snapshotId() == snapshot.snapshotId()) + .collect(Collectors.toList()); + + if (statsFilesForSnapshot.isEmpty()) { + return tasksSupplier.get(); + } + + byte[] literalBytes = canonicalBytes(literalValue); + if (literalBytes == null) { + // Unsupported literal type for this portable encoding; do not prune. + return tasksSupplier.get(); + } + String columnNameLower = columnName.toLowerCase(Locale.ROOT); + + Set<String> candidateFiles = + loadCandidateFilesFromBloom(table, statsFilesForSnapshot, columnNameLower, literalBytes); + + if (candidateFiles == null) { + // Index missing/unusable; do not change planning. + return tasksSupplier.get(); + } + + if (candidateFiles.isEmpty()) { + // Bloom filters have no false negatives. If the index is usable but no files matched, we can + // safely prune to an empty scan without planning tasks. 
+ return ImmutableList.of(); + } + + List<T> tasks = tasksSupplier.get(); + List<T> filtered = + tasks.stream() + .filter( + task -> { + if (task instanceof ContentScanTask) { + ContentScanTask<?> contentTask = (ContentScanTask<?>) task; + String path = contentTask.file().path().toString(); + return candidateFiles.contains(path); + } + // If we don't know how to interpret the task, keep it for safety. + return true; + }) + .collect(Collectors.toList()); + + if (filtered.size() == tasks.size()) { Review Comment: @huaxingao I have a query regarding this. I tried testing this PR, where my test data was 24 files, and based on bloom filter pruning it resulted in 12 files. But it never went beyond `if (filtered.size() == tasks.size())` to summarize the test results. I notice that the following could be happening - 1. Bloom Filter Loads FIRST Set<String> candidateFiles = loadCandidateFilesFromBloom(...); Reads Bloom filter statistics file Tests all 24 Bloom filters against literal 'a' (my search string) Result: candidateFiles = 12 file paths Happens BEFORE tasksSupplier.get() is called 2. 
Tasks Are Created SECOND List<T> tasks = tasksSupplier.get(); // NOW super::tasks is evaluated Only now does super.tasks() get called By this time, candidateFiles already exists with 12 files ------- I am also attaching the test data with which I tested // Create table ``` spark.sql(""" CREATE TABLE my_catalog.db.bloom_test_skewed ( id BIGINT, user_id STRING, timestamp TIMESTAMP, name STRING, category STRING ) USING iceberg """) ``` // 80% of data has category 'premium' with names a-e ``` spark.range(0, 800000) .selectExpr( "id", "concat('user_', cast(id as string)) as user_id", "current_timestamp() as timestamp", "chr(97 + cast(floor(rand() * 5) as int)) as name", // a-e "'premium' as category" ) .writeTo("my_catalog.db.bloom_test_skewed") .append() ``` // 20% of data has category 'standard' with names v-z ``` spark.range(800000, 1000000) .selectExpr( "id", "concat('user_', cast(id as string)) as user_id", "current_timestamp() as timestamp", "chr(118 + cast(floor(rand() * 5) as int)) as name", // v-z "'standard' as category" ) .writeTo("my_catalog.db.bloom_test_skewed") .append() val stable1 = Spark3Util.loadIcebergTable( spark, "my_catalog.db.bloom_test_skewed" ) ``` // Build bloom filter ``` SparkActions.get() .buildBloomFilterIndex(stable1) .column("name") .execute() ``` // Test queries that should show pruning `spark.sql("SELECT * FROM my_catalog.db.bloom_test_skewed WHERE name = 'a'").count() // Should prune files without a` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
