This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 17b396ebc87f feat(mdt): Support writing bloom filter for hfile log block (#18936)
17b396ebc87f is described below

commit 17b396ebc87f53c85eeb448554c21eb423b44562
Author: Shuo Cheng <[email protected]>
AuthorDate: Thu Jun 11 10:23:49 2026 +0800

    feat(mdt): Support writing bloom filter for hfile log block (#18936)
    
    * feat(mdt): Support writing bloom filter for hfile log block
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  15 ++-
 .../hudi/metadata/HoodieMetadataWriteUtils.java    |  11 ++-
 .../metadata/TestHoodieMetadataWriteUtils.java     |  18 ++++
 .../hudi/common/config/HoodieStorageConfig.java    |  12 +++
 .../common/table/log/block/HoodieDataBlock.java    |   4 +-
 .../table/log/block/HoodieHFileDataBlock.java      |  21 ++--
 .../org/apache/hudi/common/util/ConfigUtils.java   |   2 +
 .../org/apache/hudi/common/util/HFileUtils.java    |  49 ++++++++++
 .../io/storage/HoodieNativeAvroHFileReader.java    |   4 +-
 .../common/config/TestHoodieStorageConfig.java     |  14 +++
 .../apache/hudi/common/util/TestConfigUtils.java   |   6 +-
 .../apache/hudi/common/util/TestHFileUtils.java    |  91 ++++++++++++++++++
 .../org/apache/hudi/util/FlinkWriteClients.java    |  48 ++++++++--
 .../java/org/apache/hudi/util/StreamerUtil.java    |   8 +-
 .../apache/hudi/table/format/TestInputFormat.java  | 106 +++++++++++++++++++++
 .../apache/hudi/utils/TestFlinkWriteClients.java   |  50 ++++++++++
 .../org/apache/hudi/utils/TestStreamerUtil.java    |  24 +++++
 .../hadoop/HoodieAvroFileReaderFactory.java        |  20 +++-
 .../hadoop/HoodieAvroFileWriterFactory.java        |   3 +-
 .../io/storage/hadoop/HoodieAvroHFileWriter.java   |  20 +---
 .../io/hadoop/TestHoodieAvroFileReaderFactory.java |  66 +++++++++++++
 .../io/hadoop/TestHoodieHFileReaderWriter.java     |  79 +++++++++++++--
 23 files changed, 608 insertions(+), 67 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 9c710fd9953a..9e4263a4fbdd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2451,6 +2451,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return 
getBooleanOrDefault(HoodieStorageConfig.PARQUET_WITH_BLOOM_FILTER_ENABLED);
   }
 
+  public boolean hfileBloomFilterEnabled() {
+    return 
getBooleanOrDefault(HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED);
+  }
+
   public Option<HoodieLogBlock.HoodieLogBlockType> getLogDataBlockFormat() {
     return 
Option.ofNullable(getString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT))
         .map(HoodieLogBlock.HoodieLogBlockType::fromId);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 3b825bbf1ade..1e0f0adf250f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -84,6 +84,12 @@ import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.PARTITION_NAME_COLUMN_STATS;
 import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.collectColumnRangeMetadata;
 
@@ -726,8 +732,15 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
         // Not supporting positions in HFile data blocks
         
header.remove(HeaderMetadataType.BASE_FILE_INSTANT_TIME_OF_RECORD_POSITIONS);
         records.sort(Comparator.comparing(HoodieRecord::getRecordKey));
+        Map<String, String> hfileParams = new HashMap<>();
+        hfileParams.put(HFILE_COMPRESSION_ALGORITHM_NAME.key(), 
writeConfig.getHFileCompressionAlgorithm());
+        hfileParams.put(HFILE_WITH_BLOOM_FILTER_ENABLED.key(), 
Boolean.toString(writeConfig.hfileBloomFilterEnabled()));
+        hfileParams.put(BLOOM_FILTER_NUM_ENTRIES_VALUE.key(), 
Integer.toString(writeConfig.getBloomFilterNumEntries()));
+        hfileParams.put(BLOOM_FILTER_FPP_VALUE.key(), 
Double.toString(writeConfig.getBloomFilterFPP()));
+        hfileParams.put(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(), 
Integer.toString(writeConfig.getDynamicBloomFilterMaxNumEntries()));
+        hfileParams.put(BLOOM_FILTER_TYPE.key(), 
writeConfig.getBloomFilterType());
         return new HoodieHFileDataBlock(
-            records, header, writeConfig.getHFileCompressionAlgorithm(), new 
StoragePath(writeConfig.getBasePath()));
+            records, header, writeConfig.getHFileCompressionAlgorithm(), 
hfileParams, new StoragePath(writeConfig.getBasePath()));
       case PARQUET_DATA_BLOCK:
         return new HoodieParquetDataBlock(
             records,
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index 7d6cef77c63a..3992b65dfdd0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -259,11 +259,12 @@ public class HoodieMetadataWriteUtils {
             // Keeping the log blocks as large as the log files themselves 
reduces the number of HFile blocks to be checked for
             // presence of keys
             .logFileDataBlockMaxSize(maxLogFileSizeBytes)
-                               
.withBloomFilterType(writeConfig.getMetadataConfig().getBloomFilterType())
-                               
.withBloomFilterNumEntries(writeConfig.getMetadataConfig().getBloomFilterNumEntries())
-                               
.withBloomFilterFpp(writeConfig.getMetadataConfig().getBloomFilterFpp())
-                               
.withBloomFilterDynamicMaxEntries(writeConfig.getMetadataConfig().getDynamicBloomFilterMaxNumEntries())
-                               .build())
+            .hfileBloomFilterEnable(writeConfig.hfileBloomFilterEnabled())
+            
.withBloomFilterType(writeConfig.getMetadataConfig().getBloomFilterType())
+            
.withBloomFilterNumEntries(writeConfig.getMetadataConfig().getBloomFilterNumEntries())
+            
.withBloomFilterFpp(writeConfig.getMetadataConfig().getBloomFilterFpp())
+            
.withBloomFilterDynamicMaxEntries(writeConfig.getMetadataConfig().getDynamicBloomFilterMaxNumEntries())
+            .build())
         .withRollbackParallelism(MDT_DEFAULT_PARALLELISM)
         .withFinalizeWriteParallelism(MDT_DEFAULT_PARALLELISM)
         
.withKeyGenerator(HoodieTableMetadataKeyGenerator.class.getCanonicalName())
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
index 73d10ef228d2..333d212a1d50 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
@@ -24,6 +24,7 @@ import 
org.apache.hudi.client.transaction.lock.StorageBasedLockProvider;
 import 
org.apache.hudi.client.transaction.lock.ZookeeperBasedImplicitBasePathLockProvider;
 import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -35,6 +36,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Properties;
 
@@ -47,6 +50,21 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieMetadataWriteUtils {
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void 
testCreateMetadataWriteConfigPropagatesHFileBloomFilterSetting(boolean 
hfileBloomFilterEnabled) {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp/base_path/")
+        .withStorageConfig(HoodieStorageConfig.newBuilder()
+            .hfileBloomFilterEnable(hfileBloomFilterEnabled)
+            .build())
+        .build();
+
+    HoodieWriteConfig metadataWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(
+        writeConfig, HoodieFailedWritesCleaningPolicy.EAGER, 
HoodieTableVersion.EIGHT);
+    assertEquals(hfileBloomFilterEnabled, 
metadataWriteConfig.hfileBloomFilterEnabled());
+  }
+
   @Test
   public void testCreateMetadataWriteConfigForCleaner() {
     HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder()
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 17f2bc6a9b89..cbdf5cc223e0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -125,6 +125,13 @@ public class HoodieStorageConfig extends HoodieConfig {
           + "reader for footer/metadata operations (schema, bloom filter). 
Independent of the "
           + "data allocator since metadata allocations are small and 
short-lived.");
 
+  public static final ConfigProperty<Boolean> HFILE_WITH_BLOOM_FILTER_ENABLED 
= ConfigProperty
+      .key("hoodie.hfile.bloom.filter.enabled")
+      .defaultValue(true)
+      .markAdvanced()
+      .sinceVersion("1.3.0")
+      .withDocumentation("Control whether to write bloom filter or not to 
HFile.");
+
   public static final ConfigProperty<Boolean> HFILE_WRITER_TO_ALLOW_DUPLICATES 
= ConfigProperty
       .key("hoodie.hfile.writes.allow.duplicates")
       .defaultValue(false)
@@ -579,6 +586,11 @@ public class HoodieStorageConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder hfileBloomFilterEnable(boolean hfileBloomFilterEnable) {
+      storageConfig.setValue(HFILE_WITH_BLOOM_FILTER_ENABLED, 
String.valueOf(hfileBloomFilterEnable));
+      return this;
+    }
+
     public Builder parquetVariantWriteShreddingEnabled(boolean enabled) {
       storageConfig.setValue(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED, 
String.valueOf(enabled));
       return this;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index aee07ea947a7..70ac694dedfe 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -263,7 +263,7 @@ public abstract class HoodieDataBlock extends 
HoodieLogBlock {
   public final <T> ClosableIterator<T> 
getEngineRecordIterator(HoodieReaderContext<T> readerContext, List<String> 
keys, boolean fullKey) throws IOException {
     boolean fullScan = keys.isEmpty();
     if (!fullScan) {
-      return lookupEngineRecords(keys, fullKey);
+      return lookupEngineRecords(readerContext, keys, fullKey);
     } else {
       throw new IllegalStateException("Unexpected code reached. Expected to be 
called only with keySpec defined for non FILES partition in Metadata table");
     }
@@ -323,7 +323,7 @@ public abstract class HoodieDataBlock extends 
HoodieLogBlock {
     );
   }
 
-  protected <T> ClosableIterator<T> lookupEngineRecords(List<String> keys, 
boolean fullKey) throws IOException {
+  protected <T> ClosableIterator<T> lookupEngineRecords(HoodieReaderContext<T> 
readerContext, List<String> keys, boolean fullKey) throws IOException {
     throw new UnsupportedOperationException(
         String.format("Point lookups are not supported by this Data block type 
(%s)", getBlockType())
     );
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 659cbf8c8d18..bc80e279a194 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -57,7 +57,7 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  * base file format.
  */
 public class HoodieHFileDataBlock extends HoodieDataBlock {
-  private final Option<String> compressionCodec;
+  private final Map<String, String> writerParams;
   // This path is used for constructing HFile reader context, which should not 
be
   // interpreted as the actual file path for the HFile data blocks
   private final StoragePath pathForReader;
@@ -73,7 +73,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
                               StoragePath pathForReader) {
     super(content, inputStreamSupplier, readBlockLazily, 
Option.of(logBlockContentLocation), readerSchema,
         header, footer, HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME, 
enablePointLookups);
-    this.compressionCodec = Option.empty();
+    this.writerParams = Collections.emptyMap();
     this.pathForReader = pathForReader;
   }
 
@@ -81,8 +81,17 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
                               Map<HeaderMetadataType, String> header,
                               String compressionCodec,
                               StoragePath pathForReader) {
+    this(records, header, compressionCodec, Collections.emptyMap(), 
pathForReader);
+  }
+
+  public HoodieHFileDataBlock(List<HoodieRecord> records,
+                              Map<HeaderMetadataType, String> header,
+                              String compressionCodec,
+                              Map<String, String> paramsMap,
+                              StoragePath pathForReader) {
     super(records, header, new HashMap<>(), 
HoodieAvroHFileReaderImplBase.KEY_FIELD_NAME);
-    this.compressionCodec = Option.of(compressionCodec);
+    this.writerParams = new HashMap<>(paramsMap);
+    this.writerParams.putIfAbsent(HFILE_COMPRESSION_ALGORITHM_NAME.key(), 
compressionCodec);
     this.pathForReader = pathForReader;
   }
 
@@ -98,7 +107,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     return 
HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(HoodieFileFormat.HFILE)
         .serializeRecordsToLogBlock(
             storage, records, writerSchema, getSchema(), getKeyFieldName(),
-            Collections.singletonMap(HFILE_COMPRESSION_ALGORITHM_NAME.key(), 
compressionCodec.get()));
+            writerParams);
   }
 
   @Override
@@ -178,7 +187,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
   }
 
   @Override
-  protected <T> ClosableIterator<T> lookupEngineRecords(List<String> 
sortedKeys, boolean fullKey) throws IOException {
+  protected <T> ClosableIterator<T> lookupEngineRecords(HoodieReaderContext<T> 
readerContext, List<String> sortedKeys, boolean fullKey) throws IOException {
     HoodieLogBlockContentLocation blockContentLoc = 
getBlockContentLocation().get();
 
     // NOTE: It's important to extend Hadoop configuration here to make sure 
configuration
@@ -195,7 +204,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
     try (final HoodieAvroHFileReaderImplBase reader = 
(HoodieAvroHFileReaderImplBase) HoodieIOFactory
         .getIOFactory(inlineStorage)
         .getReaderFactory(HoodieRecordType.AVRO)
-        .getFileReader(ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+        .getFileReader(readerContext.getHoodieReaderConfig(),
             storagePathInfo,
             HoodieFileFormat.HFILE,
             Option.of(getSchemaFromHeader()))) {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
index 5f0ce3bbb9a0..398494a957dd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ConfigUtils.java
@@ -860,6 +860,8 @@ public class ConfigUtils {
         
metadataConfig.getStringOrDefault(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES));
     
props.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
         
metadataConfig.getStringOrDefault(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB));
+    props.setProperty(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(),
+        
metadataConfig.getStringOrDefault(HoodieMetadataConfig.BLOOM_FILTER_ENABLE));
     return props;
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
index e47779c6f623..864bd76e2b14 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -20,6 +20,8 @@
 package org.apache.hudi.common.util;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -58,7 +60,12 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 
 /**
@@ -192,6 +199,8 @@ public class HFileUtils extends FileFormatUtils {
                                                           String keyFieldName,
                                                           Map<String, String> 
paramsMap) throws IOException {
     CompressionCodec compressionCodec = 
getHFileCompressionAlgorithm(paramsMap);
+    boolean enableBloomFilter = isHFileBloomFilterEnabled(paramsMap);
+    BloomFilter bloomFilter = enableBloomFilter ? createBloomFilter(paramsMap) 
: null;
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     try (OutputStream ostream = new DataOutputStream(baos)) {
       HFileContext context = HFileContext.builder()
@@ -206,6 +215,8 @@ public class HFileUtils extends FileFormatUtils {
       Option<HoodieSchemaField> keyField = writerSchema.getField(keyFieldName);
       try (HFileWriter writer = new HFileWriterImpl(context, ostream)) {
         String previousRecordKey = null;
+        String minRecordKey = null;
+        String maxRecordKey = null;
         // It is assumed that the input records are sorted based on the record 
key
         // for HFile block
         for (int i = 0; i < records.size(); i++) {
@@ -229,18 +240,43 @@ public class HFileUtils extends FileFormatUtils {
           } catch (IOException e) {
             throw new HoodieIOException("IOException serializing records", e);
           }
+          if (enableBloomFilter) {
+            bloomFilter.add(recordKey);
+            if (minRecordKey == null) {
+              minRecordKey = recordKey;
+            }
+            maxRecordKey = recordKey;
+          }
           previousRecordKey = recordKey;
         }
 
         writer.appendFileInfo(
             HoodieAvroHFileReaderImplBase.SCHEMA_KEY,
             getUTF8Bytes(readerSchema.toString()));
+        if (enableBloomFilter) {
+          appendBloomFilter(writer, minRecordKey, maxRecordKey, bloomFilter);
+        }
       }
       ostream.flush();
     }
     return baos;
   }
 
+  public static void appendBloomFilter(HFileWriter writer, String 
minRecordKey, String maxRecordKey, BloomFilter bloomFilter) {
+    writer.appendFileInfo(
+        HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD,
+        getUTF8Bytes(minRecordKey == null ? StringUtils.EMPTY_STRING : 
minRecordKey));
+    writer.appendFileInfo(
+        HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD,
+        getUTF8Bytes(maxRecordKey == null ? StringUtils.EMPTY_STRING : 
maxRecordKey));
+    writer.appendFileInfo(
+        HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE,
+        getUTF8Bytes(bloomFilter.getBloomFilterTypeCode().toString()));
+    writer.appendMetaInfo(
+        HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK,
+        getUTF8Bytes(bloomFilter.serializeToString()));
+  }
+
   /**
    * Print the meta fields of the record of interest
    */
@@ -270,6 +306,19 @@ public class HFileUtils extends FileFormatUtils {
     return Option.ofNullable(record.getRecordKey(readerSchema, keyFieldName));
   }
 
+  private static boolean isHFileBloomFilterEnabled(Map<String, String> 
paramsMap) {
+    return Boolean.parseBoolean(paramsMap.getOrDefault(
+        HFILE_WITH_BLOOM_FILTER_ENABLED.key(), 
String.valueOf(HFILE_WITH_BLOOM_FILTER_ENABLED.defaultValue())));
+  }
+
+  private static BloomFilter createBloomFilter(Map<String, String> paramsMap) {
+    return BloomFilterFactory.createBloomFilter(
+        
Integer.parseInt(paramsMap.getOrDefault(BLOOM_FILTER_NUM_ENTRIES_VALUE.key(), 
BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue())),
+        
Double.parseDouble(paramsMap.getOrDefault(BLOOM_FILTER_FPP_VALUE.key(), 
BLOOM_FILTER_FPP_VALUE.defaultValue())),
+        
Integer.parseInt(paramsMap.getOrDefault(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(), 
BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue())),
+        paramsMap.getOrDefault(BLOOM_FILTER_TYPE.key(), 
BLOOM_FILTER_TYPE.defaultValue()));
+  }
+
   private static byte[] serializeRecord(HoodieRecord<?> record, HoodieSchema 
schema, Option<HoodieSchemaField> keyField) throws IOException {
     return record.toIndexedRecord(schema, 
CollectionUtils.emptyProps()).map(HoodieAvroIndexedRecord::getData).map(indexedRecord
 -> {
       keyField.ifPresent(field -> indexedRecord.put(field.pos(), 
StringUtils.EMPTY_STRING));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index 1020d1a8c9f0..69b6dd9fd0ed 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -333,8 +333,8 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
 
   private static BloomFilter readBloomFilter(HFileReader reader) throws 
HoodieException {
     try {
-      ByteBuffer byteBuffer = 
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK).get();
-      return BloomFilterFactory.fromByteBuffer(byteBuffer,
+      ByteBuffer byteBuffer = 
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK).orElse(null);
+      return byteBuffer == null ? null : 
BloomFilterFactory.fromByteBuffer(byteBuffer,
           fromUTF8Bytes(reader.getMetaInfo(new 
UTF8StringKey(KEY_BLOOM_FILTER_TYPE_CODE)).get()));
     } catch (IOException e) {
       throw new HoodieException("Could not read bloom filter from HFile", e);
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
index 9ddbf08e3e99..a7901c3e3e0f 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieStorageConfig.java
@@ -29,8 +29,11 @@ import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYN
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestHoodieStorageConfig {
   @Test
@@ -74,4 +77,15 @@ public class TestHoodieStorageConfig {
     assertEquals(BLOOM_FILTER_FPP_VALUE.defaultValue(), 
storageConfig.getString(BLOOM_FILTER_FPP_VALUE));
     assertEquals(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue(), 
storageConfig.getString(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES));
   }
+
+  @Test
+  void testHFileBloomFilterBuilder() {
+    HoodieStorageConfig defaultStorageConfig = 
HoodieStorageConfig.newBuilder().build();
+    
assertTrue(defaultStorageConfig.getBoolean(HFILE_WITH_BLOOM_FILTER_ENABLED));
+
+    HoodieStorageConfig disabledStorageConfig = 
HoodieStorageConfig.newBuilder()
+        .hfileBloomFilterEnable(false)
+        .build();
+    
assertFalse(disabledStorageConfig.getBoolean(HFILE_WITH_BLOOM_FILTER_ENABLED));
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
index 1c862803c5f2..84566159b139 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestConfigUtils.java
@@ -458,9 +458,10 @@ public class TestConfigUtils {
   }
 
   @Test
-  void testBuildFileGroupReaderPropertiesIncludesMetadataFileCacheConfig() {
+  void testBuildFileGroupReaderPropertiesIncludesMetadataReaderConfigs() {
     TypedProperties metadataProps = new TypedProperties();
     
metadataProps.setProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key(),
 "123");
+    metadataProps.setProperty(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), 
"true");
     
metadataProps.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key(), 
"false");
     metadataProps.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key(), 
"200000");
     
metadataProps.setProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key(),
 "7");
@@ -472,8 +473,9 @@ public class TestConfigUtils {
     TypedProperties fileGroupReaderProps = 
ConfigUtils.buildFileGroupReaderProperties(metadataConfig, false);
 
     assertEquals("123", 
fileGroupReaderProps.getProperty(HoodieMetadataConfig.METADATA_FILE_CACHE_MAX_SIZE_MB.key()));
+    assertEquals("true", 
fileGroupReaderProps.getProperty(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key()));
     assertEquals("false", 
fileGroupReaderProps.getProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_ENABLED.key()));
     assertEquals("200000", 
fileGroupReaderProps.getProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_SIZE.key()));
     assertEquals("7", 
fileGroupReaderProps.getProperty(HoodieReaderConfig.HFILE_BLOCK_CACHE_TTL_MINUTES.key()));
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
index fbf9b196f427..85660913304e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestHFileUtils.java
@@ -19,23 +19,61 @@
 
 package org.apache.hudi.common.util;
 
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.ByteBufferBackedInputStream;
 import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.io.hfile.HFileReader;
+import org.apache.hudi.io.hfile.HFileReaderImpl;
+import org.apache.hudi.io.hfile.UTF8StringKey;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
 
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.BLOOM_FILTER_TYPE;
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_COMPRESSION_ALGORITHM_NAME;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED;
 import static 
org.apache.hudi.common.util.HFileUtils.getHFileCompressionAlgorithm;
+import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Tests {@link HFileUtils}
  */
 public class TestHFileUtils {
+  private static final String KEY_FIELD_NAME = "key";
+  private static final HoodieSchema SCHEMA = HoodieSchema.parse("{\n"
+      + "  \"type\": \"record\",\n"
+      + "  \"name\": \"HFileLogBlockRecord\",\n"
+      + "  \"fields\": [\n"
+      + "    {\"name\": \"" + KEY_FIELD_NAME + "\", \"type\": \"string\"},\n"
+      + "    {\"name\": \"value\", \"type\": \"string\"}\n"
+      + "  ]\n"
+      + "}");
+
   @ParameterizedTest
   @EnumSource(CompressionCodec.class)
   public void testGetHFileCompressionAlgorithm(CompressionCodec algo) {
@@ -57,4 +95,57 @@ public class TestHFileUtils {
   public void testGetDefaultHFileCompressionAlgorithm() {
     assertEquals(CompressionCodec.GZIP, 
getHFileCompressionAlgorithm(Collections.emptyMap()));
   }
+
+  /**
+   * Serializes the test records into an HFile log block with the HFile bloom filter flag
+   * toggled, reads the bytes back with {@link HFileReaderImpl}, and checks that the bloom
+   * filter meta block, the filter type code and the min/max record entries are present
+   * exactly when the flag is enabled.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void 
testSerializeRecordsToLogBlockControlsBloomFilterMetadata(boolean 
hfileBloomFilterEnabled) throws Exception {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(HFILE_COMPRESSION_ALGORITHM_NAME.key(), 
HFILE_COMPRESSION_ALGORITHM_NAME.defaultValue());
+    paramsMap.put(HFILE_WITH_BLOOM_FILTER_ENABLED.key(), 
Boolean.toString(hfileBloomFilterEnabled));
+    paramsMap.put(BLOOM_FILTER_NUM_ENTRIES_VALUE.key(), 
BLOOM_FILTER_NUM_ENTRIES_VALUE.defaultValue());
+    paramsMap.put(BLOOM_FILTER_FPP_VALUE.key(), 
BLOOM_FILTER_FPP_VALUE.defaultValue());
+    paramsMap.put(BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(), 
BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.defaultValue());
+    paramsMap.put(BLOOM_FILTER_TYPE.key(), BLOOM_FILTER_TYPE.defaultValue());
+
+    ByteArrayOutputStream outputStream = new 
HFileUtils().serializeRecordsToLogBlock(
+        null, createRecords(), SCHEMA, SCHEMA, KEY_FIELD_NAME, paramsMap);
+
+    try (HFileReader reader = new HFileReaderImpl(
+        new ByteArraySeekableDataInputStream(new 
ByteBufferBackedInputStream(outputStream.toByteArray())),
+        outputStream.size())) {
+      reader.initializeMetadata();
+
+      // Each bloom-filter-related entry must be present if and only if the flag is on.
+      assertEquals(hfileBloomFilterEnabled,
+          
reader.getMetaBlock(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK).isPresent());
+      assertEquals(hfileBloomFilterEnabled,
+          reader.getMetaInfo(new 
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE)).isPresent());
+      assertEquals(hfileBloomFilterEnabled,
+          reader.getMetaInfo(new 
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD)).isPresent());
+      assertEquals(hfileBloomFilterEnabled,
+          reader.getMetaInfo(new 
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD)).isPresent());
+
+      if (hfileBloomFilterEnabled) {
+        // Rebuild the filter from the stored bytes and type code, then probe written and unwritten keys.
+        ByteBuffer bloomFilterBuffer = 
reader.getMetaBlock(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK).get();
+        BloomFilter bloomFilter = BloomFilterFactory.fromByteBuffer(
+            bloomFilterBuffer,
+            fromUTF8Bytes(reader.getMetaInfo(new 
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE)).get()));
+
+        assertTrue(bloomFilter.mightContain("key00"));
+        assertTrue(bloomFilter.mightContain("key02"));
+        assertFalse(bloomFilter.mightContain("missing-key"));
+      }
+    }
+  }
+
+  /**
+   * Builds three records against {@link #SCHEMA} whose keys are {@code key00}, {@code key01}
+   * and {@code key02} and whose values are {@code value0}, {@code value1} and {@code value2}.
+   *
+   * @return the records wrapped as {@link HoodieAvroIndexedRecord}s, in key order.
+   */
+  private static List<HoodieRecord> createRecords() {
+    List<HoodieRecord> records = new ArrayList<>();
+    for (int i = 0; i < 3; i++) {
+      // Zero-pad the index to two digits to form the key.
+      String key = "key" + String.format("%02d", i);
+      GenericRecord record = new GenericData.Record(SCHEMA.toAvroSchema());
+      record.put(KEY_FIELD_NAME, key);
+      record.put("value", "value" + i);
+      records.add(new HoodieAvroIndexedRecord(new HoodieKey(key, ""), record));
+    }
+    return records;
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
index dcd87550ede4..078f6b6779a3 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java
@@ -232,16 +232,7 @@ public class FlinkWriteClients {
                 
.parquetMaxFileSize(conf.get(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 
1024L)
                 
.withWriteUtcTimezone(conf.get(FlinkOptions.WRITE_UTC_TIMEZONE))
                 .build())
-            .withMetadataConfig(HoodieMetadataConfig.newBuilder()
-                .withEngineType(EngineType.FLINK) // this affects the default 
value inference
-                .enable(conf.get(FlinkOptions.METADATA_ENABLED))
-                .withRecordIndexFileGroupCount(
-                    
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
-                        
GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_DEFAULT)),
-                    
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(),
-                        
HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.defaultValue()
 + "")))
-                
.withMaxNumDeltaCommitsBeforeCompaction(conf.get(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS))
-                .build())
+            .withMetadataConfig(getMetadataConfig(conf, false))
             .withIndexConfig(StreamerUtil.getIndexConfig(conf))
             .withPayloadConfig(getPayloadConfig(conf))
             .withEmbeddedTimelineServerReuseEnabled(true) // make write client 
embedded timeline service singleton
@@ -279,4 +270,41 @@ public class FlinkWriteClients {
     }
     return writeConfig;
   }
+
+  /**
+   * Builds a {@link HoodieMetadataConfig} from the given Flink configuration, including any
+   * additional metadata table properties set via raw config options.
+   *
+   * <p>Equivalent to calling {@code getMetadataConfig(conf, true)}.
+   *
+   * @param conf the Flink configuration.
+   * @return the {@link HoodieMetadataConfig} constructed from the Flink configuration.
+   */
+  public static HoodieMetadataConfig getMetadataConfig(Configuration conf) {
+    return getMetadataConfig(conf, true);
+  }
+
+  /**
+   * Builds a {@link HoodieMetadataConfig} from the given Flink configuration.
+   *
+   * <p>The engine type, the metadata enable flag, the record index min/max file group counts
+   * and the max number of delta commits before compaction are always taken from {@code conf}.
+   *
+   * @param conf                 the Flink configuration.
+   * @param includeAllProperties whether to also apply any additional metadata table properties
+   *                             set via raw config options on top of the well-known Flink options.
+   * @return the {@link HoodieMetadataConfig} constructed from the Flink configuration.
+   */
+  public static HoodieMetadataConfig getMetadataConfig(Configuration conf, 
boolean includeAllProperties) {
+    HoodieMetadataConfig.Builder metadataConfigBuilder =
+        HoodieMetadataConfig.newBuilder()
+        .withEngineType(EngineType.FLINK) // this affects the default value 
inference
+        .enable(conf.get(FlinkOptions.METADATA_ENABLED))
+        .withRecordIndexFileGroupCount(
+            
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(),
+                GLOBAL_RECORD_LEVEL_INDEX_MIN_FILE_GROUP_COUNT_DEFAULT)),
+            
Integer.parseInt(conf.getString(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(),
+                
HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_MAX_FILE_GROUP_COUNT_PROP.defaultValue()
 + "")))
+        
.withMaxNumDeltaCommitsBeforeCompaction(conf.get(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS));
+
+    // Raw options are layered on last, after the explicit builder calls above.
+    // NOTE(review): presumably they win when a key collides with a value set above - confirm
+    // against HoodieMetadataConfig.Builder#fromProperties.
+    if (includeAllProperties) {
+      metadataConfigBuilder.fromProperties(flinkConf2TypedProperties(conf));
+    }
+    return metadataConfigBuilder.build();
+  }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 4190bf07a458..7e9fb63d1d90 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -98,7 +98,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
@@ -733,12 +732,7 @@ public class StreamerUtil {
    * @return HoodieMetadataConfig constructed from flink configuration.
    */
   public static HoodieMetadataConfig 
metadataConfig(org.apache.flink.configuration.Configuration conf) {
-    Properties properties = new Properties();
-
-    // set up metadata.enabled=true in table DDL to enable metadata listing
-    properties.put(HoodieMetadataConfig.ENABLE.key(), 
conf.get(FlinkOptions.METADATA_ENABLED));
-
-    return 
HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
+    return FlinkWriteClients.getMetadataConfig(conf);
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index f1f809bb43d2..09810de5c657 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -21,29 +21,54 @@ package org.apache.hudi.table.format;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
+import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.PartialUpdateAvroPayload;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.HoodieDataUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.io.ByteArraySeekableDataInputStream;
+import org.apache.hudi.io.ByteBufferBackedInputStream;
 import org.apache.hudi.io.FileGroupReaderBasedMergeHandle;
 import org.apache.hudi.io.HoodieWriteMergeHandle;
+import org.apache.hudi.io.SeekableDataInputStream;
+import org.apache.hudi.io.hfile.HFileReader;
+import org.apache.hudi.io.hfile.HFileReaderImpl;
+import org.apache.hudi.io.hfile.UTF8StringKey;
+import org.apache.hudi.io.storage.HoodieAvroHFileReaderImplBase;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.source.IncrementalInputSplits;
 import org.apache.hudi.source.prune.PartitionPruners;
+import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTableSource;
 import org.apache.hudi.table.format.cdc.CdcInputFormat;
@@ -81,6 +106,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -92,6 +118,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
+import static org.apache.hudi.common.util.StringUtils.fromUTF8Bytes;
 import static org.apache.hudi.utils.TestData.insertRow;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
@@ -170,6 +197,58 @@ public class TestInputFormat {
     assertThat(actual, is(expected));
   }
 
+  /**
+   * Writes the insert data set with the global record level index and the HFile bloom filter
+   * enabled, verifies that every HFile data block found in the metadata table's record index
+   * log files carries a usable bloom filter, and finally checks that record index lookups
+   * resolve the eight written keys and nothing for a key that was never written.
+   */
+  @Test
+  void testRecordIndexLogBlocksContainBloomFilter() throws Exception {
+    Map<String, String> options = new HashMap<>();
+    options.put(FlinkOptions.INDEX_TYPE.key(), HoodieIndex.IndexType.GLOBAL_RECORD_LEVEL_INDEX.name());
+    options.put(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(), "true");
+    options.put(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), "true");
+    options.put(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+    options.put(HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED.key(), "true");
+    beforeEach(HoodieTableType.COPY_ON_WRITE, options);
+
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    // Build the meta client once; it backs both the storage handle and the metadata factory.
+    HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(conf);
+    HoodieStorage storage = metaClient.getStorage();
+    StoragePath recordIndexPath = new StoragePath(
+        HoodieTableMetadata.getMetadataTableBasePath(conf.get(FlinkOptions.PATH)),
+        MetadataPartitionType.RECORD_INDEX.getPartitionPath());
+    List<StoragePathInfo> logFiles = FSUtils.getAllDataFilesInPartition(storage, recordIndexPath).stream()
+        .filter(pathInfo -> FSUtils.isLogFile(pathInfo.getPath()))
+        .collect(Collectors.toList());
+
+    assertFalse(logFiles.isEmpty(), "The MDT record index partition should contain log files");
+    int hfileDataBlockCount = 0;
+    for (StoragePathInfo logFile : logFiles) {
+      HoodieSchema schema = TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath());
+      try (HoodieLogFormat.Reader reader =
+               HoodieLogFormat.newReader(storage, new HoodieLogFile(logFile), schema)) {
+        while (reader.hasNext()) {
+          HoodieLogBlock logBlock = reader.next();
+          if (logBlock instanceof HoodieHFileDataBlock) {
+            assertBloomFilterContainsWrittenKey(storage, logBlock);
+            hfileDataBlockCount++;
+          }
+        }
+      }
+    }
+    assertTrue(hfileDataBlockCount > 0, "The MDT record index log files should contain HFile data blocks");
+
+    List<String> recordKeys = Arrays.asList(
+        "id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8", "missing-key");
+    // Close the metadata table when done so the readers it opens are released.
+    try (HoodieTableMetadata metadataTable = metaClient.getTableFormat().getMetadataFactory().create(
+        HoodieFlinkEngineContext.DEFAULT,
+        storage,
+        StreamerUtil.metadataConfig(conf),
+        conf.get(FlinkOptions.PATH))) {
+      Map<String, HoodieRecordGlobalLocation> recordLocations = HoodieDataUtils.dedupeAndCollectAsMap(
+          metadataTable.readRecordIndexLocationsWithKeys(HoodieListData.eager(recordKeys)));
+
+      assertThat(recordLocations.size(), is(8));
+      recordKeys.subList(0, 8).forEach(key -> assertTrue(recordLocations.containsKey(key)));
+      assertFalse(recordLocations.containsKey("missing-key"));
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {"PARQUET", "LANCE"})
   void testReadBaseAndLogFiles(String baseFileFormat) throws Exception {
@@ -1519,6 +1598,33 @@ public class TestInputFormat {
         conf);
   }
 
+  /**
+   * Reads the raw content of the given log block from storage, opens it as an HFile,
+   * rebuilds the bloom filter from its meta block and type-code entry, and asserts that the
+   * filter reports the key the reader is positioned on after {@code seekTo()} as possibly present.
+   *
+   * @param storage  storage used to open the log file that holds the block.
+   * @param logBlock the log block whose content is inspected.
+   * @throws IOException if the log file or the HFile content cannot be read.
+   */
+  private static void assertBloomFilterContainsWrittenKey(
+      HoodieStorage storage, HoodieLogBlock logBlock) throws IOException {
+    HoodieLogBlock.HoodieLogBlockContentLocation contentLocation =
+        logBlock.getBlockContentLocation().get();
+    // Copy exactly the block's content bytes out of the log file into memory.
+    byte[] hfileContent = new 
byte[Math.toIntExact(contentLocation.getBlockSize())];
+    try (SeekableDataInputStream inputStream =
+             storage.openSeekable(contentLocation.getLogFile().getPath(), 
false)) {
+      inputStream.seek(contentLocation.getContentPositionInLogFile());
+      inputStream.readFully(hfileContent);
+    }
+
+    try (HFileReader hfileReader = new HFileReaderImpl(
+        new ByteArraySeekableDataInputStream(new 
ByteBufferBackedInputStream(hfileContent)),
+        hfileContent.length)) {
+      hfileReader.initializeMetadata();
+      ByteBuffer bloomFilterBuffer = hfileReader.getMetaBlock(
+          HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK).get();
+      String bloomFilterType = fromUTF8Bytes(hfileReader.getMetaInfo(
+          new 
UTF8StringKey(HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE)).get());
+      BloomFilter bloomFilter = 
BloomFilterFactory.fromByteBuffer(bloomFilterBuffer, bloomFilterType);
+
+      assertTrue(hfileReader.seekTo(), "The HFile data block should contain at 
least one record");
+      String writtenKey = 
hfileReader.getKeyValue().get().getKey().getContentInString();
+      assertTrue(bloomFilter.mightContain(writtenKey));
+    }
+  }
+
   private static List<RowData> readData(InputFormat inputFormat) throws 
IOException {
     return readData(inputFormat, TestConfigurations.SERIALIZER);
   }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
index a3908f9bcf21..3910b91daebd 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestFlinkWriteClients.java
@@ -25,6 +25,7 @@ import 
org.apache.hudi.client.model.CommitTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.EventTimeFlinkRecordMerger;
 import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
 import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
@@ -230,6 +231,55 @@ public class TestFlinkWriteClients {
     assertThat(mergerClasses, 
is(PartialUpdateFlinkRecordMerger.class.getName()));
   }
 
+  /**
+   * With {@code includeAllProperties} set to false, the Flink options (metadata enabled,
+   * compaction delta commits) are applied, while the raw metadata table bloom filter
+   * properties set on the configuration are not.
+   */
+  @Test
+  void testGetMetadataConfigWithoutAdditionalProperties() {
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
+    // raw metadata table properties that are not exposed through FlinkOptions
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_TYPE.key(), 
BloomFilterTypeCode.SIMPLE.name());
+
+    HoodieMetadataConfig metadataConfig = 
FlinkWriteClients.getMetadataConfig(conf, false);
+
+    assertTrue(metadataConfig.isEnabled());
+    assertEquals(5, 
metadataConfig.getInt(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS));
+    // raw properties are not applied when includeAllProperties is false
+    assertFalse(metadataConfig.enableBloomFilter());
+  }
+
+  /**
+   * The single-argument {@code getMetadataConfig(conf)} applies both the Flink options and
+   * the raw metadata table bloom filter properties (enable flag, type, number of entries).
+   */
+  @Test
+  void testGetMetadataConfigWithAdditionalProperties() {
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.set(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS, 5);
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_TYPE.key(), 
BloomFilterTypeCode.SIMPLE.name());
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_NUM_ENTRIES.key(), 
"12345");
+
+    HoodieMetadataConfig metadataConfig = 
FlinkWriteClients.getMetadataConfig(conf);
+
+    assertTrue(metadataConfig.isEnabled());
+    assertEquals(5, 
metadataConfig.getInt(HoodieMetadataConfig.COMPACT_NUM_DELTA_COMMITS));
+    // raw properties are applied when includeAllProperties is true (the 
default)
+    assertTrue(metadataConfig.enableBloomFilter());
+    assertEquals(BloomFilterTypeCode.SIMPLE.name(), 
metadataConfig.getBloomFilterType());
+    assertEquals(12345, metadataConfig.getBloomFilterNumEntries());
+  }
+
+  /**
+   * Checks that the metadata config exposed by the write config from
+   * {@code getHoodieClientConfig(conf)} agrees with {@code getMetadataConfig(conf)} on the
+   * enabled flag, the bloom filter flag and the bloom filter type.
+   */
+  @Test
+  void testHoodieClientConfigMetadataConfigConsistentWithGetMetadataConfig() 
throws Exception {
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_TYPE.key(), 
BloomFilterTypeCode.SIMPLE.name());
+    StreamerUtil.initTableIfNotExists(conf);
+
+    HoodieMetadataConfig directMetadataConfig = 
FlinkWriteClients.getMetadataConfig(conf);
+    HoodieWriteConfig writeConfig = 
FlinkWriteClients.getHoodieClientConfig(conf);
+
+    assertEquals(directMetadataConfig.isEnabled(), 
writeConfig.isMetadataTableEnabled());
+    assertEquals(directMetadataConfig.enableBloomFilter(), 
writeConfig.getMetadataConfig().enableBloomFilter());
+    assertEquals(directMetadataConfig.getBloomFilterType(), 
writeConfig.getMetadataConfig().getBloomFilterType());
+  }
+
   @Test
   void testWriteMergeHandleForPreV9Table() throws Exception {
     conf.set(FlinkOptions.WRITE_TABLE_VERSION, 
HoodieTableVersion.EIGHT.versionCode());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
index 13df2e842ab2..7b2a981377f0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java
@@ -19,6 +19,8 @@
 package org.apache.hudi.utils;
 
 import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
@@ -26,6 +28,7 @@ import org.apache.hudi.common.model.PartialUpdateAvroPayload;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -65,6 +68,27 @@ class TestStreamerUtil {
   @TempDir
   File tempFile;
 
+  /**
+   * Verifies that {@code StreamerUtil.metadataConfig(conf)} carries the raw metadata table
+   * bloom filter settings (enable flag, type, number of entries, false positive rate and
+   * dynamic max entries) set on the Flink configuration.
+   */
+  @Test
+  void testMetadataConfigIncludesMetadataTableBloomFilterSettings() {
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.set(FlinkOptions.METADATA_ENABLED, true);
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), "true");
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_TYPE.key(), 
BloomFilterTypeCode.SIMPLE.name());
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_NUM_ENTRIES.key(), 
"12345");
+    conf.setString(HoodieMetadataConfig.BLOOM_FILTER_FPP.key(), "0.005");
+    
conf.setString(HoodieMetadataConfig.BLOOM_FILTER_DYNAMIC_MAX_ENTRIES.key(), 
"23456");
+    // An unrelated (file system view) option is set alongside the metadata options.
+    conf.setString(FileSystemViewStorageConfig.REMOTE_HOST_NAME.key(), 
"localhost");
+
+    HoodieMetadataConfig metadataConfig = StreamerUtil.metadataConfig(conf);
+
+    assertTrue(metadataConfig.isEnabled());
+    assertTrue(metadataConfig.enableBloomFilter());
+    assertEquals(BloomFilterTypeCode.SIMPLE.name(), 
metadataConfig.getBloomFilterType());
+    assertEquals(12345, metadataConfig.getBloomFilterNumEntries());
+    assertEquals(0.005, metadataConfig.getBloomFilterFpp());
+    assertEquals(23456, metadataConfig.getDynamicBloomFilterMaxNumEntries());
+  }
+
   @Test
   void testInferMergingBehavior() {
     Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileReaderFactory.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileReaderFactory.java
index a0512e84522d..ccc936a66f75 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileReaderFactory.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileReaderFactory.java
@@ -20,6 +20,7 @@
 package org.apache.hudi.io.storage.hadoop;
 
 import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.io.storage.HFileReaderFactory;
@@ -51,8 +52,7 @@ public class HoodieAvroFileReaderFactory extends 
HoodieFileReaderFactory {
     HFileReaderFactory readerFactory = HFileReaderFactory.builder()
         .withStorage(storage).withProps(hoodieConfig.getProps())
         .withPath(path).build();
-    return HoodieNativeAvroHFileReader.builder()
-        .readerFactory(readerFactory).path(path).schema(schemaOption).build();
+    return newNativeHFileReader(hoodieConfig, readerFactory, path, 
schemaOption);
   }
 
   protected HoodieFileReader newHFileFileReader(HoodieConfig hoodieConfig, 
StoragePathInfo pathInfo,
@@ -60,8 +60,7 @@ public class HoodieAvroFileReaderFactory extends 
HoodieFileReaderFactory {
     HFileReaderFactory readerFactory = HFileReaderFactory.builder()
         .withStorage(storage).withProps(hoodieConfig.getProps())
         
.withPath(pathInfo.getPath()).withFileSize(pathInfo.getLength()).build();
-    return HoodieNativeAvroHFileReader.builder()
-        
.readerFactory(readerFactory).path(pathInfo.getPath()).schema(schemaOption).build();
+    return newNativeHFileReader(hoodieConfig, readerFactory, 
pathInfo.getPath(), schemaOption);
   }
 
   @Override
@@ -73,8 +72,19 @@ public class HoodieAvroFileReaderFactory extends 
HoodieFileReaderFactory {
     HFileReaderFactory.Builder readerFactoryBuilder = 
HFileReaderFactory.builder()
         .withStorage(storage).withProps(hoodieConfig.getProps())
         .withContent(content);
+    return newNativeHFileReader(hoodieConfig, readerFactoryBuilder.build(), 
path, schemaOption);
+  }
+
+  /**
+   * Builds a {@link HoodieNativeAvroHFileReader} from the given reader factory, path and
+   * schema, enabling bloom filter usage according to
+   * {@link HoodieMetadataConfig#BLOOM_FILTER_ENABLE} read from {@code hoodieConfig}.
+   */
+  private HoodieNativeAvroHFileReader newNativeHFileReader(HoodieConfig 
hoodieConfig,
+                                                           HFileReaderFactory 
readerFactory,
+                                                           StoragePath path,
+                                                           
Option<HoodieSchema> schemaOption) {
     return HoodieNativeAvroHFileReader.builder()
-        
.readerFactory(readerFactoryBuilder.build()).path(path).schema(schemaOption).build();
+        .readerFactory(readerFactory)
+        .path(path)
+        .schema(schemaOption)
+        
.useBloomFilter(hoodieConfig.getBoolean(HoodieMetadataConfig.BLOOM_FILTER_ENABLE))
+        .build();
   }
 
   @Override
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
index c474df450ad8..83ebc7159e01 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroFileWriterFactory.java
@@ -109,7 +109,8 @@ public class HoodieAvroFileWriterFactory extends 
HoodieFileWriterFactory {
   protected HoodieFileWriter newHFileFileWriter(
       String instantTime, StoragePath path, HoodieConfig config, HoodieSchema 
schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
-    BloomFilter filter = createBloomFilter(config);
+    BloomFilter filter = 
config.getBooleanOrDefault(HoodieStorageConfig.HFILE_WITH_BLOOM_FILTER_ENABLED)
+        ? createBloomFilter(config) : null;
     HoodieHFileConfig hfileConfig = new HoodieHFileConfig(
         storage.getConf(),
         CompressionCodec.findCodecByName(
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroHFileWriter.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroHFileWriter.java
index 9031390af1bb..15f088cbd8aa 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroHFileWriter.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/storage/hadoop/HoodieAvroHFileWriter.java
@@ -20,11 +20,11 @@
 package org.apache.hudi.io.storage.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.util.HFileUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieDuplicateKeyException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
@@ -169,23 +169,7 @@ public class HoodieAvroHFileWriter
   @Override
   public void close() throws IOException {
     if (hfileConfig.useBloomFilter()) {
-      final BloomFilter bloomFilter = hfileConfig.getBloomFilter();
-      if (minRecordKey == null) {
-        minRecordKey = "";
-      }
-      if (maxRecordKey == null) {
-        maxRecordKey = "";
-      }
-      writer.appendFileInfo(
-          HoodieAvroHFileReaderImplBase.KEY_MIN_RECORD, 
getUTF8Bytes(minRecordKey));
-      writer.appendFileInfo(
-          HoodieAvroHFileReaderImplBase.KEY_MAX_RECORD, 
getUTF8Bytes(maxRecordKey));
-      writer.appendFileInfo(
-          HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_TYPE_CODE,
-          getUTF8Bytes(bloomFilter.getBloomFilterTypeCode().toString()));
-      writer.appendMetaInfo(
-          HoodieAvroHFileReaderImplBase.KEY_BLOOM_FILTER_META_BLOCK,
-          getUTF8Bytes(bloomFilter.serializeToString()));
+      HFileUtils.appendBloomFilter(writer, minRecordKey, maxRecordKey, 
hfileConfig.getBloomFilter());
     }
     writer.close();
     writer = null;
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
index 337ad4d2b8f9..af17d8447884 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieAvroFileReaderFactory.java
@@ -19,22 +19,34 @@
 
 package org.apache.hudi.io.hadoop;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieIOFactory;
+import org.apache.hudi.io.storage.HoodieNativeAvroHFileReader;
 import org.apache.hudi.io.storage.hadoop.HoodieAvroOrcReader;
 import org.apache.hudi.io.storage.hadoop.HoodieAvroParquetReader;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 
 import static 
org.apache.hudi.common.util.ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -45,6 +57,8 @@ public class TestHoodieAvroFileReaderFactory {
   @TempDir
   public java.nio.file.Path tempDir;
 
+  private static final StoragePath HFILE_PATH = new StoragePath("/partition/path/f1_1-0-1_000.hfile");
+
   @Test
   public void testGetFileReader() throws IOException {
     // parquet file format.
@@ -70,4 +84,56 @@ public class TestHoodieAvroFileReaderFactory {
         .getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, orcPath);
     assertTrue(orcReader instanceof HoodieAvroOrcReader);
   }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHFileReaderPassesBloomFilterConfig(boolean bloomFilterEnabled)
+      throws IOException, ReflectiveOperationException {
+    HoodieStorage storage = HoodieTestUtils.getDefaultStorage();
+    HoodieFileReaderFactory readerFactory = HoodieIOFactory.getIOFactory(storage)
+        .getReaderFactory(HoodieRecordType.AVRO);
+    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
+        .withProperties(DEFAULT_HUDI_CONFIG_FOR_READER.getProps())
+        .withProperties(bloomFilterProps(bloomFilterEnabled))
+        .build();
+
+    assertBloomFilterConfig(readerFactory.getFileReader(metadataConfig, HFILE_PATH),
+        bloomFilterEnabled);
+    assertBloomFilterConfig(readerFactory.getFileReader(metadataConfig,
+        new StoragePathInfo(HFILE_PATH, 100, false, (short) 0, 0, 0), HoodieFileFormat.HFILE,
+        Option.<HoodieSchema>empty()), bloomFilterEnabled);
+    assertBloomFilterConfig(readerFactory.getContentReader(metadataConfig, HFILE_PATH, HoodieFileFormat.HFILE,
+        storage, new byte[0], Option.<HoodieSchema>empty()),
+        bloomFilterEnabled);
+  }
+
+  @Test
+  public void testHFileReaderDoesNotUseBloomFilterByDefault()
+      throws IOException, ReflectiveOperationException {
+    HoodieStorage storage = HoodieTestUtils.getDefaultStorage();
+    HoodieFileReaderFactory readerFactory = HoodieIOFactory.getIOFactory(storage)
+        .getReaderFactory(HoodieRecordType.AVRO);
+
+    assertBloomFilterConfig(readerFactory.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER, HFILE_PATH), false);
+    assertBloomFilterConfig(readerFactory.getFileReader(DEFAULT_HUDI_CONFIG_FOR_READER,
+        new StoragePathInfo(HFILE_PATH, 100, false, (short) 0, 0, 0), HoodieFileFormat.HFILE,
+        Option.<HoodieSchema>empty()), false);
+    assertBloomFilterConfig(readerFactory.getContentReader(DEFAULT_HUDI_CONFIG_FOR_READER, HFILE_PATH,
+        HoodieFileFormat.HFILE, storage, new byte[0], Option.<HoodieSchema>empty()),
+        false);
+  }
+
+  private static TypedProperties bloomFilterProps(boolean enableBloomFilter) {
+    TypedProperties properties = new TypedProperties();
+    properties.setProperty(HoodieMetadataConfig.BLOOM_FILTER_ENABLE.key(), Boolean.toString(enableBloomFilter));
+    return properties;
+  }
+
+  private static void assertBloomFilterConfig(HoodieFileReader reader, boolean expectedBloomFilterEnabled)
+      throws ReflectiveOperationException {
+    HoodieNativeAvroHFileReader hfileReader = assertInstanceOf(HoodieNativeAvroHFileReader.class, reader);
+    Field useBloomFilterField = HoodieNativeAvroHFileReader.class.getDeclaredField("useBloomFilter");
+    useBloomFilterField.setAccessible(true);
+    assertEquals(expectedBloomFilterEnabled, useBloomFilterField.getBoolean(hfileReader));
+  }
 }
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
index d7a112a5f2d7..f2225ff3ca66 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/io/hadoop/TestHoodieHFileReaderWriter.java
@@ -23,7 +23,7 @@ import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.TypedProperties;
-import org.apache.hudi.common.engine.TaskContextSupplier;
+import org.apache.hudi.common.engine.LocalTaskContextSupplier;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -53,6 +53,7 @@ import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -74,7 +75,6 @@ import java.util.Set;
 import java.util.Spliterator;
 import java.util.Spliterators;
 import java.util.TreeMap;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
@@ -99,6 +99,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -157,18 +158,22 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
   @Override
   protected HoodieAvroHFileWriter createWriter(
       HoodieSchema schema, boolean populateMetaFields) throws Exception {
+    return createWriter(schema, populateMetaFields, true);
+  }
+
+  protected HoodieAvroHFileWriter createWriter(
+      HoodieSchema schema, boolean populateMetaFields, boolean hfileBloomFilterEnabled) throws Exception {
     String instantTime = "000";
     HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
     Properties props = new Properties();
     props.setProperty(HoodieTableConfig.POPULATE_META_FIELDS.key(), Boolean.toString(populateMetaFields));
-    TaskContextSupplier mockTaskContextSupplier = mock(TaskContextSupplier.class);
-    Supplier<Integer> partitionSupplier = mock(Supplier.class);
-    when(mockTaskContextSupplier.getPartitionIdSupplier()).thenReturn(partitionSupplier);
-    when(partitionSupplier.get()).thenReturn(10);
 
     return (HoodieAvroHFileWriter) HoodieFileWriterFactory.getFileWriter(
-        instantTime, getFilePath(), storage, HoodieStorageConfig.newBuilder().fromProperties(props).build(), schema,
-        mockTaskContextSupplier, HoodieRecord.HoodieRecordType.AVRO);
+        instantTime, getFilePath(), storage, HoodieStorageConfig.newBuilder()
+            .fromProperties(props)
+            .hfileBloomFilterEnable(hfileBloomFilterEnabled)
+            .build(), schema,
+        new LocalTaskContextSupplier(), HoodieRecord.HoodieRecordType.AVRO);
   }
 
   @Override
@@ -260,6 +265,64 @@ public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testHFileBloomFilterMetadataWriteConfig(boolean hfileBloomFilterEnabled) throws Exception {
+    HoodieSchema schema = getHoodieSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
+    HoodieAvroHFileWriter writer = createWriter(schema, true, hfileBloomFilterEnabled);
+    for (int i = 0; i < 3; i++) {
+      GenericRecord record = new GenericData.Record(schema.getAvroSchema());
+      String key = "key" + String.format("%02d", i);
+      record.put("_row_key", key);
+      record.put("time", Integer.toString(i));
+      record.put("number", i);
+      writer.writeAvro(key, record);
+    }
+    writer.close();
+
+    HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
+    try (HFileReader hfileReader = HFileReaderFactory.builder()
+        .withStorage(storage)
+        .withProps(DEFAULT_PROPS)
+        .withPath(getFilePath())
+        .build()
+        .createHFileReader()) {
+      hfileReader.initializeMetadata();
+      assertEquals(hfileBloomFilterEnabled, hfileReader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK).isPresent());
+      assertEquals(hfileBloomFilterEnabled, hfileReader.getMetaInfo(new UTF8StringKey(KEY_BLOOM_FILTER_TYPE_CODE)).isPresent());
+      assertEquals(hfileBloomFilterEnabled, hfileReader.getMetaInfo(new UTF8StringKey(KEY_MIN_RECORD)).isPresent());
+      assertEquals(hfileBloomFilterEnabled, hfileReader.getMetaInfo(new UTF8StringKey(KEY_MAX_RECORD)).isPresent());
+      assertTrue(hfileReader.getMetaInfo(new UTF8StringKey(SCHEMA_KEY)).isPresent());
+    }
+  }
+
+  @Test
+  public void testPointLookupFallsBackWhenBloomFilterMetadataIsMissing() throws Exception {
+    HoodieSchema schema = getHoodieSchemaFromResource(TestHoodieReaderWriterBase.class, "/exampleSchema.avsc");
+    HoodieAvroHFileWriter writer = createWriter(schema, true, false);
+    for (int i = 0; i < 3; i++) {
+      GenericRecord record = new GenericData.Record(schema.getAvroSchema());
+      String key = "key" + String.format("%02d", i);
+      record.put("_row_key", key);
+      record.put("time", Integer.toString(i));
+      record.put("number", i);
+      writer.writeAvro(key, record);
+    }
+    writer.close();
+
+    HoodieStorage storage = HoodieTestUtils.getStorage(getFilePath());
+    try (HoodieAvroHFileReaderImplBase hfileReader = createReader(storage, true)) {
+      assertNull(hfileReader.readBloomFilter());
+
+      List<IndexedRecord> records = HoodieAvroHFileReaderImplBase.readRecords(
+          hfileReader, new ArrayList<>(Arrays.asList("key00", "key02", "missing-key")));
+
+      assertEquals(2, records.size());
+      assertEquals("key00", ((GenericRecord) records.get(0)).get("_row_key").toString());
+      assertEquals("key02", ((GenericRecord) records.get(1)).get("_row_key").toString());
+    }
+  }
+
   @Disabled("Disable the test with evolved schema for HFile since it's not supported")
   @ParameterizedTest
   @Override

Reply via email to