[ https://issues.apache.org/jira/browse/PARQUET-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17702291#comment-17702291 ]

ASF GitHub Bot commented on PARQUET-2254:
-----------------------------------------

yabola commented on code in PR #1042:
URL: https://github.com/apache/parquet-mr/pull/1042#discussion_r1141356477


##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` candidates and inserts values into
+ * all of the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the
+ * number of distinct values in the real data. The largest bloom filter is used as an approximate
+ * deduplication counter, and undersized bloom filter candidates are removed during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates into which data is inserted at the same time. If the approximate distinct-value
+  // count exceeds a candidate's expected NDV, that candidate is removed; finally the smallest remaining
+  // candidate is chosen to write out.
+  private final TreeSet<BloomFilterCandidate> candidates = new TreeSet<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and no new data may be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column for which the bloom filter is built
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less than or equal to the maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable byte size of the bloom filter, generate candidates according
+   * to the maximum expected number of distinct values. The byte size of each candidate needs to be
+   * a power of 2. Therefore, the candidate sizes are set to `1/2`, `1/4`, `1/8`, etc. of `maxBytes`.
+   *
+   * @param maxBytes      the maximum acceptable byte size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {

Review Comment:
   In `BlockSplitBloomFilter`, the `byteSize` is set to a power of 2, therefore the candidates here are also divided according to powers of 2 (`1/2`, `1/4`, `1/8`, etc. of `maxBytes`).
   
   
https://github.com/apache/parquet-mr/blob/1235003e742e6a76bf6cb8f7ed33e942fa12d0d5/parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/BlockSplitBloomFilter.java#L206-L219
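
To illustrate the sizing scheme described above, here is a minimal, self-contained sketch (the class and method names are hypothetical, not part of the PR): candidate byte sizes are successive halvings of `maxBytes`, rounded to powers of 2, stopping at the configured minimum.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of power-of-2 candidate sizing; not the PR's actual code.
public class CandidateSizingSketch {
  static List<Integer> candidateSizes(int maxBytes, int candidatesNum, int minimumBytes) {
    List<Integer> sizes = new ArrayList<>();
    // largest power of 2 that does not exceed maxBytes
    int bytes = Integer.highestOneBit(maxBytes);
    for (int i = 0; i < candidatesNum && bytes >= minimumBytes; i++) {
      sizes.add(bytes);
      bytes >>= 1; // halve for the next, smaller candidate: 1/2, 1/4, 1/8, ... of maxBytes
    }
    return sizes;
  }

  public static void main(String[] args) {
    // e.g. maxBytes = 1 MiB, 4 candidates, minimum 32 bytes -> [1048576, 524288, 262144, 131072]
    System.out.println(candidateSizes(1024 * 1024, 4, 32));
  }
}
```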



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,317 @@

Review Comment:
   In the original implementation, `Integer.highestOneBit(numBytes)` returned the largest power of two less than or equal to `numBytes`. And I think the `<< 1` here makes the result larger to achieve the effect of accommodating the expected NDV values.
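
For reference, a small runnable illustration of the two expressions being discussed (the variable names are illustrative only):

```java
public class HighestOneBitDemo {
  public static void main(String[] args) {
    int numBytes = 1000;
    // largest power of 2 less than or equal to numBytes
    int floorPow2 = Integer.highestOneBit(numBytes);        // 512
    // shifting left by one doubles it, giving a power of 2 at least as large as numBytes
    int ceilPow2 = Integer.highestOneBit(numBytes) << 1;    // 1024
    System.out.println(floorPow2 + " " + ceilPow2);         // prints "512 1024"
  }
}
```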





> Build a BloomFilter with a more precise size
> --------------------------------------------
>
>                 Key: PARQUET-2254
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2254
>             Project: Parquet
>          Issue Type: Improvement
>            Reporter: Mars
>            Assignee: Mars
>            Priority: Major
>
> Currently the usage is to specify a size and then build the BloomFilter. In general 
> scenarios, the number of distinct values is not known in advance. 
> If the BloomFilter can be sized automatically according to the data, the file 
> size can be reduced and reading efficiency can also be improved.
> The idea is that the user specifies a maximum BloomFilter size, and then we build 
> multiple BloomFilters at the same time, using the largest BloomFilter as a counting 
> tool (if there is no hit when inserting a value, the counter is incremented by 1; 
> this may be imprecise, but it is close enough).
> Then, at the end of the write, a BloomFilter of a more appropriate size is chosen 
> when the file is finally written.
> I want to implement this feature and hope to get your opinions, thank you.
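
As a rough, self-contained sketch of the counting idea described above (a toy single-hash filter stands in for the largest BloomFilter candidate; none of the names below come from parquet-mr):

```java
import java.util.BitSet;

// Toy illustration only: the largest bloom filter doubles as an approximate distinct-value counter.
public class ApproxNdvSketch {
  private static final int SIZE = 1 << 20;          // bit capacity of the toy filter
  private final BitSet bits = new BitSet(SIZE);

  private int index(long value) {
    // single hash function; a real bloom filter would use several
    return (int) ((value * 0x9E3779B97F4A7C15L) >>> 44) & (SIZE - 1);
  }

  boolean mightContain(long value) { return bits.get(index(value)); }
  void insert(long value) { bits.set(index(value)); }

  public static void main(String[] args) {
    ApproxNdvSketch largest = new ApproxNdvSketch();
    long approxNdv = 0;
    for (long v : new long[] {1, 2, 2, 3, 1, 4}) {
      if (!largest.mightContain(v)) {
        approxNdv++;                                // no hit -> count as a new distinct value
      }                                             // (false positives can cause a slight under-count)
      largest.insert(v);
    }
    System.out.println("approximate NDV = " + approxNdv);  // 4 for this input
  }
}
```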



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
