This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 17b396ebc87f feat(mdt): Support writing bloom filter for hfile log block (#18936)
17b396ebc87f is described below
commit 17b396ebc87f53c85eeb448554c21eb423b44562
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jun 11 10:23:49 2026 +0800
feat(mdt): Support writing bloom filter for hfile log block (#18936)
* feat(mdt): Support writing bloom filter for hfile log block
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 4 +
.../org/apache/hudi/io/HoodieAppendHandle.java | 15 ++-
.../hudi/metadata/HoodieMetadataWriteUtils.java | 11 ++-
.../metadata/TestHoodieMetadataWriteUtils.java | 18 ++++
.../hudi/common/config/HoodieStorageConfig.java | 12 +++
.../common/table/log/block/HoodieDataBlock.java | 4 +-
.../table/log/block/HoodieHFileDataBlock.java | 21 ++--
.../org/apache/hudi/common/util/ConfigUtils.java | 2 +
.../org/apache/hudi/common/util/HFileUtils.java | 49 ++++++++++
.../io/storage/HoodieNativeAvroHFileReader.java | 4 +-
.../common/config/TestHoodieStorageConfig.java | 14 +++
.../apache/hudi/common/util/TestConfigUtils.java | 6 +-
.../apache/hudi/common/util/TestHFileUtils.java | 91 ++++++++++++++++++
.../org/apache/hudi/util/FlinkWriteClients.java | 48 ++++++++--
.../java/org/apache/hudi/util/StreamerUtil.java | 8 +-
.../apache/hudi/table/format/TestInputFormat.java | 106 +++++++++++++++++++++
.../apache/hudi/utils/TestFlinkWriteClients.java | 50 ++++++++++
.../org/apache/hudi/utils/TestStreamerUtil.java | 24 +++++
.../hadoop/HoodieAvroFileReaderFactory.java | 20 +++-
.../hadoop/HoodieAvroFileWriterFactory.java | 3 +-
.../io/storage/hadoop/HoodieAvroHFileWriter.java | 20 +---
.../io/hadoop/TestHoodieAvroFileReaderFactory.java | 66 +++++++++++++
.../io/hadoop/TestHoodieHFileReaderWriter.java | 79 +++++++++++++--
23 files changed, 608 insertions(+), 67 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9c710fd9953a..9e4263a4fbdd 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2451,6 +2451,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return
getBooleanOrDefault(HoodieStorageConfig.PARQUET_WITH_BLOOM_FILTER_ENABLED);
}
+ public boolean hfileBloomFilterEnabled() {
+ return
getBooleanOrDefault(HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED);
+ }
+
public Option<HoodieLogBlock.HoodieLogBlockType> getLogDataBlockFormat() {
return
Option.ofNullable(getString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT))
.map(HoodieLogBlock.HoodieLogBlockType::fromId);
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 3b825bbf1ade..1e0f0adf250f 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -84,6 +84,12 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
import static
org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
@@ -726,8 +732,15 @@ public class HoodieAppendHandle<T, I, K, O> extends
HoodieWriteHandle<T, I, K, O
// Not supporting positions in HFile data blocks
header.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
records.sort(Comparator.comparing(HoodieRecord::getRecordKey));
+ Map<String, String> hfileParams = new HashMap<>();
+ hfileParams.put(HFILE_COMPRESSION_ALGORITHM_NAME.key(),
writeConfig.getHFileCompressionAlgorithm());
+ hfileParams.put(HFILE_WITH_BLOOM_FILTER_ENABLED.key(),
Boolean.toString(writeConfig.hfileBloomFilterEnabled()));
+ hfileParams.put(BLOOM_FILTER_NUM_ENTRIES_VALUE.key(),
Integer.toString(writeConfig.getBloomFilterNumEntries()));
+ hfileParams.put(BLOOM_FILTER_FPP_VALUE.key(),
Double.toString(writeConfig.getBloomFilterFPP()));
+ hfileParams.put(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(),
Integer.toString(writeConfig.getDynamicBloomFilterMaxNumEntries()));
+ hfileParams.put(BLOOM_FILTER_TYPE.key(),
writeConfig.getBloomFilterType());
return new HoodieHFileDataBlock(
- records, header, writeConfig.getHFileCompressionAlgorithm(), new
StoragePath(writeConfig.getBasePath()));
+ records, header, writeConfig.getHFileCompressionAlgorithm(),
hfileParams, new StoragePath(writeConfig.getBasePath()));
case PARQUET_DATA_BLOCK:
return new HoodieParquetDataBlock(
records,
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 7d6cef77c63a..3992b65dfdd0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -259,11 +259,12 @@ public class HoodieMetadataWriteUtils {
// Keeping the log blocks as large as the log files themselves
reduces the number of HFile blocks to be checked for
// presence of keys
.logFileDataBlockMaxSize(maxLogFileSizeBytes)
-
.withBloomFilterType(writeConfig.getMetadataConfig().getBloomFilterType())
-
.withBloomFilterNumEntries(writeConfig.getMetadataConfig().getBloomFilterNumEntries())
-
.withBloomFilterFpp(writeConfig.getMetadataConfig().getBloomFilterFpp())
-
.withBloomFilterDynamicMaxEntries(writeConfig.getMetadataConfig().getDynamicBloomFilterMaxNumEntries())
- .build())
+ .hfileBloomFilterEnable(writeConfig.hfileBloomFilterEnabled())
+
.withBloomFilterType(writeConfig.getMetadataConfig().getBloomFilterType())
+
.withBloomFilterNumEntries(writeConfig.getMetadataConfig().getBloomFilterNumEntries())
+
.withBloomFilterFpp(writeConfig.getMetadataConfig().getBloomFilterFpp())
+
.withBloomFilterDynamicMaxEntries(writeConfig.getMetadataConfig().getDynamicBloomFilterMaxNumEntries())
+ .build())
.withRollbackParallelism(MDT_DEFAULT_PARALLELISM)
.withFinalizeWriteParallelism(MDT_DEFAULT_PARALLELISM)
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
index 73d10ef228d2..333d212a1d50 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
@@ -24,6 +24,7 @@ import
org.apache.hudi.client.transaction.lock.StorageBasedLockProvider;
import
org.apache.hudi.client.transaction.lock.ZookeeperBasedImplicitBasePathLockProvider;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.ActionType;
import org.apache.hudi.common.model.HoodieCleaningPolicy;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -35,6 +36,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.Properties;
@@ -47,6 +50,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieMetadataWriteUtils {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void
testCreateMetadataWriteConfigPropagatesHFileBloomFilterSetting(boolean
hfileBloomFilterEnabled) {
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp/base_path/")
+ .withStorageConfig(HoodieStorageConfig.newBuilder()
+ .hfileBloomFilterEnable(hfileBloomFilterEnabled)
+ .build())
+ .build();
+
+ HoodieWriteConfig metadataWriteConfig =
HoodieMetadataWriteUtils.createMetadataWriteConfig(
+ writeConfig, HoodieFailedWritesCleaningPolicy.EAGER,
HoodieTableVersion.EIGHT);
+ assertEquals(hfileBloomFilterEnabled,
metadataWriteConfig.hfileBloomFilterEnabled());
+ }
+
@Test
public void testCreateMetadataWriteConfigForCleaner() {
HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder()
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 17f2bc6a9b89..cbdf5cc223e0 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -125,6 +125,13 @@ public class HoodieStorageConfig extends HoodieConfig {
+ "reader for footer/metadata operations (schema, bloom filter).
Independent of the "
+ "data allocator since metadata allocations are small and
short-lived.");
+ public static final ConfigProperty<Boolean> HFILE_WITH_BLOOM_FILTER_ENABLED
= ConfigProperty
+ .key("hoodie.hfile.bloom.filter.enabled")
+ .defaultValue(true)
+ .markAdvanced()
+ .sinceVersion("1.3.0")
+ .withDocumentation("Control whether to write bloom filter or not to
HFile.");
+
public static final ConfigProperty<Boolean> HFILE_WRITER_TO_ALLOW_DUPLICATES
= ConfigProperty
.key("hoodie.hfile.writes.allow.duplicates")
.defaultValue(false)
@@ -579,6 +586,11 @@ public class HoodieStorageConfig extends HoodieConfig {
return this;
}
+ public Builder hfileBloomFilterEnable(boolean hfileBloomFilterEnable) {
+ storageConfig.setValue(HFILE_WITH_BLOOM_FILTER_ENABLED,
String.valueOf(hfileBloomFilterEnable));
+ return this;
+ }
+
public Builder parquetVariantWriteShreddingEnabled(boolean enabled) {
storageConfig.setValue(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED,
String.valueOf(enabled));
return this;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index aee07ea947a7..70ac694dedfe 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -263,7 +263,7 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
public final <T> ClosableIterator<T>
getEngineRecordIterator(HoodieReaderContext<T> readerContext, List<String>
keys, boolean fullKey) throws IOException {
boolean fullScan = keys.isEmpty();
if (!fullScan) {
- return lookupEngineRecords(keys, fullKey);
+ return lookupEngineRecords(readerContext, keys, fullKey);
} else {
throw new IllegalStateException("Unexpected code reached. Expected to be
called only with keySpec defined for non FILES partition in Metadata table");
}
@@ -323,7 +323,7 @@ public abstract class HoodieDataBlock extends
HoodieLogBlock {
);
}
- protected <T> ClosableIterator<T> lookupEngineRecords(List<String> keys,
boolean fullKey) throws IOException {
+ protected <T> ClosableIterator<T> lookupEngineRecords(HoodieReaderContext<T>
readerContext, List<String> keys, boolean fullKey) throws IOException {
throw new UnsupportedOperationException(
String.format("Point lookups are not supported by this Data block type
(%s)", getBlockType())
);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 659cbf8c8d18..bc80e279a194 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -57,7 +57,7 @@ import static
org.apache.hudi.common.util.ValidationUtils.checkState;
* base file format.
*/
public class HoodieHFileDataBlock extends HoodieDataBlock {
- private final Option<String> compressionCodec;
+ private final Map<String, String> writerParams;
// This path is used for constructing HFile reader context, which should not
be
// interpreted as the actual file path for the HFile data blocks
private final StoragePath pathForReader;
@@ -73,7 +73,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
StoragePath pathForReader) {
super(content, inputStreamSupplier, readBlockLazily,
Option.of(logBlockContentLocation), readerSchema,
header, footer, HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME,
enablePointLookups);
- this.compressionCodec = Option.empty();
+ this.writerParams = Collections.emptyMap();
this.pathForReader = pathForReader;
}
@@ -81,8 +81,17 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
Map<HeaderMetadataType, String> header,
String compressionCodec,
StoragePath pathForReader) {
+ this(records, header, compressionCodec, Collections.emptyMap(),
pathForReader);
+ }
+
+ public HoodieHFileDataBlock(List<HoodieRecord> records,
+ Map<HeaderMetadataType, String> header,
+ String compressionCodec,
+ Map<String, String> paramsMap,
+ StoragePath pathForReader) {
super(records, header, new HashMap<>(),
HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
- this.compressionCodec = Option.of(compressionCodec);
+ this.writerParams = new HashMap<>(paramsMap);
+ this.writerParams.putIfAbsent(HFILE_COMPRESSION_ALGORITHM_NAME.key(),
compressionCodec);
this.pathForReader = pathForReader;
}
@@ -98,7 +107,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
return
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.HFILE)
.serializeRecordsToLogBlock(
storage, records, writerSchema, getSchema(), getKeyFieldName(),
- Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(),
compressionCodec.get()));
+ writerParams);
}
@Override
@@ -178,7 +187,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
}
@Override
- protected <T> ClosableIterator<T> lookupEngineRecords(List<String>
sortedKeys, boolean fullKey) throws IOException {
+ protected <T> ClosableIterator<T> lookupEngineRecords(HoodieReaderContext<T>
readerContext, List<String> sortedKeys, boolean fullKey) throws IOException {
HoodieLogBlockContentLocation blockContentLoc =
getBlockContentLocation().get();
// NOTE: It's important to extend Hadoop configuration here to make sure
configuration
@@ -195,7 +204,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
try (final HoodieAvroHFileReaderImplBase reader =
(HoodieAvroHFileReaderImplBase) HoodieIOFactory
.getIOFactory(inlineStorage)
.getReaderFactory(HoodieRecordType.AVRO)
- .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+ .getFileReader(readerContext.getHoodieReaderConfig(),
storagePathInfo,
HoodieFileFormat.HFILE,
Option.of(getSchemaFromHeader()))) {
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 5f0ce3bbb9a0..398494a957dd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -860,6 +860,8 @@ public class ConfigUtils {
metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
props.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
metadataConfig.getStringOrDefault(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB));
+ props.setProperty(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(),
+
metadataConfig.getStringOrDefault(HoodieMetadataConfig.BLOOM_FILTER_ENABLE));
return props;
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index e47779c6f623..864bd76e2b14 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -20,6 +20,8 @@
package org.apache.hudi.common.util;
import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
import org.apache.hudi.common.config.HoodieReaderConfig;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -58,7 +60,12 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED;
import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
/**
@@ -192,6 +199,8 @@ public class HFileUtils extends FileFormatUtils {
String keyFieldName,
Map<String, String>
paramsMap) throws IOException {
CompressionCodec compressionCodec =
getHFileCompressionAlgorithm(paramsMap);
+ boolean enableBloomFilter = isHFileBloomFilterEnabled(paramsMap);
+ BloomFilter bloomFilter = enableBloomFilter ? createBloomFilter(paramsMap)
: null;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (OutputStream ostream = new DataOutputStream(baos)) {
HFileContext context = HFileContext.builder()
@@ -206,6 +215,8 @@ public class HFileUtils extends FileFormatUtils {
Option<HoodieSchemaField> keyField = writerSchema.getField(keyFieldName);
try (HFileWriter writer = new HFileWriterImpl(context, ostream)) {
String previousRecordKey = null;
+ String minRecordKey = null;
+ String maxRecordKey = null;
// It is assumed that the input records are sorted based on the record
key
// for HFile block
for (int i = 0; i < records.size(); i++) {
@@ -229,18 +240,43 @@ public class HFileUtils extends FileFormatUtils {
} catch (IOException e) {
throw new HoodieIOException("IOException serializing records", e);
}
+ if (enableBloomFilter) {
+ bloomFilter.add(recordKey);
+ if (minRecordKey == null) {
+ minRecordKey = recordKey;
+ }
+ maxRecordKey = recordKey;
+ }
previousRecordKey = recordKey;
}
writer.appendFileInfo(
HoodieAvroHFileReaderImplBase.SCHEMA_KEY,
getUTF8Bytes(readerSchema.toString()));
+ if (enableBloomFilter) {
+ appendBloomFilter(writer, minRecordKey, maxRecordKey, bloomFilter);
+ }
}
ostream.flush();
}
return baos;
}
+ public static void appendBloomFilter(HFileWriter writer, String
minRecordKey, String maxRecordKey, BloomFilter bloomFilter) {
+ writer.appendFileInfo(
+ HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD,
+ getUTF8Bytes(minRecordKey == null ? StringUtils.EMPTY_STRING :
minRecordKey));
+ writer.appendFileInfo(
+ HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD,
+ getUTF8Bytes(maxRecordKey == null ? StringUtils.EMPTY_STRING :
maxRecordKey));
+ writer.appendFileInfo(
+ HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE,
+ getUTF8Bytes(bloomFilter.getBloomFilterTypeCode().toString()));
+ writer.appendMetaInfo(
+ HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK,
+ getUTF8Bytes(bloomFilter.serializeToString()));
+ }
+
/**
* Print the meta fields of the record of interest
*/
@@ -270,6 +306,19 @@ public class HFileUtils extends FileFormatUtils {
return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName));
}
+ private static boolean isHFileBloomFilterEnabled(Map<String, String>
paramsMap) {
+ return Boolean.parseBoolean(paramsMap.getOrDefault(
+ HFILE_WITH_BLOOM_FILTER_ENABLED.key(),
String.valueOf(HFILE_WITH_BLOOM_FILTER_ENABLED.defaultValue())));
+ }
+
+ private static BloomFilter createBloomFilter(Map<String, String> paramsMap) {
+ return BloomFilterFactory.createBloomFilter(
+
Integer.parseInt(paramsMap.getOrDefault(BLOOM_FILTER_NUM_ENTRIES_VALUE.key(),
BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue())),
+
Double.parseDouble(paramsMap.getOrDefault(BLOOM_FILTER_FPP_VALUE.key(),
BLOOM_FILTER_FPP_VALUE.defaultValue())),
+
Integer.parseInt(paramsMap.getOrDefault(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(),
BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue())),
+ paramsMap.getOrDefault(BLOOM_FILTER_TYPE.key(),
BLOOM_FILTER_TYPE.defaultValue()));
+ }
+
private static byte[] serializeRecord(HoodieRecord<?> record, HoodieSchema
schema, Option<HoodieSchemaField> keyField) throws IOException {
return record.toIndexedRecord(schema,
CollectionUtils.emptyProps()).map(HoodieAvroIndexedRecord::getData).map(indexedRecord
-> {
keyField.ifPresent(field -> indexedRecord.put(field.pos(),
StringUtils.EMPTY_STRING));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index 1020d1a8c9f0..69b6dd9fd0ed 100644
---
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -333,8 +333,8 @@ public class HoodieNativeAvroHFileReader extends
HoodieAvroHFileReaderImplBase {
private static BloomFilter readBloomFilter(HFileReader reader) throws
HoodieException {
try {
- ByteBuffer byteBuffer =
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK).get();
- return BloomFilterFactory.fromByteBuffer(byteBuffer,
+ ByteBuffer byteBuffer =
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK).orElse(null);
+ return byteBuffer == null ? null :
BloomFilterFactory.fromByteBuffer(byteBuffer,
fromUTF8Bytes(reader.getMetaInfo(new
UTF8StringKey(KEY_BLOOM_FILTER_TYPE_CODE)).get()));
} catch (IOException e) {
throw new HoodieException("Could not read bloom filter from HFile", e);
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
index 9ddbf08e3e99..a7901c3e3e0f 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
@@ -29,8 +29,11 @@ import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYN
import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestHoodieStorageConfig {
@Test
@@ -74,4 +77,15 @@ public class TestHoodieStorageConfig {
assertEquals(BLOOM_FILTER_FPP_VALUE.defaultValue(),
storageConfig.getString(BLOOM_FILTER_FPP_VALUE));
assertEquals(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue(),
storageConfig.getString(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES));
}
+
+ @Test
+ void testHFileBloomFilterBuilder() {
+ HoodieStorageConfig defaultStorageConfig =
HoodieStorageConfig.newBuilder().build();
+
assertTrue(defaultStorageConfig.getBoolean(HFILE_WITH_BLOOM_FILTER_ENABLED));
+
+ HoodieStorageConfig disabledStorageConfig =
HoodieStorageConfig.newBuilder()
+ .hfileBloomFilterEnable(false)
+ .build();
+
assertFalse(disabledStorageConfig.getBoolean(HFILE_WITH_BLOOM_FILTER_ENABLED));
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 1c862803c5f2..84566159b139 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -458,9 +458,10 @@ public class TestConfigUtils {
}
@Test
- void testBuildFileGroupReaderPropertiesIncludesMetadataFileCacheConfig() {
+ void testBuildFileGroupReaderPropertiesIncludesMetadataReaderConfigs() {
TypedProperties metadataProps = new TypedProperties();
metadataProps.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
"123");
+ metadataProps.setProperty(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(),
"true");
metadataProps.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(),
"false");
metadataProps.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(),
"200000");
metadataProps.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
"7");
@@ -472,8 +473,9 @@ public class TestConfigUtils {
TypedProperties fileGroupReaderProps =
ConfigUtils.buildFileGroupReaderProperties(metadataConfig, false);
assertEquals("123",
fileGroupReaderProps.getProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key()));
+ assertEquals("true",
fileGroupReaderProps.getProperty(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key()));
assertEquals("false",
fileGroupReaderProps.getProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key()));
assertEquals("200000",
fileGroupReaderProps.getProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key()));
assertEquals("7",
fileGroupReaderProps.getProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key()));
}
-}
\ No newline at end of file
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
index fbf9b196f427..85660913304e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
@@ -19,23 +19,61 @@
package org.apache.hudi.common.util;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.ByteBufferBackedInputStream;
import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.io.hfile.HFileReader;
+import org.apache.hudi.io.hfile.HFileReaderImpl;
+import org.apache.hudi.io.hfile.UTF8StringKey;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED;
import static
org.apache.hudi.common.util.HFileUtils.getHFileCompressionAlgorithm;
+import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Tests {@link HFileUtils}
*/
public class TestHFileUtils {
+ private static final String KEY_FIELD_NAME = "key";
+ private static final HoodieSchema SCHEMA = HoodieSchema.parse("{\n"
+ + " \"type\": \"record\",\n"
+ + " \"name\": \"HFileLogBlockRecord\",\n"
+ + " \"fields\": [\n"
+ + " {\"name\": \"" + KEY_FIELD_NAME + "\", \"type\": \"string\"},\n"
+ + " {\"name\": \"value\", \"type\": \"string\"}\n"
+ + " ]\n"
+ + "}");
+
@ParameterizedTest
@EnumSource(CompressionCodec.class)
public void testGetHFileCompressionAlgorithm(CompressionCodec algo) {
@@ -57,4 +95,57 @@ public class TestHFileUtils {
public void testGetDefaultHFileCompressionAlgorithm() {
assertEquals(CompressionCodec.GZIP,
getHFileCompressionAlgorithm(Collections.emptyMap()));
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void
testSerializeRecordsToLogBlockControlsBloomFilterMetadata(boolean
hfileBloomFilterEnabled) throws Exception {
+ Map<String, String> paramsMap = new HashMap<>();
+ paramsMap.put(HFILE_COMPRESSION_ALGORITHM_NAME.key(),
HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue());
+ paramsMap.put(HFILE_WITH_BLOOM_FILTER_ENABLED.key(),
Boolean.toString(hfileBloomFilterEnabled));
+ paramsMap.put(BLOOM_FILTER_NUM_ENTRIES_VALUE.key(),
BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue());
+ paramsMap.put(BLOOM_FILTER_FPP_VALUE.key(),
BLOOM_FILTER_FPP_VALUE.defaultValue());
+ paramsMap.put(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(),
BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue());
+ paramsMap.put(BLOOM_FILTER_TYPE.key(), BLOOM_FILTER_TYPE.defaultValue());
+
+ ByteArrayOutputStream outputStream = new
HFileUtils().serializeRecordsToLogBlock(
+ null, createRecords(), SCHEMA, SCHEMA, KEY_FIELD_NAME, paramsMap);
+
+ try (HFileReader reader = new HFileReaderImpl(
+ new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(outputStream.toByteArray())),
+ outputStream.size())) {
+ reader.initializeMetadata();
+
+ assertEquals(hfileBloomFilterEnabled,
+
reader.getMetaBlock(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK).isPresent());
+ assertEquals(hfileBloomFilterEnabled,
+ reader.getMetaInfo(new
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE)).isPresent());
+ assertEquals(hfileBloomFilterEnabled,
+ reader.getMetaInfo(new
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD)).isPresent());
+ assertEquals(hfileBloomFilterEnabled,
+ reader.getMetaInfo(new
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD)).isPresent());
+
+ if (hfileBloomFilterEnabled) {
+ ByteBuffer bloomFilterBuffer =
reader.getMetaBlock(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK).get();
+ BloomFilter bloomFilter = BloomFilterFactory.fromByteBuffer(
+ bloomFilterBuffer,
+ fromUTF8Bytes(reader.getMetaInfo(new
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE)).get()));
+
+ assertTrue(bloomFilter.mightContain("key00"));
+ assertTrue(bloomFilter.mightContain("key02"));
+ assertFalse(bloomFilter.mightContain("missing-key"));
+ }
+ }
+ }
+
+ private static List<HoodieRecord> createRecords() {
+ List<HoodieRecord> records = new ArrayList<>();
+ for (int i = 0; i < 3; i++) {
+ String key = "key" + String.format("%02d", i);
+ GenericRecord record = new GenericData.Record(SCHEMA.toAvroSchema());
+ record.put(KEY_FIELD_NAME, key);
+ record.put("value", "value" + i);
+ records.add(new HoodieAvroIndexedRecord(new HoodieKey(key, ""), record));
+ }
+ return records;
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index dcd87550ede4..078f6b6779a3 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -232,16 +232,7 @@ public class FlinkWriteClients {
.parquetMaxFileSize(conf.get(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 *
1024L)
.withWriteUtcTimezone(conf.get(FlinkOptions.WRITE_UTC_TIMEZONE))
.build())
- .withMetadataConfig(HoodieMetadataConfig.newBuilder()
- .withEngineType(EngineType.FLINK) // this affects the default
value inference
- .enable(conf.get(FlinkOptions.METADATA_ENABLED))
- .withRecordIndexFileGroupCount(
-
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
-
GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_DEFAULT)),
-
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(),
-
HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.defaultValue()
+ "")))
-
.withMaxNumDeltaCommitsBeforeCompaction(conf.get(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
- .build())
+ .withMetadataConfig(getMetadataConfig(conf, false))
.withIndexConfig(StreamerUtil.getIndexConfig(conf))
.withPayloadConfig(getPayloadConfig(conf))
.withEmbeddedTimelineServerReuseEnabled(true) // make write client
embedded timeline service singleton
@@ -279,4 +270,41 @@ public class FlinkWriteClients {
}
return writeConfig;
}
+
+ /**
+ * Builds a {@link HoodieMetadataConfig} from the given Flink configuration,
including any
+ * additional metadata table properties set via raw config options.
+ *
+ * <p>Shorthand for {@code getMetadataConfig(conf, true)}.
+ *
+ * @param conf the Flink configuration.
+ * @return the {@link HoodieMetadataConfig} constructed from the Flink
configuration.
+ */
+ public static HoodieMetadataConfig getMetadataConfig(Configuration conf) {
+ return getMetadataConfig(conf, true);
+ }
+
+ /**
+ * Builds a {@link HoodieMetadataConfig} from the given Flink configuration.
+ *
+ * <p>The following are always set: the engine type ({@code FLINK}), the enable flag from
+ * {@code FlinkOptions.METADATA_ENABLED}, the global record-level index min/max file group
+ * counts (read as raw strings from {@code conf}, falling back to their defaults), and the max
+ * number of delta commits before compaction from
+ * {@code FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS}.
+ *
+ * @param conf the Flink configuration.
+ * @param includeAllProperties whether to also apply any additional metadata
table properties
+ * set via raw config options on top of the
well-known Flink options.
+ * @return the {@link HoodieMetadataConfig} constructed from the Flink
configuration.
+ * @throws NumberFormatException if a configured record index file group count is not an integer.
+ */
+ public static HoodieMetadataConfig getMetadataConfig(Configuration conf,
boolean includeAllProperties) {
+ HoodieMetadataConfig.Builder metadataConfigBuilder =
+ HoodieMetadataConfig.newBuilder()
+ .withEngineType(EngineType.FLINK) // this affects the default value
inference
+ .enable(conf.get(FlinkOptions.METADATA_ENABLED))
+ .withRecordIndexFileGroupCount(
+
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
+ GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_DEFAULT)),
+
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(),
+
HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.defaultValue()
+ "")))
+
.withMaxNumDeltaCommitsBeforeCompaction(conf.get(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS));
+
+ // Raw properties (e.g. the metadata table bloom filter settings) are applied after the
+ // explicit settings above. NOTE(review): if Builder#fromProperties overwrites keys that are
+ // already set, a raw hoodie.metadata.* option wins over the corresponding Flink option;
+ // confirm this precedence is intended.
+ if (includeAllProperties) {
+ metadataConfigBuilder.fromProperties(flinkConf2TypedProperties(conf));
+ }
+ return metadataConfigBuilder.build();
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4190bf07a458..7e9fb63d1d90 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -98,7 +98,6 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.stream.Collectors;
import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
@@ -733,12 +732,7 @@ public class StreamerUtil {
* @return HoodieMetadataConfig constructed from flink configuration.
*/
public static HoodieMetadataConfig
metadataConfig(org.apache.flink.configuration.Configuration conf) {
- Properties properties = new Properties();
-
- // set up metadata.enabled=true in table DDL to enable metadata listing
- properties.put(HoodieMetadataConfig.ENABLE.key(),
conf.get(FlinkOptions.METADATA_ENABLED));
-
- return
HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
+ // Delegates with includeAllProperties=true, so raw metadata table options in the Flink conf
+ // (e.g. the HoodieMetadataConfig.BLOOM_FILTER_* settings) are honored, not only the enable flag.
+ return FlinkWriteClients.getMetadataConfig(conf);
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f1f809bb43d2..09810de5c657 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -21,29 +21,54 @@ package org.apache.hudi.table.format;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.HoodieDataUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.ByteBufferBackedInputStream;
import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
import org.apache.hudi.io.HoodieWriteMergeHandle;
+import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.io.hfile.HFileReader;
+import org.apache.hudi.io.hfile.HFileReaderImpl;
+import org.apache.hudi.io.hfile.UTF8StringKey;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.MetadataPartitionType;
import org.apache.hudi.source.IncrementalInputSplits;
import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieTableSource;
import org.apache.hudi.table.format.cdc.CdcInputFormat;
@@ -81,6 +106,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -92,6 +118,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
+import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
import static org.apache.hudi.utils.TestData.insertRow;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
@@ -170,6 +197,58 @@ public class TestInputFormat {
assertThat(actual, is(expected));
}
+ /**
+ * Writes {@code TestData.DATA_SET_INSERT} with the global record-level index, MDT streaming
+ * writes and bloom filters enabled. Then checks two things:
+ * <ul>
+ * <li>every HFile data block in the MDT record index log files carries a bloom filter
+ * containing a key written to that block;</li>
+ * <li>record index lookups through the metadata table resolve all eight inserted keys
+ * and no key that was never written.</li>
+ * </ul>
+ */
+ @Test
+ void testRecordIndexLogBlocksContainBloomFilter() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put(FlinkOptions.INDEX_TYPE.key(),
HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+
options.put(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
+ options.put(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+ // Both flags are needed. BLOOM_FILTER_ENABLE is read by the HFile reader factory.
+ // HFILE_WITH_BLOOM_FILTER_ENABLED is read by the HFile writer factory.
+ options.put(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+ options.put(HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED.key(),
"true");
+ beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+ // DATA_SET_INSERT is assumed to contain exactly the keys id1..id8 - TODO confirm against TestData.
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ HoodieStorage storage = StreamerUtil.createMetaClient(conf).getStorage();
+ StoragePath recordIndexPath = new StoragePath(
+
HoodieTableMetadata.getMetadataTableBasePath(conf.get(FlinkOptions.PATH)),
+ MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+ List<StoragePathInfo> logFiles =
FSUtils.getAllDataFilesInPartition(storage, recordIndexPath).stream()
+ .filter(pathInfo -> FSUtils.isLogFile(pathInfo.getPath()))
+ .collect(Collectors.toList());
+
+ assertFalse(logFiles.isEmpty(), "The MDT record index partition should
contain log files");
+ int hfileDataBlockCount = 0;
+ for (StoragePathInfo logFile : logFiles) {
+ // The reader schema is taken from the log file itself.
+ HoodieSchema schema = TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath());
+ try (HoodieLogFormat.Reader reader =
+ HoodieLogFormat.newReader(storage, new HoodieLogFile(logFile),
schema)) {
+ while (reader.hasNext()) {
+ HoodieLogBlock logBlock = reader.next();
+ if (logBlock instanceof HoodieHFileDataBlock) {
+ assertBloomFilterContainsWrittenKey(storage, logBlock);
+ hfileDataBlockCount++;
+ }
+ }
+ }
+ }
+ assertTrue(hfileDataBlockCount > 0, "The MDT record index log files should
contain HFile data blocks");
+
+ // Look up keys through the metadata table built with StreamerUtil.metadataConfig(conf).
+ // NOTE(review): presumably this exercises the bloom-filter-aware HFile read path via
+ // BLOOM_FILTER_ENABLE; confirm that beforeEach copies the options above into conf.
+ HoodieTableMetadata metadataTable =
StreamerUtil.createMetaClient(conf).getTableFormat().getMetadataFactory().create(
+ HoodieFlinkEngineContext.DEFAULT,
+ storage,
+ StreamerUtil.metadataConfig(conf),
+ conf.get(FlinkOptions.PATH));
+ List<String> recordKeys = Arrays.asList(
+ "id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8", "missing-key");
+ Map<String, HoodieRecordGlobalLocation> recordLocations =
HoodieDataUtils.dedupeAndCollectAsMap(
+
metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(recordKeys)));
+
+ assertThat(recordLocations.size(), is(8));
+ recordKeys.subList(0, 8).forEach(key ->
assertTrue(recordLocations.containsKey(key)));
+ assertFalse(recordLocations.containsKey("missing-key"));
+ }
+
@ParameterizedTest
@ValueSource(strings = {"PARQUET", "LANCE"})
void testReadBaseAndLogFiles(String baseFileFormat) throws Exception {
@@ -1519,6 +1598,33 @@ public class TestInputFormat {
conf);
}
+ /**
+ * Reads the raw content bytes of {@code logBlock} directly from its log file and opens them as
+ * an HFile. Then asserts that the bloom filter stored in the HFile meta block reports the key
+ * at the position reached by {@code seekTo()} as possibly present. Only that single key is
+ * checked.
+ *
+ * <p>NOTE(review): the {@code .get()} calls fail without a descriptive message if the content
+ * location, the bloom filter meta block or the type code meta info is missing.
+ *
+ * @param storage storage used to open the log file.
+ * @param logBlock a log block with a known content location (assumed to be an HFile data block).
+ * @throws IOException if reading the log file fails.
+ */
+ private static void assertBloomFilterContainsWrittenKey(
+ HoodieStorage storage, HoodieLogBlock logBlock) throws IOException {
+ HoodieLogBlock.HoodieLogBlockContentLocation contentLocation =
+ logBlock.getBlockContentLocation().get();
+ // Copy exactly the block's content bytes out of the log file.
+ byte[] hfileContent = new
byte[Math.toIntExact(contentLocation.getBlockSize())];
+ try (SeekableDataInputStream inputStream =
+ storage.openSeekable(contentLocation.getLogFile().getPath(),
false)) {
+ inputStream.seek(contentLocation.getContentPositionInLogFile());
+ inputStream.readFully(hfileContent);
+ }
+
+ try (HFileReader hfileReader = new HFileReaderImpl(
+ new ByteArraySeekableDataInputStream(new
ByteBufferBackedInputStream(hfileContent)),
+ hfileContent.length)) {
+ hfileReader.initializeMetadata();
+ // Rebuild the bloom filter from its meta block and the serialized type code.
+ ByteBuffer bloomFilterBuffer = hfileReader.getMetaBlock(
+ HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK).get();
+ String bloomFilterType = fromUTF8Bytes(hfileReader.getMetaInfo(
+ new
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE)).get());
+ BloomFilter bloomFilter =
BloomFilterFactory.fromByteBuffer(bloomFilterBuffer, bloomFilterType);
+
+ assertTrue(hfileReader.seekTo(), "The HFile data block should contain at
least one record");
+ String writtenKey =
hfileReader.getKeyValue().get().getKey().getContentInString();
+ assertTrue(bloomFilter.mightContain(writtenKey));
+ }
+ }
+
+ // Delegates to the two-argument overload using TestConfigurations.SERIALIZER.
private static List<RowData> readData(InputFormat inputFormat) throws
IOException {
return readData(inputFormat, TestConfigurations.SERIALIZER);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index a3908f9bcf21..3910b91daebd 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -25,6 +25,7 @@ import
org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -230,6 +231,55 @@ public class TestFlinkWriteClients {
assertThat(mergerClasses,
is(PartialUpdateFlinkRecordMerger.class.getName()));
}
+ /**
+ * Verifies that {@code getMetadataConfig(conf, false)} maps the Flink metadata options (enable
+ * flag, compaction delta commits) but ignores raw metadata table keys such as the bloom filter
+ * enable flag.
+ */
+ @Test
+ void testGetMetadataConfigWithoutAdditionalProperties() {
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
+ // raw metadata table properties that are not exposed through FlinkOptions
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_TYPE.key(),
BloomFilterTypeCode.SIMPLE.name());
+
+ HoodieMetadataConfig metadataConfig =
FlinkWriteClients.getMetadataConfig(conf, false);
+
+ assertTrue(metadataConfig.isEnabled());
+ assertEquals(5,
metadataConfig.getInt(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS));
+ // raw properties are not applied when includeAllProperties is false
+ // (this relies on the metadata bloom filter defaulting to disabled)
+ assertFalse(metadataConfig.enableBloomFilter());
+ }
+
+ /**
+ * Verifies that the single-argument {@code getMetadataConfig(conf)} maps the Flink metadata
+ * options and also applies raw metadata table keys: bloom filter enable flag, type and number
+ * of entries.
+ */
+ @Test
+ void testGetMetadataConfigWithAdditionalProperties() {
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_TYPE.key(),
BloomFilterTypeCode.SIMPLE.name());
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_NUM_ENTRIES.key(),
"12345");
+
+ HoodieMetadataConfig metadataConfig =
FlinkWriteClients.getMetadataConfig(conf);
+
+ assertTrue(metadataConfig.isEnabled());
+ assertEquals(5,
metadataConfig.getInt(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS));
+ // raw properties are applied when includeAllProperties is true (the
default)
+ assertTrue(metadataConfig.enableBloomFilter());
+ assertEquals(BloomFilterTypeCode.SIMPLE.name(),
metadataConfig.getBloomFilterType());
+ assertEquals(12345, metadataConfig.getBloomFilterNumEntries());
+ }
+
+ /**
+ * Verifies that the metadata config embedded in {@code getHoodieClientConfig(conf)} agrees with
+ * {@code getMetadataConfig(conf)}: enable flag, bloom filter enable flag and bloom filter type.
+ */
+ @Test
+ void testHoodieClientConfigMetadataConfigConsistentWithGetMetadataConfig()
throws Exception {
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_TYPE.key(),
BloomFilterTypeCode.SIMPLE.name());
+ // presumably required because getHoodieClientConfig reads existing table state - TODO confirm
+ StreamerUtil.initTableIfNotExists(conf);
+
+ HoodieMetadataConfig directMetadataConfig =
FlinkWriteClients.getMetadataConfig(conf);
+ HoodieWriteConfig writeConfig =
FlinkWriteClients.getHoodieClientConfig(conf);
+
+ // NOTE(review): these assertions only compare the two configs with each other. They would
+ // also pass if both ignored the raw bloom filter keys; consider also asserting
+ // enableBloomFilter() is true.
+ assertEquals(directMetadataConfig.isEnabled(),
writeConfig.isMetadataTableEnabled());
+ assertEquals(directMetadataConfig.enableBloomFilter(),
writeConfig.getMetadataConfig().enableBloomFilter());
+ assertEquals(directMetadataConfig.getBloomFilterType(),
writeConfig.getMetadataConfig().getBloomFilterType());
+ }
+
@Test
void testWriteMergeHandleForPreV9Table() throws Exception {
conf.set(FlinkOptions.WRITE_TABLE_VERSION,
HoodieTableVersion.EIGHT.versionCode());
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index 13df2e842ab2..7b2a981377f0 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -19,6 +19,8 @@
package org.apache.hudi.utils;
import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -26,6 +28,7 @@ import org.apache.hudi.common.model.PartialUpdateAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -65,6 +68,27 @@ class TestStreamerUtil {
@TempDir
File tempFile;
+ /**
+ * Verifies that {@code StreamerUtil.metadataConfig(conf)} carries over the raw metadata table
+ * bloom filter settings from the Flink configuration: enable flag, type, number of entries,
+ * false positive probability and dynamic max entries.
+ */
+ @Test
+ void testMetadataConfigIncludesMetadataTableBloomFilterSettings() {
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.set(FlinkOptions.METADATA_ENABLED, true);
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_TYPE.key(),
BloomFilterTypeCode.SIMPLE.name());
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_NUM_ENTRIES.key(),
"12345");
+ conf.setString(HoodieMetadataConfig.BLOOM_FILTER_FPP.key(), "0.005");
+
conf.setString(HoodieMetadataConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(),
"23456");
+ // NOTE(review): this is an unrelated file system view option. Presumably it shows that
+ // non-metadata keys in the conf do not break building the metadata config; confirm the
+ // intent or drop it.
+ conf.setString(FileSystemViewStorageConfig.REMOTE_HOST_NAME.key(),
"localhost");
+
+ HoodieMetadataConfig metadataConfig = StreamerUtil.metadataConfig(conf);
+
+ assertTrue(metadataConfig.isEnabled());
+ assertTrue(metadataConfig.enableBloomFilter());
+ assertEquals(BloomFilterTypeCode.SIMPLE.name(),
metadataConfig.getBloomFilterType());
+ assertEquals(12345, metadataConfig.getBloomFilterNumEntries());
+ assertEquals(0.005, metadataConfig.getBloomFilterFpp());
+ assertEquals(23456, metadataConfig.getDynamicBloomFilterMaxNumEntries());
+ }
+
@Test
void testInferMergingBehavior() {
Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileReaderFactory.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileReaderFactory.java
index a0512e84522d..ccc936a66f75 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileReaderFactory.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileReaderFactory.java
@@ -20,6 +20,7 @@
package org.apache.hudi.io.storage.hadoop;
import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HFileReaderFactory;
@@ -51,8 +52,7 @@ public class HoodieAvroFileReaderFactory extends
HoodieFileReaderFactory {
HFileReaderFactory readerFactory = HFileReaderFactory.builder()
.withStorage(storage).withProps(hoodieConfig.getProps())
.withPath(path).build();
- return HoodieNativeAvroHFileReader.builder()
- .readerFactory(readerFactory).path(path).schema(schemaOption).build();
+ return newNativeHFileReader(hoodieConfig, readerFactory, path,
schemaOption);
}
protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig,
StoragePathInfo pathInfo,
@@ -60,8 +60,7 @@ public class HoodieAvroFileReaderFactory extends
HoodieFileReaderFactory {
HFileReaderFactory readerFactory = HFileReaderFactory.builder()
.withStorage(storage).withProps(hoodieConfig.getProps())
.withPath(pathInfo.getPath()).withFileSize(pathInfo.getLength()).build();
- return HoodieNativeAvroHFileReader.builder()
-
.readerFactory(readerFactory).path(pathInfo.getPath()).schema(schemaOption).build();
+ return newNativeHFileReader(hoodieConfig, readerFactory,
pathInfo.getPath(), schemaOption);
}
@Override
@@ -73,8 +72,19 @@ public class HoodieAvroFileReaderFactory extends
HoodieFileReaderFactory {
HFileReaderFactory.Builder readerFactoryBuilder =
HFileReaderFactory.builder()
.withStorage(storage).withProps(hoodieConfig.getProps())
.withContent(content);
+ return newNativeHFileReader(hoodieConfig, readerFactoryBuilder.build(),
path, schemaOption);
+ }
+
+ /**
+ * Builds a {@link HoodieNativeAvroHFileReader} over the given HFile reader factory. This is
+ * shared by the path-, path-info- and content-based HFile reader entry points.
+ *
+ * @param hoodieConfig config whose {@code HoodieMetadataConfig.BLOOM_FILTER_ENABLE} value decides
+ * whether the reader uses the HFile bloom filter.
+ * @param readerFactory factory for the underlying HFile reader.
+ * @param path path of the HFile.
+ * @param schemaOption optional reader schema.
+ * @return the configured native HFile reader.
+ */
+ private HoodieNativeAvroHFileReader newNativeHFileReader(HoodieConfig
hoodieConfig,
+ HFileReaderFactory
readerFactory,
+ StoragePath path,
+
Option<HoodieSchema> schemaOption) {
return HoodieNativeAvroHFileReader.builder()
-
.readerFactory(readerFactoryBuilder.build()).path(path).schema(schemaOption).build();
+ .readerFactory(readerFactory)
+ .path(path)
+ .schema(schemaOption)
+ // NOTE(review): HoodieAvroFileWriterFactory uses getBooleanOrDefault for its flag. Using it
+ // here too would make the default explicit and avoid any null-unboxing risk from getBoolean.
+
.useBloomFilter(hoodieConfig.getBoolean(HoodieMetadataConfig.BLOOM_FILTER_ENABLE))
+ .build();
}
@Override
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
index c474df450ad8..83ebc7159e01 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
@@ -109,7 +109,8 @@ public class HoodieAvroFileWriterFactory extends
HoodieFileWriterFactory {
protected HoodieFileWriter newHFileFileWriter(
String instantTime, StoragePath path, HoodieConfig config, HoodieSchema
schema,
TaskContextSupplier taskContextSupplier) throws IOException {
- BloomFilter filter = createBloomFilter(config);
+ BloomFilter filter =
config.getBooleanOrDefault(HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED)
+ ? createBloomFilter(config) : null;
HoodieHFileConfig hfileConfig = new HoodieHFileConfig(
storage.getConf(),
CompressionCodec.findCodecByName(
diff --git
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroHFileWriter.java
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroHFileWriter.java
index 9031390af1bb..15f088cbd8aa 100644
---
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroHFileWriter.java
+++
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroHFileWriter.java
@@ -20,11 +20,11 @@
package org.apache.hudi.io.storage.hadoop;
import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.util.HFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieDuplicateKeyException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -169,23 +169,7 @@ public class HoodieAvroHFileWriter
@Override
public void close() throws IOException {
if (hfileConfig.useBloomFilter()) {
- final BloomFilter bloomFilter = hfileConfig.getBloomFilter();
- if (minRecordKey == null) {
- minRecordKey = "";
- }
- if (maxRecordKey == null) {
- maxRecordKey = "";
- }
- writer.appendFileInfo(
- HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD,
getUTF8Bytes(minRecordKey));
- writer.appendFileInfo(
- HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD,
getUTF8Bytes(maxRecordKey));
- writer.appendFileInfo(
- HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE,
- getUTF8Bytes(bloomFilter.getBloomFilterTypeCode().toString()));
- writer.appendMetaInfo(
- HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK,
- getUTF8Bytes(bloomFilter.serializeToString()));
+ HFileUtils.appendBloomFilter(writer, minRecordKey, maxRecordKey,
hfileConfig.getBloomFilter());
}
writer.close();
writer = null;
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 337ad4d2b8f9..af17d8447884 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -19,22 +19,34 @@
package org.apache.hudi.io.hadoop;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
import org.apache.hudi.io.storage.hadoop.HoodieAvroOrcReader;
import org.apache.hudi.io.storage.hadoop.HoodieAvroParquetReader;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.lang.reflect.Field;
import static
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -45,6 +57,8 @@ public class TestHoodieAvroFileReaderFactory {
@TempDir
public java.nio.file.Path tempDir;
+ // Path with an .hfile extension used by the HFile reader-factory tests. NOTE(review): no file is
+ // written here, so those tests rely on reader construction not touching storage.
+ private static final StoragePath HFILE_PATH = new
StoragePath("/partition/path/f1_1-0-1_000.hfile");
+
@Test
public void testGetFileReader() throws IOException {
// parquet file format.
@@ -70,4 +84,56 @@ public class TestHoodieAvroFileReaderFactory {
.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, orcPath);
assertTrue(orcReader instanceof HoodieAvroOrcReader);
}
+
+ /**
+ * Verifies that all three HFile entry points of the Avro reader factory (by path, by
+ * {@link StoragePathInfo}, and from in-memory content) pass the metadata config's bloom filter
+ * enable flag through to the created {@link HoodieNativeAvroHFileReader}.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testHFileReaderPassesBloomFilterConfig(boolean
bloomFilterEnabled)
+ throws IOException, ReflectiveOperationException {
+ HoodieStorage storage = HoodieTestUtils.getDefaultStorage();
+ HoodieFileReaderFactory readerFactory =
HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecordType.AVRO);
+ // Later withProperties calls layer the bloom filter flag on top of the reader defaults.
+ HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+ .withProperties(DEFAULT_HUDI_CONFIG_FOR_READER.getProps())
+ .withProperties(bloomFilterProps(bloomFilterEnabled))
+ .build();
+
+ // by path
+ assertBloomFilterConfig(readerFactory.getFileReader(metadataConfig,
HFILE_PATH),
+ bloomFilterEnabled);
+ // by path info (the length of 100 is presumably arbitrary, since nothing is read)
+ assertBloomFilterConfig(readerFactory.getFileReader(metadataConfig,
+ new StoragePathInfo(HFILE_PATH, 100, false, (short) 0, 0, 0),
HoodieFileFormat.HFILE,
+ Option.<HoodieSchema>empty()), bloomFilterEnabled);
+ // from in-memory content (empty; only the reader's configuration is inspected)
+ assertBloomFilterConfig(readerFactory.getContentReader(metadataConfig,
HFILE_PATH, HoodieFileFormat.HFILE,
+ storage, new byte[0], Option.<HoodieSchema>empty()),
+ bloomFilterEnabled);
+ }
+
+ /**
+ * Verifies that with {@code DEFAULT_HUDI_CONFIG_FOR_READER} all three HFile entry points of the
+ * Avro reader factory create readers with bloom filter usage disabled.
+ */
+ @Test
+ public void testHFileReaderDoesNotUseBloomFilterByDefault()
+ throws IOException, ReflectiveOperationException {
+ HoodieStorage storage = HoodieTestUtils.getDefaultStorage();
+ HoodieFileReaderFactory readerFactory =
HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecordType.AVRO);
+
+
assertBloomFilterConfig(readerFactory.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER,
HFILE_PATH), false);
+
assertBloomFilterConfig(readerFactory.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER,
+ new StoragePathInfo(HFILE_PATH, 100, false, (short) 0, 0, 0),
HoodieFileFormat.HFILE,
+ Option.<HoodieSchema>empty()), false);
+
assertBloomFilterConfig(readerFactory.getContentReader(DEFAULT_HUDI_CONFIG_FOR_READER,
HFILE_PATH,
+ HoodieFileFormat.HFILE, storage, new byte[0],
Option.<HoodieSchema>empty()),
+ false);
+ }
+
+ /**
+ * Creates properties that contain only {@code HoodieMetadataConfig.BLOOM_FILTER_ENABLE}.
+ *
+ * @param enableBloomFilter value to store, as the string {@code "true"} or {@code "false"}.
+ * @return the new properties.
+ */
+ private static TypedProperties bloomFilterProps(boolean enableBloomFilter) {
+ TypedProperties properties = new TypedProperties();
+ properties.setProperty(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(),
Boolean.toString(enableBloomFilter));
+ return properties;
+ }
+
+ /**
+ * Asserts that {@code reader} is a {@link HoodieNativeAvroHFileReader} and that its private
+ * {@code useBloomFilter} field equals the expected value.
+ *
+ * <p>NOTE(review): reading a private field by reflection is brittle. Renaming the field causes
+ * {@code NoSuchFieldException}, and {@code Field#getBoolean} requires the field to be a
+ * primitive {@code boolean}. Consider exposing a package-private or test-visible accessor.
+ *
+ * @param reader the reader returned by the factory.
+ * @param expectedBloomFilterEnabled expected value of {@code useBloomFilter}.
+ * @throws ReflectiveOperationException if the field cannot be found or read.
+ */
+ private static void assertBloomFilterConfig(HoodieFileReader reader, boolean
expectedBloomFilterEnabled)
+ throws ReflectiveOperationException {
+ HoodieNativeAvroHFileReader hfileReader =
assertInstanceOf(HoodieNativeAvroHFileReader.class, reader);
+ Field useBloomFilterField =
HoodieNativeAvroHFileReader.class.getDeclaredField("useBloomFilter");
+ useBloomFilterField.setAccessible(true);
+ assertEquals(expectedBloomFilterEnabled,
useBloomFilterField.getBoolean(hfileReader));
+ }
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index d7a112a5f2d7..f2225ff3ca66 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.bloom.BloomFilterTypeCode;
import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
@@ -53,6 +53,7 @@ import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
@@ -74,7 +75,6 @@ import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.TreeMap;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -99,6 +99,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
@@ -157,18 +158,22 @@ public class TestHoodieHFileReaderWriter extends
TestHoodieReaderWriterBase {
@Override
protected HoodieAvroHFileWriter createWriter(
HoodieSchema schema, boolean populateMetaFields) throws Exception {
+ return createWriter(schema, populateMetaFields, true);
+ }
+
+ protected HoodieAvroHFileWriter createWriter(
+ HoodieSchema schema, boolean populateMetaFields, boolean
hfileBloomFilterEnabled) throws Exception {
String instantTime = "000";
HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
Properties props = new Properties();
props.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(),
Boolean.toString(populateMetaFields));
- TaskContextSupplier mockTaskContextSupplier =
mock(TaskContextSupplier.class);
- Supplier<Integer> partitionSupplier = mock(Supplier.class);
-
when(mockTaskContextSupplier.getPartitionIdSupplier()).thenReturn(partitionSupplier);
- when(partitionSupplier.get()).thenReturn(10);
return (HoodieAvroHFileWriter) HoodieFileWriterFactory.getFileWriter(
- instantTime, getFilePath(), storage,
HoodieStorageConfig.newBuilder().fromProperties(props).build(), schema,
- mockTaskContextSupplier, HoodieRecord.HoodieRecordType.AVRO);
+ instantTime, getFilePath(), storage, HoodieStorageConfig.newBuilder()
+ .fromProperties(props)
+ .hfileBloomFilterEnable(hfileBloomFilterEnabled)
+ .build(), schema,
+ new LocalTaskContextSupplier(), HoodieRecord.HoodieRecordType.AVRO);
}
@Override
@@ -260,6 +265,64 @@ public class TestHoodieHFileReaderWriter extends
TestHoodieReaderWriterBase {
}
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testHFileBloomFilterMetadataWriteConfig(boolean
hfileBloomFilterEnabled) throws Exception {
+ HoodieSchema schema =
getHoodieSchemaFromResource(TestHoodieReaderWriterBase.class,
"/exampleSchema.avsc");
+ HoodieAvroHFileWriter writer = createWriter(schema, true,
hfileBloomFilterEnabled);
+ for (int i = 0; i < 3; i++) {
+ GenericRecord record = new GenericData.Record(schema.getAvroSchema());
+ String key = "key" + String.format("%02d", i);
+ record.put("_row_key", key);
+ record.put("time", Integer.toString(i));
+ record.put("number", i);
+ writer.writeAvro(key, record);
+ }
+ writer.close();
+
+ HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
+ try (HFileReader hfileReader = HFileReaderFactory.builder()
+ .withStorage(storage)
+ .withProps(DEFAULT_PROPS)
+ .withPath(getFilePath())
+ .build()
+ .createHFileReader()) {
+ hfileReader.initializeMetadata();
+ assertEquals(hfileBloomFilterEnabled,
hfileReader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK).isPresent());
+ assertEquals(hfileBloomFilterEnabled, hfileReader.getMetaInfo(new
UTF8StringKey(KEY_BLOOM_FILTER_TYPE_CODE)).isPresent());
+ assertEquals(hfileBloomFilterEnabled, hfileReader.getMetaInfo(new
UTF8StringKey(KEY_MIN_RECORD)).isPresent());
+ assertEquals(hfileBloomFilterEnabled, hfileReader.getMetaInfo(new
UTF8StringKey(KEY_MAX_RECORD)).isPresent());
+ assertTrue(hfileReader.getMetaInfo(new
UTF8StringKey(SCHEMA_KEY)).isPresent());
+ }
+ }
+
+ @Test
+ public void testPointLookupFallsBackWhenBloomFilterMetadataIsMissing()
throws Exception {
+ HoodieSchema schema =
getHoodieSchemaFromResource(TestHoodieReaderWriterBase.class,
"/exampleSchema.avsc");
+ HoodieAvroHFileWriter writer = createWriter(schema, true, false);
+ for (int i = 0; i < 3; i++) {
+ GenericRecord record = new GenericData.Record(schema.getAvroSchema());
+ String key = "key" + String.format("%02d", i);
+ record.put("_row_key", key);
+ record.put("time", Integer.toString(i));
+ record.put("number", i);
+ writer.writeAvro(key, record);
+ }
+ writer.close();
+
+ HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
+ try (HoodieAvroHFileReaderImplBase hfileReader = createReader(storage,
true)) {
+ assertNull(hfileReader.readBloomFilter());
+
+ List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readRecords(
+ hfileReader, new ArrayList<>(Arrays.asList("key00", "key02",
"missing-key")));
+
+ assertEquals(2, records.size());
+ assertEquals("key00", ((GenericRecord)
records.get(0)).get("_row_key").toString());
+ assertEquals("key02", ((GenericRecord)
records.get(1)).get("_row_key").toString());
+ }
+ }
+
@Disabled("Disable the test with evolved schema for HFile since it's not
supported")
@ParameterizedTest
@Override