[ https://issues.apache.org/jira/browse/PARQUET-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17715552#comment-17715552 ]

ASF GitHub Bot commented on PARQUET-2254:
-----------------------------------------

wgtmac commented on code in PR #1042:
URL: https://github.com/apache/parquet-mr/pull/1042#discussion_r1174492807


##########
parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java:
##########
@@ -503,6 +523,30 @@ public Builder withBloomFilterEnabled(boolean enabled) {
       return this;
     }
 
+    /**
+     * Whether to use dynamic bloom filter to automatically adjust the bloom filter size according to
+     * `parquet.bloom.filter.max.bytes`.
+     * If NDV (number of distinct values) for a specified column is set, it will be ignored
+     *
+     * @param enabled whether to use dynamic bloom filter
+     */
+    public Builder withDynamicBloomFilterEnabled(boolean enabled) {
+      this.dynamicBloomFilterEnabled.withDefaultValue(enabled);
+      return this;
+    }
+
+    /**
+     * When `DynamicBloomFilter` is enabled, set how many bloomFilters to split as candidates.

Review Comment:
   ```suggestion
        * When `DynamicBloomFilter` is enabled, set how many bloom filter candidates to use.
   ```



##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java:
##########
@@ -19,6 +19,11 @@
 package org.apache.parquet.hadoop;
 
 import static java.util.Arrays.asList;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;

Review Comment:
   Why is this file changed?



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {

Review Comment:
   ```suggestion
     public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int numCandidates, double fpp, ColumnDescriptor column) {
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;

Review Comment:
   Should we extract this magic number into a static constant defined at the top of the class? And why 500 rather than a power of two?
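
   For the first point, a minimal sketch of one possible shape, not the PR's actual code: the step becomes a named constant and the search returns the largest multiple of it that still fits. `NdvStepSketch`, `NDV_STEP` and `requiredBytes` are hypothetical names, and `requiredBytes` is an assumed stand-in that uses the textbook sizing formula m = -n * ln(p) / (ln 2)^2 instead of `BlockSplitBloomFilter.optimalNumOfBits`.

```java
// Sketch only: NdvStepSketch, NDV_STEP and requiredBytes are hypothetical names,
// not part of the PR or of parquet-mr.
final class NdvStepSketch {

  // Granularity of the NDV search; the PR hard-codes this as a local `step = 500`.
  private static final int NDV_STEP = 500;

  private NdvStepSketch() {
  }

  // Stand-in sizing function (assumption): textbook bloom filter formula
  // m = -n * ln(p) / (ln 2)^2 bits, rounded up to whole bytes.
  static long requiredBytes(int ndv, double fpp) {
    double bits = -ndv * Math.log(fpp) / (Math.log(2) * Math.log(2));
    return (long) Math.ceil(bits / 8.0);
  }

  // Returns the largest multiple of NDV_STEP whose required size fits into
  // numBytes, or 0 when even NDV_STEP values do not fit.
  static int expectedNdv(int numBytes, double fpp) {
    if (fpp <= 0.0 || fpp >= 1.0) {
      throw new IllegalArgumentException("fpp must be in (0, 1)");
    }
    int ndv = 0;
    // The first condition guards against int overflow for very loose fpp values.
    while (ndv <= Integer.MAX_VALUE - NDV_STEP
        && requiredBytes(ndv + NDV_STEP, fpp) <= numBytes) {
      ndv += NDV_STEP;
    }
    return ndv;
  }
}
```

   The step only sets the granularity of the estimate: with this stand-in formula, a 1024-byte budget at fpp 0.01 yields 500 even though roughly 850 values would fit, which is the kind of trade-off the constant's Javadoc could spell out.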



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {

Review Comment:
   The name should be either `DynamicBloomFilter` or `DynamicBlockSplitBloomFilter`, not something in between. Please also fix the comment above.



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.dynamic.enabled";

Review Comment:
   ```suggestion
     public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.dynamic.bloom.filter.enabled";
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java:
##########
@@ -97,7 +97,13 @@ abstract class ColumnWriterBase implements ColumnWriter {
       int optimalNumOfBits = BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble());
       this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8, maxBloomFilterSize);
     } else {
-      this.bloomFilter = new BlockSplitBloomFilter(maxBloomFilterSize, maxBloomFilterSize);
+      boolean useDynamicBloomFilter = props.getDynamicBloomFilterEnabled(path);
+      if(useDynamicBloomFilter) {

Review Comment:
   ```suggestion
         if (props.getDynamicBloomFilterEnabled(path)) {
   ```



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java:
##########
@@ -619,6 +619,28 @@ public SELF withBloomFilterFPP(String columnPath, double fpp) {
       return self();
     }
 
+    /**
+     * When NDV (number of distinct values) for a specified column is not set, whether to use
+     * `DynamicBloomFilter` to automatically adjust the BloomFilter size according to `parquet.bloom.filter.max.bytes`
+     *
+     * @param enabled whether to write bloom filter for the column
+     */
+    public SELF withDynamicBloomFilterEnabled(boolean enabled) {
+      encodingPropsBuilder.withDynamicBloomFilterEnabled(enabled);
+      return self();
+    }
+
+    /**
+     * When `DynamicBloomFilter` is enabled, set how many bloomFilters to split as candidates.

Review Comment:
   ```suggestion
        * When `DynamicBloomFilter` is enabled, set how many bloom filter candidates to use.
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+  // the largest among candidates and used as an approximate deduplication counter
+  private BloomFilterCandidate maxCandidate;

Review Comment:
   ```suggestion
     private BloomFilterCandidate largestCandidate;
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {

Review Comment:
   ```suggestion
   public class DynamicBlockSplitBloomFilter implements BloomFilter {
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, ColumnDescriptor column) {

Review Comment:
   ```suggestion
     public DynamicBlockBloomFilter(int numBytes, int numCandidates, double fpp, ColumnDescriptor column) {
   ```



##########
parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDynamicBlockBloomFiltering.java:
##########
@@ -0,0 +1,78 @@
+package org.apache.parquet.hadoop;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.crypto.FileDecryptionProperties;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+public class TestDynamicBlockBloomFiltering extends TestBloomFiltering {
+
+  @BeforeClass
+  public static void createFiles() throws IOException {
+    createFiles(true);
+  }
+
+  public TestDynamicBlockBloomFiltering(Path file, boolean isEncrypted) {
+    super(file, isEncrypted);
+  }
+
+  @Test
+  public void testSimpleFiltering() throws IOException {
+    super.testSimpleFiltering();
+  }
+
+  @Test
+  public void testNestedFiltering() throws IOException {
+    super.testNestedFiltering();
+  }
+
+  @Test
+  public void checkBloomFilterSize() throws IOException {

Review Comment:
   Should we add a test case where no bloom filter ends up being created?



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.dynamic.enabled";
+  public static final String BLOOM_FILTER_CANDIDATE_SIZE = "parquet.bloom.filter.candidate.size";

Review Comment:
   ```suggestion
     public static final String NUM_DYNAMIC_BLOOM_FILTER_CANDIDATES = "parquet.num.dynamic.bloom.filter.candidates";
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct values.
+   * The expected number result may be slightly smaller than what `numBytes` can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) / 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {

Review Comment:
   The name is imprecise, since the result may also be clamped by the min/max values.
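
   To illustrate the concern, here is a minimal standalone sketch. It copies the body of `calculateTwoPowerSize` from this PR but takes the bounds as parameters so it can run on its own; the class name `TwoPowerSizeSketch`, the helper `isPowerOfTwo`, and the sample bound of 100 bytes are assumptions made for illustration only.

   ```java
   // Sketch only: mirrors the clamping logic from the PR, with the bounds
   // passed in explicitly so the snippet is self-contained.
   final class TwoPowerSizeSketch {

     // Rounds numBytes down to a power of two, then clamps it to [minimumBytes, maximumBytes].
     static int calculateTwoPowerSize(int numBytes, int minimumBytes, int maximumBytes) {
       if (numBytes < minimumBytes) {
         numBytes = minimumBytes;
       }
       if ((numBytes & (numBytes - 1)) != 0) {
         numBytes = Integer.highestOneBit(numBytes);
       }
       numBytes = Math.min(numBytes, maximumBytes);
       numBytes = Math.max(numBytes, minimumBytes);
       return numBytes;
     }

     // Hypothetical helper used only to check the result.
     static boolean isPowerOfTwo(int n) {
       return n > 0 && (n & (n - 1)) == 0;
     }

     public static void main(String[] args) {
       // Power-of-two bounds: 1000 is rounded down to 512.
       System.out.println(calculateTwoPowerSize(1000, 32, 1024)); // 512
       // A bound that is not a power of two wins over the rounding.
       int clamped = calculateTwoPowerSize(1000, 32, 100);
       System.out.println(clamped + " power of two? " + isPowerOfTwo(clamped)); // 100 power of two? false
     }
   }
   ```

   With a maximum that is not itself a power of two, the returned size is the clamp value rather than a power of two, which the current name does not convey.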



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.dynamic.enabled";

Review Comment:
   Please document these new properties in README.md as well. It would be good to mention that creating the bloom filter may still fail in the end.



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+  // the accumulator of the number of distinct values that have been inserted so far
+  private int distinctValueCounter = 0;

Review Comment:
   Is `int` enough? 
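
   For reference, a minimal sketch of what happens if an `int` counter is incremented past `Integer.MAX_VALUE`, compared with a `long` counter and with `Math.addExact`. Whether the counter can actually get that high depends on how many values are inserted, which this diff does not show; the class and method names below are hypothetical.

   ```java
   // Sketch only: compares int, long, and checked increments at the int limit.
   final class CounterWidthSketch {

     // Plain int increment: silently wraps around to a negative value on overflow.
     static int incrementInt(int counter) {
       return counter + 1;
     }

     // long increment: keeps counting past the int range.
     static long incrementLong(long counter) {
       return counter + 1;
     }

     // Checked increment: throws ArithmeticException instead of wrapping.
     static int incrementChecked(int counter) {
       return Math.addExact(counter, 1);
     }

     public static void main(String[] args) {
       System.out.println(incrementInt(Integer.MAX_VALUE));  // -2147483648
       System.out.println(incrementLong(Integer.MAX_VALUE)); // 2147483648
       try {
         incrementChecked(Integer.MAX_VALUE);
       } catch (ArithmeticException e) {
         System.out.println("overflow detected");
       }
     }
   }
   ```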



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to create one valid bloom filter");

Review Comment:
   Should we simply skip this candidate and log a warning here? It may not be a fatal error.
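
   A rough sketch of the non-fatal alternative, assuming plain `Integer` sizes stand in for `BloomFilterCandidate` and `java.util.logging` stands in for the slf4j logger used in the PR. The class and method names are hypothetical, and the caller would still need to handle the empty case.

   ```java
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.Optional;
   import java.util.logging.Logger;

   // Sketch only: warn and return empty instead of throwing when no candidate exists.
   final class CandidateInitSketch {
     private static final Logger LOG = Logger.getLogger(CandidateInitSketch.class.getName());

     // Returns the largest candidate size, or Optional.empty() after logging a
     // warning when the candidate list is empty.
     static Optional<Integer> pickMaxCandidate(List<Integer> candidateByteSizes) {
       Optional<Integer> max = candidateByteSizes.stream().max(Integer::compare);
       if (!max.isPresent()) {
         LOG.warning("maximum bytes is too small to create a bloom filter candidate, skipping");
       }
       return max;
     }

     public static void main(String[] args) {
       System.out.println(pickMaxCandidate(Arrays.asList(256, 1024, 512)));
       System.out.println(pickMaxCandidate(Collections.<Integer>emptyList()));
     }
   }
   ```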



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {

Review Comment:
   ```suggestion
     private void initCandidates(int maxBytes, int numCandidates, double fpp) {
   ```



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+  /**
+   * BloomFilter bitsets size should be power of 2, see [[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();

Review Comment:
   Is `candidates.get(0)` enough? Should we throw if candidates is empty?
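
   A small sketch of the two options, assuming plain `Integer` sizes stand in for `BloomFilterCandidate` (class and method names are hypothetical). Since `initCandidates` adds candidates from the largest size downwards, index 0 initially holds the largest one, so `get(0)` would only match `min(...)` if the list is reordered or pruned down to a single element elsewhere. `orElseThrow` gives an explicit error for the empty case instead of the bare `NoSuchElementException` from `get()`.

   ```java
   import java.util.Arrays;
   import java.util.List;

   // Sketch only: pick the smallest candidate and fail loudly when none is left.
   final class OptimalCandidateSketch {

     // Returns the smallest size, or throws IllegalStateException if the list is empty.
     static int smallest(List<Integer> candidateByteSizes) {
       return candidateByteSizes.stream()
           .min(Integer::compare)
           .orElseThrow(() -> new IllegalStateException("no bloom filter candidate left"));
     }

     public static void main(String[] args) {
       // Sizes in the order initCandidates would add them: largest first.
       List<Integer> sizes = Arrays.asList(1024, 512, 256);
       System.out.println(smallest(sizes)); // 256
       System.out.println(sizes.get(0));    // 1024
     }
   }
   ```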



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of `1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);

Review Comment:
   If `candidateByteSize` is already a power of two at line 113, can we simply apply `candidateByteSize /= 2` here?
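
   One difference to keep in mind: `calculateTwoPowerSize` also clamps to `minimumBytes`, so the two forms diverge once the size drops below the minimum. A minimal sketch follows, with the bounds passed as parameters and the `expectedNDV` early exit from the real loop left out; the class name, method names, and sample values are assumptions for illustration.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Sketch only: compares the helper-based size sequence with plain halving.
   final class HalvingSketch {

     // Same logic as calculateTwoPowerSize in the PR, with explicit bounds.
     static int calculateTwoPowerSize(int numBytes, int minimumBytes, int maximumBytes) {
       if (numBytes < minimumBytes) {
         numBytes = minimumBytes;
       }
       if ((numBytes & (numBytes - 1)) != 0) {
         numBytes = Integer.highestOneBit(numBytes);
       }
       numBytes = Math.min(numBytes, maximumBytes);
       numBytes = Math.max(numBytes, minimumBytes);
       return numBytes;
     }

     // Candidate sizes when each step goes through the clamping helper.
     static List<Integer> sizesViaHelper(int maxBytes, int numCandidates, int min, int max) {
       List<Integer> sizes = new ArrayList<>();
       int size = calculateTwoPowerSize(maxBytes, min, max);
       for (int i = 0; i < numCandidates; i++) {
         sizes.add(size);
         size = calculateTwoPowerSize(size / 2, min, max);
       }
       return sizes;
     }

     // Candidate sizes when each step simply halves the previous size.
     static List<Integer> sizesViaHalving(int maxBytes, int numCandidates, int min, int max) {
       List<Integer> sizes = new ArrayList<>();
       int size = calculateTwoPowerSize(maxBytes, min, max);
       for (int i = 0; i < numCandidates; i++) {
         sizes.add(size);
         size /= 2;
       }
       return sizes;
     }

     public static void main(String[] args) {
       System.out.println(sizesViaHelper(128, 4, 32, 1024));  // [128, 64, 32, 32]
       System.out.println(sizesViaHalving(128, 4, 32, 1024)); // [128, 64, 32, 16]
     }
   }
   ```

   If plain halving is adopted, an explicit check against `minimumBytes` would keep the loop from producing candidates below the lower bound.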



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as 
candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an 
approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication 
counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted 
so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new 
data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, 
ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, 
fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int 
candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, 
candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int 
maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or 
equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate 
candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of 
`1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, 
minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = 
candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to 
create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct 
values.
+   * The expected number result may be slightly smaller than what `numBytes` 
can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) 
/ 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see 
[[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less 
than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest 
size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;
+    BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+    optimalBloomFilter.bloomFilter.writeTo(out);
+    String columnName = column != null && column.getPath() != null ? 
Arrays.toString(column.getPath()) : "unknown";
+    LOG.info("The number of distinct values in {} is approximately {}, the 
optimal bloom filter NDV is {}, byte size is {}.",
+      columnName, distinctValueCounter, optimalBloomFilter.getExpectedNDV(),
+      optimalBloomFilter.bloomFilter.getBitsetSize());
+  }
+
+  /**
+   * Insert an element to the multiple bloom filter candidates and remove the 
bad candidate
+   * if the number of distinct values exceeds its expected size.
+   *
+   * @param hash the hash result of element.
+   */
+  @Override
+  public void insertHash(long hash) {
+    Preconditions.checkArgument(!finalized,

Review Comment:
   Should we check both `finalized` and `candidates.isEmpty()`?
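
   A minimal sketch of such a guard, under stated assumptions: 
   `InsertGuardSketch` is a hypothetical stand-in, each candidate is modelled 
   as a `Set<Long>`, and `IllegalStateException` is my choice here rather 
   than the `Preconditions.checkArgument` call used in the PR.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: a stand-in for the filter; each candidate is modelled as a Set<Long>.
final class InsertGuardSketch {
  private final List<Set<Long>> candidates = new ArrayList<>();
  private boolean finalized = false;

  InsertGuardSketch(int candidatesNum) {
    for (int i = 0; i < candidatesNum; i++) {
      candidates.add(new HashSet<>());
    }
  }

  // Mimics the state after the filter has been written out.
  void markFinalized() {
    finalized = true;
  }

  void insertHash(long hash) {
    // Reject inserts after write-out and inserts with no live candidate.
    if (finalized) {
      throw new IllegalStateException("insertion is finalized, no more data is allowed");
    }
    if (candidates.isEmpty()) {
      throw new IllegalStateException("no bloom filter candidate is alive");
    }
    candidates.forEach(candidate -> candidate.add(hash));
  }

  // True only if at least one candidate exists and all of them hold the hash.
  boolean contains(long hash) {
    return !candidates.isEmpty() && candidates.stream().allMatch(c -> c.contains(hash));
  }
}
```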



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as 
candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an 
approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication 
counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted 
so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new 
data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, 
ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, 
fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int 
candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, 
candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int 
maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or 
equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate 
candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of 
`1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, 
minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = 
candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to 
create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct 
values.
+   * The expected number result may be slightly smaller than what `numBytes` 
can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) 
/ 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see 
[[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less 
than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest 
size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;

Review Comment:
   Check `finalized` first in case of re-entry.
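
   Roughly what I have in mind, as a sketch only: `WriteOnceSketch` is a 
   hypothetical class, and `payload` stands in for the serialized bitset of 
   the chosen candidate. The exception type is an assumption as well.

```java
import java.io.IOException;
import java.io.OutputStream;

// Sketch only: `payload` stands in for the chosen candidate's serialized bitset.
final class WriteOnceSketch {
  private final byte[] payload;
  private boolean finalized = false;

  WriteOnceSketch(byte[] payload) {
    this.payload = payload.clone();
  }

  void writeTo(OutputStream out) throws IOException {
    // Check the flag before setting it, so a second call fails instead of writing again.
    if (finalized) {
      throw new IllegalStateException("bloom filter has already been written out");
    }
    finalized = true;
    out.write(payload);
  }
}
```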



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as 
candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an 
approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication 
counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted 
so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new 
data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, 
ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, 
fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int 
candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, 
candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int 
maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or 
equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate 
candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of 
`1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, 
minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = 
candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to 
create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct 
values.
+   * The expected number result may be slightly smaller than what `numBytes` 
can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) 
/ 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see 
[[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less 
than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest 
size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;
+    BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+    optimalBloomFilter.bloomFilter.writeTo(out);
+    String columnName = column != null && column.getPath() != null ? 
Arrays.toString(column.getPath()) : "unknown";
+    LOG.info("The number of distinct values in {} is approximately {}, the 
optimal bloom filter NDV is {}, byte size is {}.",
+      columnName, distinctValueCounter, optimalBloomFilter.getExpectedNDV(),
+      optimalBloomFilter.bloomFilter.getBitsetSize());
+  }
+
+  /**
+   * Insert an element to the multiple bloom filter candidates and remove the 
bad candidate
+   * if the number of distinct values exceeds its expected size.
+   *
+   * @param hash the hash result of element.
+   */
+  @Override
+  public void insertHash(long hash) {
+    Preconditions.checkArgument(!finalized,
+      "Dynamic bloom filter insertion has been mark as finalized, no more data is allowed!");
+    if (!maxCandidate.bloomFilter.findHash(hash)) {
+      distinctValueCounter++;
+    }
+    // distinct values exceed the expected size, remove the bad bloom filter (leave at least the max bloom filter candidate)
+    candidates.removeIf(candidate -> candidate.getExpectedNDV() < distinctValueCounter && candidate != maxCandidate);
+    candidates.forEach(candidate -> candidate.getBloomFilter().insertHash(hash));
+  }
+
+  @Override
+  public int getBitsetSize() {
+    return optimalCandidate().getBloomFilter().getBitsetSize();
+  }
+
+  @Override
+  public boolean findHash(long hash) {
+    return maxCandidate.bloomFilter.findHash(hash);

Review Comment:
   Wrap `maxCandidate` in a function and throw if none of the candidates is 
alive. The same applies to the functions below.
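
   A minimal sketch of the accessor, with assumptions spelled out: 
   `MaxCandidateSketch` and `getMaxCandidate` are hypothetical names, 
   candidates are modelled as `Set<Long>`, and index 0 is assumed to hold the 
   largest candidate.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch only: candidates are modelled as Set<Long>; names are hypothetical.
final class MaxCandidateSketch {
  private final List<Set<Long>> candidates = new ArrayList<>();
  private final Set<Long> maxCandidate;

  MaxCandidateSketch(int candidatesNum) {
    for (int i = 0; i < candidatesNum; i++) {
      candidates.add(new HashSet<>());
    }
    // Assumption: index 0 holds the largest candidate.
    maxCandidate = candidates.isEmpty() ? null : candidates.get(0);
  }

  // Single access point: fails loudly when no candidate is alive.
  private Set<Long> getMaxCandidate() {
    if (maxCandidate == null || candidates.isEmpty()) {
      throw new IllegalStateException("no bloom filter candidate is alive");
    }
    return maxCandidate;
  }

  void insertHash(long hash) {
    getMaxCandidate(); // validates state before touching any candidate
    candidates.forEach(candidate -> candidate.add(hash));
  }

  boolean findHash(long hash) {
    return getMaxCandidate().contains(hash);
  }
}
```

   Every method that currently reads `maxCandidate` directly would go through 
   the accessor, so the empty state is handled in one place.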



##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
   public static final String BLOOM_FILTER_EXPECTED_NDV = "parquet.bloom.filter.expected.ndv";
   public static final String BLOOM_FILTER_MAX_BYTES = "parquet.bloom.filter.max.bytes";
   public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+  public static final String DYNAMIC_BLOOM_FILTER_ENABLED = "parquet.bloom.filter.dynamic.enabled";

Review Comment:
   BTW, I am not sure whether `dynamic bloom filter` is a generally accepted 
name for this feature.



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as 
candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an 
approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication 
counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted 
so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new 
data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, 
ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, 
fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int 
candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, 
candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int 
maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or 
equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate 
candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of 
`1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, 
minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = 
candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to 
create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct 
values.
+   * The expected number result may be slightly smaller than what `numBytes` 
can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) 
/ 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see 
[[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {

Review Comment:
   Can we simply rename it to `calculateBoundedPowerOfTwo`?
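
To make the suggested name concrete, here is a minimal sketch of what a standalone `calculateBoundedPowerOfTwo` helper might look like. The class name `BoundedPowerOfTwoSketch` and the explicit `minBytes`/`maxBytes` parameters are assumptions for illustration only (the quoted method reads its bounds from fields), and the sketch assumes both bounds are themselves powers of two.

```java
final class BoundedPowerOfTwoSketch {

  /**
   * Returns the largest power of two that is less than or equal to numBytes,
   * kept within [minBytes, maxBytes].
   * Assumption: minBytes and maxBytes are powers of two and minBytes <= maxBytes.
   */
  static int calculateBoundedPowerOfTwo(int numBytes, int minBytes, int maxBytes) {
    // Clamp first so that highestOneBit never sees zero or a negative value.
    int clamped = Math.max(minBytes, Math.min(numBytes, maxBytes));
    // highestOneBit rounds down to a power of two; an exact power of two is unchanged.
    return Integer.highestOneBit(clamped);
  }
}
```

With power-of-two bounds, clamping before rounding guarantees that the result is both a power of two and inside the bounds, which is what the proposed name promises.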



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as 
candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an 
approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication 
counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted 
so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new 
data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, 
ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, 
fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int 
candidatesNum, double fpp, ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp, 
candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int minimumBytes, int 
maximumBytes, HashStrategy hashStrategy,
+    double fpp, int candidatesNum, ColumnDescriptor column) {
+    if (minimumBytes > maximumBytes) {
+      throw new IllegalArgumentException("the minimum bytes should be less or 
equal than maximum bytes");
+    }
+
+    if (minimumBytes > LOWER_BOUND_BYTES && minimumBytes < UPPER_BOUND_BYTES) {
+      this.minimumBytes = minimumBytes;
+    }
+
+    if (maximumBytes > LOWER_BOUND_BYTES && maximumBytes < UPPER_BOUND_BYTES) {
+      this.maximumBytes = maximumBytes;
+    }
+    this.column = column;
+    switch (hashStrategy) {
+      case XXH64:
+        this.hashStrategy = hashStrategy;
+        break;
+      default:
+        throw new RuntimeException("Unsupported hash strategy");
+    }
+    initCandidates(numBytes, candidatesNum, fpp);
+  }
+
+  /**
+   * Given the maximum acceptable bytes size of bloom filter, generate 
candidates according
+   * to the bytes size. The bytes size of the candidate needs to be a
+   * power of 2. Therefore, set the candidate size according to `maxBytes` of 
`1/2`, `1/4`, `1/8`, etc.
+   *
+   * @param maxBytes      the maximum acceptable bit size
+   * @param candidatesNum the number of candidates
+   * @param fpp           the false positive probability
+   */
+  private void initCandidates(int maxBytes, int candidatesNum, double fpp) {
+    int candidateByteSize = calculateTwoPowerSize(maxBytes);
+    for (int i = 1; i <= candidatesNum; i++) {
+      int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+      // `candidateByteSize` is too small, just drop it
+      if (candidateExpectedNDV <= 0) {
+        break;
+      }
+      BloomFilterCandidate candidate =
+        new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize, 
minimumBytes, maximumBytes, hashStrategy);
+      candidates.add(candidate);
+      candidateByteSize = calculateTwoPowerSize(candidateByteSize / 2);
+    }
+    Optional<BloomFilterCandidate> maxBloomFilter = 
candidates.stream().max(BloomFilterCandidate::compareTo);
+    if (maxBloomFilter.isPresent()) {
+      maxCandidate = maxBloomFilter.get();
+    } else {
+      throw new IllegalArgumentException("`maximumBytes` is too small to 
create one valid bloom filter");
+    }
+  }
+
+  /**
+   * According to the size of bytes, calculate the expected number of distinct 
values.
+   * The expected number result may be slightly smaller than what `numBytes` 
can support.
+   *
+   * @param numBytes the bytes size
+   * @param fpp      the false positive probability
+   * @return the expected number of distinct values
+   */
+  private int expectedNDV(int numBytes, double fpp) {
+    int expectedNDV = 0;
+    int optimalBytes = 0;
+    int step = 500;
+    while (optimalBytes < numBytes) {
+      expectedNDV += step;
+      optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp) 
/ 8;
+    }
+    // make sure it is slightly smaller than what `numBytes` can support
+    expectedNDV -= step;
+    // numBytes is too small
+    if (expectedNDV <= 0) {
+      expectedNDV = 0;
+    }
+    return expectedNDV;
+  }
+
+  /**
+   * BloomFilter bitsets size should be power of 2, see 
[[BlockSplitBloomFilter#initBitset]]
+   *
+   * @param numBytes the bytes size
+   * @return the largest power of 2 less or equal to numBytes
+   */
+  private int calculateTwoPowerSize(int numBytes) {
+    if (numBytes < minimumBytes) {
+      numBytes = minimumBytes;
+    }
+    // if `numBytes` is not power of 2, get the next largest power of two less 
than `numBytes`
+    if ((numBytes & (numBytes - 1)) != 0) {
+      numBytes = Integer.highestOneBit(numBytes);
+    }
+    numBytes = Math.min(numBytes, maximumBytes);
+    numBytes = Math.max(numBytes, minimumBytes);
+    return numBytes;
+  }
+
+  /**
+   * Used at the end of the insertion, select the candidate of the smallest 
size.
+   * At least one of the largest candidates will be kept when inserting data.
+   *
+   * @return the smallest and optimal candidate
+   */
+  protected BloomFilterCandidate optimalCandidate() {
+    return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+  }
+
+  protected List<BloomFilterCandidate> getCandidates() {
+    return candidates;
+  }
+
+  @Override
+  public void writeTo(OutputStream out) throws IOException {
+    finalized = true;
+    BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+    optimalBloomFilter.bloomFilter.writeTo(out);
+    String columnName = column != null && column.getPath() != null ? 
Arrays.toString(column.getPath()) : "unknown";
+    LOG.info("The number of distinct values in {} is approximately {}, the 
optimal bloom filter NDV is {}, byte size is {}.",
+      columnName, distinctValueCounter, optimalBloomFilter.getExpectedNDV(),
+      optimalBloomFilter.bloomFilter.getBitsetSize());
+  }
+
+  /**
+   * Insert an element to the multiple bloom filter candidates and remove the 
bad candidate
+   * if the number of distinct values exceeds its expected size.
+   *
+   * @param hash the hash result of element.
+   */
+  @Override
+  public void insertHash(long hash) {
+    Preconditions.checkArgument(!finalized,
+      "Dynamic bloom filter insertion has been mark as finalized, no more data 
is allowed!");
+    if (!maxCandidate.bloomFilter.findHash(hash)) {
+      distinctValueCounter++;
+    }
+    // distinct values exceed the expected size, remove the bad bloom filter 
(leave at least the max bloom filter candidate)
+    candidates.removeIf(candidate -> candidate.getExpectedNDV() < 
distinctValueCounter && candidate != maxCandidate);
+    candidates.forEach(candidate -> 
candidate.getBloomFilter().insertHash(hash));
+  }
+
+  @Override
+  public int getBitsetSize() {
+    return optimalCandidate().getBloomFilter().getBitsetSize();

Review Comment:
   What if no optimal candidate exists? Will `optimalCandidate()` throw in that case?
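
For context, `Optional.get()` throws `NoSuchElementException` when the stream is empty. In the quoted code the constructor throws if no candidate can be created and `insertHash` never removes `maxCandidate`, so an empty list looks unlikely, but an explicit failure would make that invariant visible. The sketch below uses plain integers in place of `BloomFilterCandidate`; the class name `OptimalCandidateSketch`, both method names, and the exception message are hypothetical.

```java
import java.util.List;

final class OptimalCandidateSketch {

  /** Mirrors the `stream().min(...).get()` pattern; integers stand in for candidates. */
  static int smallestUnchecked(List<Integer> sizes) {
    // Throws java.util.NoSuchElementException when `sizes` is empty.
    return sizes.stream().min(Integer::compareTo).get();
  }

  /** Same selection, but an empty list fails with a descriptive message. */
  static int smallestChecked(List<Integer> sizes) {
    return sizes.stream()
      .min(Integer::compareTo)
      .orElseThrow(() -> new IllegalStateException("no bloom filter candidate left"));
  }
}
```

Either variant throws on an empty list; the second one only makes the failure mode explicit and easier to diagnose.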



##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/DynamicBlockBloomFilter.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static 
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as 
candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal 
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an 
approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class DynamicBlockBloomFilter implements BloomFilter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+
+  // multiple candidates, inserting data at the same time. If the distinct 
values are greater than the
+  // expected NDV of candidates, it will be removed. Finally we will choose 
the smallest candidate to write out.
+  private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+  // the largest among candidates and used as an approximate deduplication 
counter
+  private BloomFilterCandidate maxCandidate;
+
+  // the accumulator of the number of distinct values that have been inserted 
so far
+  private int distinctValueCounter = 0;
+
+  // indicates that the bloom filter candidate has been written out and new 
data should be no longer allowed to be inserted
+  private boolean finalized = false;
+
+  private int maximumBytes = UPPER_BOUND_BYTES;
+  private int minimumBytes = LOWER_BOUND_BYTES;
+  // the hash strategy used in this bloom filter.
+  private final HashStrategy hashStrategy;
+  // the column to build bloom filter
+  private ColumnDescriptor column;
+
+  public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp, 
ColumnDescriptor column) {
+    this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64, 
fpp, candidatesNum, column);
+  }
+
+  public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int 
candidatesNum, double fpp, ColumnDescriptor column) {

Review Comment:
   I know these constructors follow the same style as 
`BlockSplitBloomFilter`, but I doubt this one is necessary at all. 
Specifically, how would we know `numBytes` in advance?





> Build a BloomFilter with a more precise size
> --------------------------------------------
>
>                 Key: PARQUET-2254
>                 URL: https://issues.apache.org/jira/browse/PARQUET-2254
>             Project: Parquet
>          Issue Type: Improvement
>            Reporter: Mars
>            Assignee: Mars
>            Priority: Major
>
> h3. Why are the changes needed?
> Currently, a bloom filter is built by specifying the NDV (number of distinct 
> values) up front. In general, the number of distinct values is not known in 
> advance.
> If the bloom filter can be sized automatically according to the data, the file 
> size can be reduced and read efficiency can also be improved.
> h3. What changes were proposed in this pull request?
> {{DynamicBlockBloomFilter}} contains multiple {{BlockSplitBloomFilter}} 
> instances as candidates and inserts values into all of them at the same time. 
> The largest bloom filter is used as an approximate deduplication counter, and 
> candidates whose expected NDV is exceeded are removed during data insertion.
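
The elimination idea described above can be illustrated with a simplified sketch. It is not the PR's implementation: the class `CandidateEliminationSketch` and its nested `Candidate` type are hypothetical, no real bloom filter is built, and an exact `HashSet` stands in for the largest bloom filter's approximate membership test.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CandidateEliminationSketch {

  /** Hypothetical stand-in for a bloom filter candidate; only its capacity matters here. */
  static final class Candidate {
    final int expectedNdv;
    final int byteSize;

    Candidate(int expectedNdv, int byteSize) {
      this.expectedNdv = expectedNdv;
      this.byteSize = byteSize;
    }
  }

  private final List<Candidate> candidates = new ArrayList<>();
  private final Candidate largest;
  // Assumption: an exact set replaces the approximate counter based on the largest filter.
  private final Set<Long> seen = new HashSet<>();

  CandidateEliminationSketch(List<Candidate> initial) {
    if (initial.isEmpty()) {
      throw new IllegalArgumentException("at least one candidate is required");
    }
    candidates.addAll(initial);
    Candidate max = initial.get(0);
    for (Candidate c : initial) {
      if (c.byteSize > max.byteSize) {
        max = c;
      }
    }
    largest = max;
  }

  /** Records a hash and drops every candidate that is now too small, except the largest. */
  void insertHash(long hash) {
    seen.add(hash);
    int distinct = seen.size();
    candidates.removeIf(c -> c.expectedNdv < distinct && c != largest);
  }

  /** Returns the smallest candidate that survived; the largest is always retained. */
  Candidate optimalCandidate() {
    Candidate min = candidates.get(0);
    for (Candidate c : candidates) {
      if (c.byteSize < min.byteSize) {
        min = c;
      }
    }
    return min;
  }
}
```

Because the largest candidate is never removed, `optimalCandidate()` always has something to return, even when the data contains more distinct values than any candidate was sized for.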



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
