[ https://issues.apache.org/jira/browse/PARQUET-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702293#comment-17702293 ]
ASF GitHub Bot commented on PARQUET-2254:
-----------------------------------------

yabola commented on code in PR #1042:
URL: https://github.com/apache/parquet-mr/pull/1042#discussion_r1141357582


##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter`s as candidates and inserts each value into
+ * all candidates at the same time.
+ * The purpose is to end up with a bloom filter whose bit size is optimal for the actual number of distinct
+ * values in the data. The largest bloom filter is used as an approximate deduplication counter, and candidates
+ * that become unsuitable are removed during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, all receiving inserted data at the same time. If the deduplicated count exceeds a
+  // candidate's expected NDV, that candidate is removed; the smallest remaining candidate is written out at the end.
+  private final TreeSet<BloomFilterCandidate> candidates = new TreeSet<>();
+
+  // the largest of the candidates, used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and no more data may be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build the bloom filter for
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+      double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less than or equal to the maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable byte size of the bloom filter, generate candidates according
+   * to their maximum expected distinct values. Each candidate's byte size must be a
+   * power of 2, so the candidate sizes are derived from `maxBytes` by halving: `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes the maximum acceptable byte size
+   * @param candidatesNum the number of candidates
+   * @param fpp the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {

Review Comment:
   In the original implementation, `Integer.highestOneBit(numBytes)` returns the largest power of two less than or equal to `numBytes`. I think the `<< 1` here makes the result larger so that the filter can accommodate the expected NDV (number of distinct values). However, this could be a bug when the NDV is empty, and I have fixed it in https://github.com/apache/parquet-mr/pull/1043


> Build a BloomFilter with a more precise size
> --------------------------------------------
>
>                 Key: PARQUET-2254
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2254
>             Project: Parquet
>          Issue Type: Improvement
>            Reporter: Mars
>            Assignee: Mars
>            Priority: Major
>
> Currently, the user specifies a size and the BloomFilter is built with it. In general
> scenarios, however, the number of distinct values is not known in advance.
> If the BloomFilter could be sized automatically according to the data, the file
> size could be reduced and reading efficiency improved.
> My idea is that the user specifies a maximum BloomFilter size and we build multiple
> BloomFilters at the same time. The largest BloomFilter serves as a counting tool: if
> inserting a value produces no hit, the counter is incremented by 1. This may be imprecise,
> but it is good enough.
> Then, when the file is finally written, we choose the BloomFilter with the most appropriate size.
> I would like to implement this feature and would appreciate your opinions. Thank you.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
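The sizing arithmetic discussed in the review comment and in the issue description can be illustrated with a small standalone Java sketch. It is not the code from PR #1042: the class and every method name (`CandidateSizingSketch`, `expectedNdv`, `candidateSizes`, `approxDistinct`, `chooseSize`) are hypothetical. The NDV estimate assumes the split-block sizing relation m = -8n / ln(1 - p^(1/8)) with m in bits, and the distinct counter is a toy single-hash bit array standing in for the largest `BlockSplitBloomFilter` candidate, with inputs assumed to be already hashed.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

// Hypothetical sketch of the sizing idea; not the parquet-mr implementation.
class CandidateSizingSketch {

  // Integer.highestOneBit(x) is the largest power of two <= x.
  static int largestPowerOfTwoAtMost(int numBytes) {
    return Integer.highestOneBit(numBytes);
  }

  // Shifting highestOneBit left by one rounds a non-power-of-two size up.
  static int roundUpToPowerOfTwo(int numBytes) {
    if ((numBytes & (numBytes - 1)) == 0) {
      return numBytes; // already a power of two
    }
    return Integer.highestOneBit(numBytes) << 1;
  }

  // Assumed approximation: NDV a split-block filter of numBytes holds at false
  // positive probability fpp, from inverting m = -8n / ln(1 - fpp^(1/8)), m in bits.
  static long expectedNdv(int numBytes, double fpp) {
    double bits = numBytes * 8.0;
    return (long) (-bits * Math.log(1 - Math.pow(fpp, 1.0 / 8)) / 8);
  }

  // Candidate sizes: the largest power of two <= maxBytes, then 1/2, 1/4, ...
  static List<Integer> candidateSizes(int maxBytes, int candidatesNum, int minBytes) {
    List<Integer> sizes = new ArrayList<>();
    int size = largestPowerOfTwoAtMost(maxBytes);
    for (int i = 0; i < candidatesNum && size >= minBytes; i++) {
      sizes.add(size);
      size >>= 1;
    }
    return sizes;
  }

  // Toy distinct counter over pre-hashed values: a single-hash bit array stands in
  // for the largest candidate. The counter grows only when a value's bit was not
  // yet set ("no hit"), so collisions make it undercount.
  static long approxDistinct(long[] hashes, int bits) {
    BitSet seen = new BitSet(bits);
    long counter = 0;
    for (long h : hashes) {
      int idx = (int) Math.floorMod(h, (long) bits);
      if (!seen.get(idx)) {
        seen.set(idx);
        counter++;
      }
    }
    return counter;
  }

  // Smallest candidate whose expected NDV covers the counted values; falls back to
  // the largest candidate. Assumes sizes is non-empty and sorted in descending order.
  static int chooseSize(List<Integer> sizes, long distinctValues, double fpp) {
    int best = sizes.get(0);
    for (int size : sizes) {
      if (expectedNdv(size, fpp) >= distinctValues) {
        best = size; // sizes shrink, so the last match is the smallest
      }
    }
    return best;
  }

  public static void main(String[] args) {
    List<Integer> sizes = candidateSizes(1_000_000, 5, 32);
    System.out.println(sizes);                           // [524288, 262144, 131072, 65536, 32768]
    System.out.println(chooseSize(sizes, 50_000, 0.01)); // 65536
  }
}
```

Using one candidate as the counter keeps memory bounded by the maximum size, at the cost the issue mentions: hash collisions make the count an underestimate. In this toy version, `approxDistinct` over 1,000 distinct inputs with only 512 bits reports 512.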