yabola commented on code in PR #1042:
URL: https://github.com/apache/parquet-mr/pull/1042#discussion_r1190671525


##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values.
+ * `AdaptiveBlockSplitBloomFilter` contains multiple `BlockSplitBloomFilter` 
as candidates and inserts values in
+ * the candidates at the same time. Finally we will choose the smallest 
candidate to write out.
+ */
+public class AdaptiveBlockSplitBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AdaptiveBlockSplitBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of one candidate, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and also used as an approximate 
deduplication counter
+  private BloomFilterCandidate largestCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted 
so far
+  private long distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new 
data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  // indicates the step size to find the NDV value corresponding to numBytes
+  private static final int NDV_STEP = 500;
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  private int minimumCandidateNdv = 16;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter.
+   *
+   * @param maximumBytes  the maximum bytes size of candidate
+   * @param numCandidates the number of candidates
+   * @param fpp           the false positive probability
+   */
+  public AdaptiveBlockSplitBloomFilter(int maximumBytes, int numCandidates, 
double fpp, ColumnDescriptor column) {
+    this(maximumBytes, HashStrategy.XXH64, fpp, numCandidates, column);
+  }
+
+  public AdaptiveBlockSplitBloomFilter(int maximumBytes, HashStrategy 
hashStrategy, double fpp,
+    int numCandidates, ColumnDescriptor column) {
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(maximumBytes, numCandidates, fpp);
+  }
+
+  /**
+   * This method will generate candidates according to the maximum acceptable 
bytes size of bloom filter.
+   * Because the bytes size of the candidate need to be a power of 2, here we 
set the candidate size to be
+   * a proportion of `maxBytes` like `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum bytes size of candidate
+   * @param numCandidates the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int numCandidates, double fpp) {
+    int candidateByteSize = calculateBoundedPowerOfTwo(maxBytes);
+    for (int i = 0; i < numCandidates; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, 
minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateBoundedPowerOfTwo(candidateByteSize / 2);
+    }
+    if (candidates.isEmpty()) {
+      // `maxBytes` is too small, but at least one candidate will be 
generated, 32 bytes size and can accept 16 distinct values.
+      candidates.add(new BloomFilterCandidate(minimumCandidateNdv, 
minimumBytes, minimumBytes, maximumBytes, hashStrategy));
+    }
+    largestCandidate = 
candidates.stream().max(BloomFilterCandidate::compareTo).get();
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct 
values.
+   * The expected number result may be slightly smaller than what `numBytes` 
can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    while (optimalBytes < numBytes) {
+      expectedNDV += NDV_STEP;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) 
/ 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= NDV_STEP;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bytes size should be power of 2, see 
[[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateBoundedPowerOfTwo(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the largest power of two less than 
`numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest 
size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;
+    BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+    optimalBloomFilter.bloomFilter.writeTo(out);
+    String columnName = column != null && column.getPath() != null ? 
Arrays.toString(column.getPath()) : "unknown";
+    LOG.info("The number of distinct values in {} is approximately {}, the 
optimal bloom filter can accept {}"
+        + " distinct values, byte size is {}.",
+      columnName, distinctValueCounter, optimalBloomFilter.getExpectedNDV(),
+      optimalBloomFilter.bloomFilter.getBitsetSize());
+  }
+
+  /**
+   * Insert an element to the multiple bloom filter candidates and remove the 
bad candidate
+   * if the number of distinct values exceeds its expected size.
+   *
+   * @param hash the hash result of element.
+   */
+  @Override
+  public void insertHash(long hash) {
+    Preconditions.checkArgument(!finalized,
+      "AdaptiveBlockSplitBloomFilter insertion has been mark as finalized, no 
more data is allowed!");
+    if (!largestCandidate.bloomFilter.findHash(hash)) {
+      distinctValueCounter++;

Review Comment:
   Yeah, I modified the variable name, thank you.



##########
parquet-hadoop/README.md:
##########
@@ -211,6 +211,23 @@ conf.set("parquet.bloom.filter.enabled#column.path", 
false);
 
 ---
 
+**Property:** `parquet.bloom.filter.adaptive.enabled`  
+**Description:** Whether to enable writing adaptive bloom filter.  
+If it is true, the bloom filter will be generated with the optimal bit size 
+according to the number of real data distinct values. If it is false, it will 
not take effect.
+Note that the maximum bytes of the bloom filter will not exceed 
`parquet.bloom.filter.max.bytes` configuration.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to