[ https://issues.apache.org/jira/browse/PARQUET-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17707541#comment-17707541 ]
ASF GitHub Bot commented on PARQUET-2254: ----------------------------------------- yabola commented on code in PR #1042: URL: https://github.com/apache/parquet-mr/pull/1042#discussion_r1155095790 ########## parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java: ########## @@ -0,0 +1,316 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.parquet.column.values.bloomfilter; + +import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES; +import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Arrays; +import java.util.Objects; +import java.util.TreeSet; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.parquet.Preconditions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.io.api.Binary; + +/** + * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in + * the candidates at the same time. 
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number + * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then + * remove incapable bloom filter candidates during data insertion. + */ +public class DynamicBlockBloomFilter implements BloomFilter { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class); + + // multiple candidates, inserting data at the same time. If the number of distinct values exceeds the + // expected NDV of a candidate, that candidate will be removed. Finally we will choose the smallest candidate to write out. + private final TreeSet<BloomFilterCandidate> candidates = new TreeSet<>(); + + // the largest among candidates and used as an approximate deduplication counter + private BloomFilterCandidate maxCandidate; + + // the accumulator of the number of distinct values that have been inserted so far + private int distinctValueCounter = 0; + + // indicates that the bloom filter candidate has been written out and new data should no longer be allowed to be inserted + private boolean finalized = false; + + private int maximumBytes = UPPER_BOUND_BYTES; + private int minimumBytes = LOWER_BOUND_BYTES; + // the hash strategy used in this bloom filter. 
+ private final HashStrategy hashStrategy; + // the column to build bloom filter + private ColumnDescriptor column; + + public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) { + this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column); + } + + public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) { + this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column); + } + + public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy, + double fpp, int candidatesNum, ColumnDescriptor column) { + if (minimumBytes > maximumBytes) { + throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes"); + } + + if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) { + this.minimumBytes = minimumBytes; + } + + if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) { + this.maximumBytes = maximumBytes; + } + this.column = column; + switch (hashStrategy) { + case XXH64: + this.hashStrategy = hashStrategy; + break; + default: + throw new RuntimeException("Unsupported hash strategy"); + } + initCandidates(numBytes, candidatesNum, fpp); + } + + /** + * Given the maximum acceptable bytes size of bloom filter, generate candidates according + * to the bytes size. The bytes size of the candidate needs to be a + * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc. 
+ * + * @param maxBytes the maximum acceptable bit size + * @param candidatesNum the number of candidates + * @param fpp the false positive probability + */ + private void initCandidates(int maxBytes, int candidatesNum, double fpp) { + int candidateByteSize = calculateTwoPowerSize(maxBytes); + for (int i = 1; i <= candidatesNum; i++) { + int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp); + // `candidateByteSize` is too small, just drop it + if (candidateExpectedNDV <= 0) { + break; + } + BloomFilterCandidate candidate = + new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy); + candidates.add(candidate); + candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2); + } + maxCandidate = candidates.last(); + } + + /** + * According to the size of bytes, calculate the expected number of distinct values. + * The expected number result may be slightly smaller than what `numBytes` can support. + * + * @param numBytes the bytes size + * @param fpp the false positive probability + * @return the expected number of distinct values + */ + private int expectedNDV(int numBytes, double fpp) { + int expectedNDV = 0; + int optimalBytes = 0; + int step = 500; + while (optimalBytes < numBytes) { + expectedNDV += step; + optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8; + } + // make sure it is slightly smaller than what `numBytes` can support + expectedNDV -= step; + // numBytes is too small + if (expectedNDV <= 0) { + expectedNDV = 0; + } + return expectedNDV; + } + + /** + * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]] + * + * @param numBytes the bytes size + * @return the largest power of 2 less or equal to numBytes + */ + private int calculateTwoPowerSize(int numBytes) { + if (numBytes < minimumBytes) { + numBytes = minimumBytes; + } + // if `numBytes` is not power of 2, get the next largest power of two less than `numBytes` + if 
((numBytes & (numBytes - 1)) != 0) { + numBytes = Integer.highestOneBit(numBytes); + } + numBytes = Math.min(numBytes, maximumBytes); + numBytes = Math.max(numBytes, minimumBytes); + return numBytes; + } + + /** + * Used at the end of insertion to select the candidate of the smallest size. + * At least one of the largest candidates will be kept when inserting data. + * + * @return the smallest and optimal candidate + */ + protected BloomFilterCandidate optimalCandidate() { + return candidates.stream() Review Comment: I changed to use `ArrayList` > Build a BloomFilter with a more precise size > -------------------------------------------- > > Key: PARQUET-2254 > URL: https://issues.apache.org/jira/browse/PARQUET-2254 > Project: Parquet > Issue Type: Improvement > Reporter: Mars > Assignee: Mars > Priority: Major > > h3. Why are the changes needed? > Now the usage of bloom filter is to specify the NDV (number of distinct > values), and then build BloomFilter. In general scenarios, it is often > unclear how many distinct values there are. > If BloomFilter can be automatically generated according to the data, the file > size can be reduced and the reading efficiency can also be improved. > h3. What changes were proposed in this pull request? > {{DynamicBlockBloomFilter}} contains multiple {{BlockSplitBloomFilter}} as > candidates and inserts values in the candidates at the same time. Use the > largest bloom filter as an approximate deduplication counter, and then remove > incapable bloom filter candidates during data insertion. -- This message was sent by Atlassian Jira (v8.20.10#820010)