[
https://issues.apache.org/jira/browse/PARQUET-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17719591#comment-17719591
]
ASF GitHub Bot commented on PARQUET-2254:
-----------------------------------------
wgtmac commented on code in PR #1042:
URL: https://github.com/apache/parquet-mr/pull/1042#discussion_r1185642897
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java:
##########
@@ -38,57 +38,51 @@
import org.apache.parquet.io.api.Binary;
/**
- * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as
candidates and inserts values in
+ * `AdaptiveBlockSplitBloomFilter` contains multiple `BlockSplitBloomFilter`
as candidates and inserts values in
* the candidates at the same time.
* The purpose of this is to finally generate a bloom filter with the optimal
bit size according to the number
* of real data distinct values. Use the largest bloom filter as an
approximate deduplication counter, and then
* remove incapable bloom filter candidate during data insertion.
*/
-public class DynamicBlockBloomFilter implements BloomFilter {
+public class AdaptiveBlockSplitBloomFilter implements BloomFilter {
- private static final Logger LOG =
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(AdaptiveBlockSplitBloomFilter.class);
// multiple candidates, inserting data at the same time. If the distinct
values are greater than the
// expected NDV of candidates, it will be removed. Finally we will choose
the smallest candidate to write out.
private final List<BloomFilterCandidate> candidates = new ArrayList<>();
// the largest among candidates and used as an approximate deduplication
counter
- private BloomFilterCandidate maxCandidate;
+ private BloomFilterCandidate largestCandidate;
// the accumulator of the number of distinct values that have been inserted
so far
- private int distinctValueCounter = 0;
+ private long distinctValueCounter = 0;
// indicates that the bloom filter candidate has been written out and new
data should be no longer allowed to be inserted
private boolean finalized = false;
+ // indicates the step size to find the NDV value corresponding to numBytes
+ private static final int NDV_STEP = 500;
private int maximumBytes = UPPER_BOUND_BYTES;
private int minimumBytes = LOWER_BOUND_BYTES;
// the hash strategy used in this bloom filter.
private final HashStrategy hashStrategy;
// the column to build bloom filter
private ColumnDescriptor column;
- public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp,
ColumnDescriptor column) {
- this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64,
fpp, candidatesNum, column);
- }
-
- public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int
candidatesNum, double fpp, ColumnDescriptor column) {
- this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp,
candidatesNum, column);
+ /**
+ * Given the maximum acceptable bytes size of bloom filter, generate
candidates according it.
Review Comment:
```suggestion
* Generate bloom filter candidates according to the maximum acceptable
byte size.
```
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java:
##########
@@ -38,57 +38,51 @@
import org.apache.parquet.io.api.Binary;
/**
- * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as
candidates and inserts values in
+ * `AdaptiveBlockSplitBloomFilter` contains multiple `BlockSplitBloomFilter`
as candidates and inserts values in
* the candidates at the same time.
* The purpose of this is to finally generate a bloom filter with the optimal
bit size according to the number
* of real data distinct values. Use the largest bloom filter as an
approximate deduplication counter, and then
* remove incapable bloom filter candidate during data insertion.
*/
-public class DynamicBlockBloomFilter implements BloomFilter {
+public class AdaptiveBlockSplitBloomFilter implements BloomFilter {
- private static final Logger LOG =
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(AdaptiveBlockSplitBloomFilter.class);
// multiple candidates, inserting data at the same time. If the distinct
values are greater than the
// expected NDV of candidates, it will be removed. Finally we will choose
the smallest candidate to write out.
private final List<BloomFilterCandidate> candidates = new ArrayList<>();
// the largest among candidates and used as an approximate deduplication
counter
- private BloomFilterCandidate maxCandidate;
+ private BloomFilterCandidate largestCandidate;
// the accumulator of the number of distinct values that have been inserted
so far
- private int distinctValueCounter = 0;
+ private long distinctValueCounter = 0;
// indicates that the bloom filter candidate has been written out and new
data should be no longer allowed to be inserted
private boolean finalized = false;
+ // indicates the step size to find the NDV value corresponding to numBytes
+ private static final int NDV_STEP = 500;
private int maximumBytes = UPPER_BOUND_BYTES;
private int minimumBytes = LOWER_BOUND_BYTES;
// the hash strategy used in this bloom filter.
private final HashStrategy hashStrategy;
// the column to build bloom filter
private ColumnDescriptor column;
- public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp,
ColumnDescriptor column) {
- this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64,
fpp, candidatesNum, column);
- }
-
- public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int
candidatesNum, double fpp, ColumnDescriptor column) {
- this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp,
candidatesNum, column);
+ /**
+ * Given the maximum acceptable bytes size of bloom filter, generate
candidates according it.
+ *
+ * @param maximumBytes the maximum bit size of candidate
Review Comment:
Is it bit size of byte size?
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `AdaptiveBlockSplitBloomFilter` contains multiple `BlockSplitBloomFilter`
as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an
approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class AdaptiveBlockSplitBloomFilter implements BloomFilter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AdaptiveBlockSplitBloomFilter.class);
+
+ // multiple candidates, inserting data at the same time. If the distinct
values are greater than the
+ // expected NDV of candidates, it will be removed. Finally we will choose
the smallest candidate to write out.
+ private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+ // the largest among candidates and used as an approximate deduplication
counter
+ private BloomFilterCandidate largestCandidate;
+
+ // the accumulator of the number of distinct values that have been inserted
so far
+ private long distinctValueCounter = 0;
+
+ // indicates that the bloom filter candidate has been written out and new
data should be no longer allowed to be inserted
+ private boolean finalized = false;
+
+ // indicates the step size to find the NDV value corresponding to numBytes
+ private static final int NDV_STEP = 500;
+ private int maximumBytes = UPPER_BOUND_BYTES;
+ private int minimumBytes = LOWER_BOUND_BYTES;
+ // the hash strategy used in this bloom filter.
+ private final HashStrategy hashStrategy;
+ // the column to build bloom filter
+ private ColumnDescriptor column;
+
+ /**
+ * Given the maximum acceptable bytes size of bloom filter, generate
candidates according it.
+ *
+ * @param maximumBytes the maximum bit size of candidate
+ * @param numCandidates the number of candidates
+ * @param fpp the false positive probability
+ */
+ public AdaptiveBlockSplitBloomFilter(int maximumBytes, int numCandidates,
double fpp, ColumnDescriptor column) {
+ this(maximumBytes, HashStrategy.XXH64, fpp, numCandidates, column);
+ }
+
+ public AdaptiveBlockSplitBloomFilter(int maximumBytes, HashStrategy
hashStrategy, double fpp,
+ int numCandidates, ColumnDescriptor column) {
+ this.column = column;
+ switch (hashStrategy) {
+ case XXH64:
+ this.hashStrategy = hashStrategy;
+ break;
+ default:
+ throw new RuntimeException("Unsupported hash strategy");
+ }
+ initCandidates(maximumBytes, numCandidates, fpp);
+ }
+
+ /**
+ * Given the maximum acceptable bytes size of bloom filter, generate
candidates according
+ * to the bytes size. Because the bytes size of the candidate need to be a
+ * power of 2, we setting the candidate size according to `maxBytes` of
`1/2`, `1/4`, `1/8`, etc.
+ *
+ * @param maxBytes the maximum bit size of candidate
+ * @param numCandidates the number of candidates
+ * @param fpp the false positive probability
+ */
+ private void initCandidates(int maxBytes, int numCandidates, double fpp) {
+ int candidateByteSize = calculateBoundedPowerOfTwo(maxBytes);
+ for (int i = 1; i <= numCandidates; i++) {
+ int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+ // `candidateByteSize` is too small, just drop it
+ if (candidateExpectedNDV <= 0) {
+ break;
+ }
+ BloomFilterCandidate candidate =
+ new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize,
minimumBytes, maximumBytes, hashStrategy);
+ candidates.add(candidate);
+ candidateByteSize = calculateBoundedPowerOfTwo(candidateByteSize / 2);
+ }
+ Optional<BloomFilterCandidate> maxBloomFilter =
candidates.stream().max(BloomFilterCandidate::compareTo);
+ if (maxBloomFilter.isPresent()) {
+ largestCandidate = maxBloomFilter.get();
+ } else {
+ throw new IllegalArgumentException("`maximumBytes` is too small to
create one valid bloom filter");
Review Comment:
My previous comment here means we should not fail the writer if bloom filter
cannot be generated. This is not a fatal error.
If the writer will fail due to a misconfiguration, we should probably
document this so users will not be surprised?
WDYT?
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java:
##########
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.column.values.bloomfilter;
+
+import static
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.LOWER_BOUND_BYTES;
+import static
org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter.UPPER_BOUND_BYTES;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.io.api.Binary;
+
+/**
+ * `AdaptiveBlockSplitBloomFilter` contains multiple `BlockSplitBloomFilter`
as candidates and inserts values in
+ * the candidates at the same time.
+ * The purpose of this is to finally generate a bloom filter with the optimal
bit size according to the number
+ * of real data distinct values. Use the largest bloom filter as an
approximate deduplication counter, and then
+ * remove incapable bloom filter candidate during data insertion.
+ */
+public class AdaptiveBlockSplitBloomFilter implements BloomFilter {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AdaptiveBlockSplitBloomFilter.class);
+
+ // multiple candidates, inserting data at the same time. If the distinct
values are greater than the
+ // expected NDV of candidates, it will be removed. Finally we will choose
the smallest candidate to write out.
+ private final List<BloomFilterCandidate> candidates = new ArrayList<>();
+
+ // the largest among candidates and used as an approximate deduplication
counter
+ private BloomFilterCandidate largestCandidate;
+
+ // the accumulator of the number of distinct values that have been inserted
so far
+ private long distinctValueCounter = 0;
+
+ // indicates that the bloom filter candidate has been written out and new
data should be no longer allowed to be inserted
+ private boolean finalized = false;
+
+ // indicates the step size to find the NDV value corresponding to numBytes
+ private static final int NDV_STEP = 500;
+ private int maximumBytes = UPPER_BOUND_BYTES;
+ private int minimumBytes = LOWER_BOUND_BYTES;
+ // the hash strategy used in this bloom filter.
+ private final HashStrategy hashStrategy;
+ // the column to build bloom filter
+ private ColumnDescriptor column;
+
+ /**
+ * Given the maximum acceptable bytes size of bloom filter, generate
candidates according it.
+ *
+ * @param maximumBytes the maximum bit size of candidate
+ * @param numCandidates the number of candidates
+ * @param fpp the false positive probability
+ */
+ public AdaptiveBlockSplitBloomFilter(int maximumBytes, int numCandidates,
double fpp, ColumnDescriptor column) {
+ this(maximumBytes, HashStrategy.XXH64, fpp, numCandidates, column);
+ }
+
+ public AdaptiveBlockSplitBloomFilter(int maximumBytes, HashStrategy
hashStrategy, double fpp,
+ int numCandidates, ColumnDescriptor column) {
+ this.column = column;
+ switch (hashStrategy) {
+ case XXH64:
+ this.hashStrategy = hashStrategy;
+ break;
+ default:
+ throw new RuntimeException("Unsupported hash strategy");
+ }
+ initCandidates(maximumBytes, numCandidates, fpp);
+ }
+
+ /**
+ * Given the maximum acceptable bytes size of bloom filter, generate
candidates according
+ * to the bytes size. Because the bytes size of the candidate need to be a
+ * power of 2, we setting the candidate size according to `maxBytes` of
`1/2`, `1/4`, `1/8`, etc.
+ *
+ * @param maxBytes the maximum bit size of candidate
+ * @param numCandidates the number of candidates
+ * @param fpp the false positive probability
+ */
+ private void initCandidates(int maxBytes, int numCandidates, double fpp) {
+ int candidateByteSize = calculateBoundedPowerOfTwo(maxBytes);
+ for (int i = 1; i <= numCandidates; i++) {
+ int candidateExpectedNDV = expectedNDV(candidateByteSize, fpp);
+ // `candidateByteSize` is too small, just drop it
+ if (candidateExpectedNDV <= 0) {
+ break;
+ }
+ BloomFilterCandidate candidate =
+ new BloomFilterCandidate(candidateExpectedNDV, candidateByteSize,
minimumBytes, maximumBytes, hashStrategy);
+ candidates.add(candidate);
+ candidateByteSize = calculateBoundedPowerOfTwo(candidateByteSize / 2);
+ }
+ Optional<BloomFilterCandidate> maxBloomFilter =
candidates.stream().max(BloomFilterCandidate::compareTo);
+ if (maxBloomFilter.isPresent()) {
+ largestCandidate = maxBloomFilter.get();
+ } else {
+ throw new IllegalArgumentException("`maximumBytes` is too small to
create one valid bloom filter");
+ }
+ }
+
+ /**
+ * According to the size of bytes, calculate the expected number of distinct
values.
+ * The expected number result may be slightly smaller than what `numBytes`
can support.
+ *
+ * @param numBytes the bytes size
+ * @param fpp the false positive probability
+ * @return the expected number of distinct values
+ */
+ private int expectedNDV(int numBytes, double fpp) {
+ int expectedNDV = 0;
+ int optimalBytes = 0;
+ while (optimalBytes < numBytes) {
+ expectedNDV += NDV_STEP;
+ optimalBytes = BlockSplitBloomFilter.optimalNumOfBits(expectedNDV, fpp)
/ 8;
+ }
+ // make sure it is slightly smaller than what `numBytes` can support
+ expectedNDV -= NDV_STEP;
+ // numBytes is too small
+ if (expectedNDV <= 0) {
+ expectedNDV = 0;
+ }
+ return expectedNDV;
+ }
+
+ /**
+ * BloomFilter bitsets size should be power of 2, see
[[BlockSplitBloomFilter#initBitset]]
+ *
+ * @param numBytes the bytes size
+ * @return the largest power of 2 less or equal to numBytes
+ */
+ private int calculateBoundedPowerOfTwo(int numBytes) {
+ if (numBytes < minimumBytes) {
+ numBytes = minimumBytes;
+ }
+ // if `numBytes` is not power of 2, get the next largest power of two less
than `numBytes`
+ if ((numBytes & (numBytes - 1)) != 0) {
+ numBytes = Integer.highestOneBit(numBytes);
+ }
+ numBytes = Math.min(numBytes, maximumBytes);
+ numBytes = Math.max(numBytes, minimumBytes);
+ return numBytes;
+ }
+
+ /**
+ * Used at the end of the insertion, select the candidate of the smallest
size.
+ * At least one of the largest candidates will be kept when inserting data.
+ *
+ * @return the smallest and optimal candidate
+ */
+ protected BloomFilterCandidate optimalCandidate() {
+ return candidates.stream().min(BloomFilterCandidate::compareTo).get();
+ }
+
+ protected List<BloomFilterCandidate> getCandidates() {
+ return candidates;
+ }
+
+ @Override
+ public void writeTo(OutputStream out) throws IOException {
+ finalized = true;
+ BloomFilterCandidate optimalBloomFilter = optimalCandidate();
+ optimalBloomFilter.bloomFilter.writeTo(out);
+ String columnName = column != null && column.getPath() != null ?
Arrays.toString(column.getPath()) : "unknown";
+ LOG.info("The number of distinct values in {} is approximately {}, the
optimal bloom filter NDV is {}, byte size is {}.",
+ columnName, distinctValueCounter, optimalBloomFilter.getExpectedNDV(),
+ optimalBloomFilter.bloomFilter.getBitsetSize());
+ }
+
+ /**
+ * Insert an element to the multiple bloom filter candidates and remove the
bad candidate
+ * if the number of distinct values exceeds its expected size.
+ *
+ * @param hash the hash result of element.
+ */
+ @Override
+ public void insertHash(long hash) {
+ Preconditions.checkArgument(!finalized,
+ "AdaptiveBlockSplitBloomFilter insertion has been mark as finalized, no
more data is allowed!");
+ if (!largestCandidate.bloomFilter.findHash(hash)) {
+ distinctValueCounter++;
+ }
+ // distinct values exceed the expected size, remove the bad bloom filter
(leave at least the max bloom filter candidate)
+ candidates.removeIf(candidate -> candidate.getExpectedNDV() <
distinctValueCounter && candidate != largestCandidate);
Review Comment:
In the worst case where the maximum size is too small and many distinct
values have been inserted, the `largestCandidate` will set TRUE to almost all
bits making it less effective. Why not simply drop all the candidates?
##########
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java:
##########
@@ -97,10 +97,9 @@ abstract class ColumnWriterBase implements ColumnWriter {
int optimalNumOfBits =
BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble());
this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8,
maxBloomFilterSize);
} else {
- boolean useDynamicBloomFilter = props.getDynamicBloomFilterEnabled(path);
- if(useDynamicBloomFilter) {
+ if(props.getAdaptiveBloomFilterEnabled(path)) {
int candidateSize = props.getBloomFilterCandidateSize(path);
- this.bloomFilter = new DynamicBlockBloomFilter(maxBloomFilterSize,
candidateSize, fpp.getAsDouble(), path);
+ this.bloomFilter = new
AdaptiveBlockSplitBloomFilter(maxBloomFilterSize, candidateSize,
fpp.getAsDouble(), path);
Review Comment:
```suggestion
this.bloomFilter = new
AdaptiveBlockSplitBloomFilter(maxBloomFilterSize, numCandidates,
fpp.getAsDouble(), path);
```
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java:
##########
@@ -38,57 +38,51 @@
import org.apache.parquet.io.api.Binary;
/**
- * `DynamicBlockBloomFilter` contains multiple `BlockSplitBloomFilter` as
candidates and inserts values in
+ * `AdaptiveBlockSplitBloomFilter` contains multiple `BlockSplitBloomFilter`
as candidates and inserts values in
* the candidates at the same time.
* The purpose of this is to finally generate a bloom filter with the optimal
bit size according to the number
* of real data distinct values. Use the largest bloom filter as an
approximate deduplication counter, and then
* remove incapable bloom filter candidate during data insertion.
*/
-public class DynamicBlockBloomFilter implements BloomFilter {
+public class AdaptiveBlockSplitBloomFilter implements BloomFilter {
- private static final Logger LOG =
LoggerFactory.getLogger(DynamicBlockBloomFilter.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(AdaptiveBlockSplitBloomFilter.class);
// multiple candidates, inserting data at the same time. If the distinct
values are greater than the
// expected NDV of candidates, it will be removed. Finally we will choose
the smallest candidate to write out.
private final List<BloomFilterCandidate> candidates = new ArrayList<>();
// the largest among candidates and used as an approximate deduplication
counter
- private BloomFilterCandidate maxCandidate;
+ private BloomFilterCandidate largestCandidate;
// the accumulator of the number of distinct values that have been inserted
so far
- private int distinctValueCounter = 0;
+ private long distinctValueCounter = 0;
// indicates that the bloom filter candidate has been written out and new
data should be no longer allowed to be inserted
private boolean finalized = false;
+ // indicates the step size to find the NDV value corresponding to numBytes
+ private static final int NDV_STEP = 500;
private int maximumBytes = UPPER_BOUND_BYTES;
private int minimumBytes = LOWER_BOUND_BYTES;
// the hash strategy used in this bloom filter.
private final HashStrategy hashStrategy;
// the column to build bloom filter
private ColumnDescriptor column;
- public DynamicBlockBloomFilter(int numBytes, int candidatesNum, double fpp,
ColumnDescriptor column) {
- this(numBytes, LOWER_BOUND_BYTES, UPPER_BOUND_BYTES, HashStrategy.XXH64,
fpp, candidatesNum, column);
- }
-
- public DynamicBlockBloomFilter(int numBytes, int maximumBytes, int
candidatesNum, double fpp, ColumnDescriptor column) {
- this(numBytes, LOWER_BOUND_BYTES, maximumBytes, HashStrategy.XXH64, fpp,
candidatesNum, column);
+ /**
+ * Given the maximum acceptable bytes size of bloom filter, generate
candidates according it.
+ *
+ * @param maximumBytes the maximum bit size of candidate
Review Comment:
nit: would be good to reflect bit/byte in the variable name.
##########
parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterBase.java:
##########
@@ -97,10 +97,9 @@ abstract class ColumnWriterBase implements ColumnWriter {
int optimalNumOfBits =
BlockSplitBloomFilter.optimalNumOfBits(ndv.getAsLong(), fpp.getAsDouble());
this.bloomFilter = new BlockSplitBloomFilter(optimalNumOfBits / 8,
maxBloomFilterSize);
} else {
- boolean useDynamicBloomFilter = props.getDynamicBloomFilterEnabled(path);
- if(useDynamicBloomFilter) {
+ if(props.getAdaptiveBloomFilterEnabled(path)) {
int candidateSize = props.getBloomFilterCandidateSize(path);
Review Comment:
```suggestion
int numCandidates = props.getBloomFilterCandidateCount(path);
```
##########
parquet-column/src/main/java/org/apache/parquet/column/values/bloomfilter/AdaptiveBlockSplitBloomFilter.java:
##########
@@ -97,21 +91,21 @@ public DynamicBlockBloomFilter(int numBytes, int
minimumBytes, int maximumBytes,
default:
throw new RuntimeException("Unsupported hash strategy");
}
- initCandidates(numBytes, candidatesNum, fpp);
+ initCandidates(maximumBytes, numCandidates, fpp);
}
/**
* Given the maximum acceptable bytes size of bloom filter, generate
candidates according
- * to the bytes size. The bytes size of the candidate needs to be a
- * power of 2. Therefore, set the candidate size according to `maxBytes` of
`1/2`, `1/4`, `1/8`, etc.
+ * to the bytes size. Because the bytes size of the candidate need to be a
+ * power of 2, we setting the candidate size according to `maxBytes` of
`1/2`, `1/4`, `1/8`, etc.
Review Comment:
```suggestion
* power of 2, here we set the candidate size to be a proportion of
`maxBytes` like `1/2`, `1/4`, `1/8`, etc.
```
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
public static final String BLOOM_FILTER_EXPECTED_NDV =
"parquet.bloom.filter.expected.ndv";
public static final String BLOOM_FILTER_MAX_BYTES =
"parquet.bloom.filter.max.bytes";
public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+ public static final String ADAPTIVE_BLOOM_FILTER_ENABLED =
"parquet.bloom.filter.adaptive.enabled";
Review Comment:
Please document new configs here:
https://github.com/apache/parquet-mr/blob/master/parquet-hadoop/README.md
##########
parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java:
##########
@@ -152,6 +153,8 @@ public static enum JobSummaryLevel {
public static final String BLOOM_FILTER_EXPECTED_NDV =
"parquet.bloom.filter.expected.ndv";
public static final String BLOOM_FILTER_MAX_BYTES =
"parquet.bloom.filter.max.bytes";
public static final String BLOOM_FILTER_FPP = "parquet.bloom.filter.fpp";
+ public static final String DYNAMIC_BLOOM_FILTER_ENABLED =
"parquet.bloom.filter.dynamic.enabled";
+ public static final String BLOOM_FILTER_CANDIDATE_SIZE =
"parquet.bloom.filter.candidate.size";
Review Comment:
OK, that sounds good.
> Build a BloomFilter with a more precise size
> --------------------------------------------
>
> Key: PARQUET-2254
> URL: https://issues.apache.org/jira/browse/PARQUET-2254
> Project: Parquet
> Issue Type: Improvement
> Reporter: Mars
> Assignee: Mars
> Priority: Major
>
> h3. Why are the changes needed?
> Now the usage of bloom filter is to specify the NDV(number of distinct
> values), and then build BloomFilter. In general scenarios, it is actually not
> sure how much the distinct value is.
> If BloomFilter can be automatically generated according to the data, the file
> size can be reduced and the reading efficiency can also be improved.
> h3. What changes were proposed in this pull request?
> {{DynamicBlockBloomFilter}} contains multiple {{BlockSplitBloomFilter}} as
> candidates and inserts values in the candidates at the same time. Use the
> largest bloom filter as an approximate deduplication counter, and then remove
> incapable bloom filter candidates during data insertion.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)