yabola commented on code in PR #1042:
URL: https://github.com/apache/parquet-mr/pull/1042#discussion_r1180115266


##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as 
candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an 
approximate deduplication counter, and then
+ * remove incapable bloom filter candidates during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication 
counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted 
so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new 
data should no longer be allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, 
ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, 
fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int 
candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, 
candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int 
maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or 
equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate 
candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of 
`1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable byte size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, 
minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = 
candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to 
create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct 
values.
+   * The expected number result may be slightly smaller than what `numBytes` 
can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) 
/ 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see 
[[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less than or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {

Review Comment:
   done



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as 
candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an 
approximate deduplication counter, and then
+ * remove incapable bloom filter candidates during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication 
counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted 
so far
+  private int distinctValueCounter = 0;

Review Comment:
   changed to use long



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to