nsivabalan commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r795001136



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -399,4 +775,116 @@ public static int mapRecordKeyToFileGroupIndex(String 
recordKey, int numFileGrou
     return fileSliceStream.sorted((s1, s2) -> 
s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
   }
 
+  public static List<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,

Review comment:
       Comment about L760 to L767: we instantiate a new 
HoodieTableFileSystemView on every call here. Could we move the fsView 
instantiation to the caller and amortize the cost? For example, 
   ```
   enablePartition(MetadataPartitionType.FILES, metadataConfig, metaClient, isBootstrapCompleted);
   ```
   we make similar calls three times in this patch, so the view can be 
instantiated once outside and reused here. Please also check the other callers 
for similar amortization opportunities.
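
   A rough sketch of the caller-owned view idea. Every name below 
(`ViewStandIn`, `SharedViewSketch`, the string partition types) is a 
hypothetical stand-in chosen for illustration, not the actual Hudi classes or 
signatures; the counter only makes the single construction observable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedViewSketch {
  // Counts constructions of the stand-in view so the saving is observable.
  static final AtomicInteger VIEWS_BUILT = new AtomicInteger();

  // Hypothetical stand-in for an expensive-to-build file system view.
  static final class ViewStandIn {
    ViewStandIn() {
      VIEWS_BUILT.incrementAndGet();
    }

    int fileCount(String partitionType) {
      return partitionType.length(); // placeholder for a real lookup
    }
  }

  // The callee receives the view instead of constructing its own.
  static String enablePartition(String partitionType, ViewStandIn view) {
    return partitionType + "=" + view.fileCount(partitionType);
  }

  // The caller builds the view once and reuses it for all three calls.
  static List<String> enablePartitions() {
    ViewStandIn view = new ViewStandIn();
    List<String> enabled = new ArrayList<>();
    for (String type : Arrays.asList("files", "bloom_filters", "column_stats")) {
      enabled.add(enablePartition(type, view));
    }
    return enabled;
  }

  public static void main(String[] args) {
    System.out.println(enablePartitions() + ", views built: " + VIEWS_BUILT.get());
  }
}
```

   With this shape the construction cost is paid once per writer 
initialization rather than once per partition type.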

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
##########
@@ -101,4 +116,34 @@ public static HoodieRecord getTaggedRecord(HoodieRecord 
inputRecord, Option<Hood
     }
     return record;
   }
+
+  /**
+   * Given a list of row keys and one file, return only row keys existing in 
that file.
+   *
+   * @param filePath            - File to filter keys from
+   * @param candidateRecordKeys - Candidate keys to filter
+   * @return List of candidate keys that are available in the file
+   */
+  public static List<String> filterKeysFromFile(Path filePath, List<String> 
candidateRecordKeys,
+                                                Configuration configuration) 
throws HoodieIndexException {
+    ValidationUtils.checkArgument(FSUtils.isBaseFile(filePath));
+    List<String> foundRecordKeys = new ArrayList<>();
+    try {
+      // Load all rowKeys from the file, to double-confirm
+      if (!candidateRecordKeys.isEmpty()) {
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        HoodieFileReader fileReader = 
HoodieFileReaderFactory.getFileReader(configuration, filePath);
+        Set<String> fileRowKeys = fileReader.filterRowKeys(new 
TreeSet<>(candidateRecordKeys));

Review comment:
       For the non-metadata path, we can avoid sorting the candidateRecordKeys 
here, to keep the behavior the same as before. 
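
   One possible shape, as a sketch only: the `sortKeys` flag and the 
`CandidateKeySetSketch` class are assumptions made for illustration (the method 
in this patch takes no such parameter). The sorted set is built only when the 
caller asks for it; otherwise the previous HashSet behavior is kept.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class CandidateKeySetSketch {
  // sortKeys is a hypothetical flag: true for a lookup path that benefits
  // from ordered keys, false to keep the earlier unsorted behavior.
  static Set<String> toLookupSet(List<String> candidateRecordKeys, boolean sortKeys) {
    if (sortKeys) {
      // TreeSet orders the keys; insertion costs O(log n) per key.
      return new TreeSet<>(candidateRecordKeys);
    }
    // HashSet skips the ordering work entirely.
    return new HashSet<>(candidateRecordKeys);
  }
}
```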
   

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -1435,6 +1435,14 @@ public boolean useBloomIndexBucketizedChecking() {
     return getBoolean(HoodieIndexConfig.BLOOM_INDEX_BUCKETIZED_CHECKING);
   }
 
+  public boolean isMetadataBloomFilterIndexEnabled() {
+    return isMetadataTableEnabled() && 
getMetadataConfig().isBloomFilterIndexEnabled();
+  }
+
+  public boolean isMetadataIndexColumnStatsForAllColumnsEnabled() {
+    return isMetadataTableEnabled() && 
getMetadataConfig().isMetadataColumnStatsIndexForAllColumnsEnabled();

Review comment:
       Supporting a user-configurable subset of columns is not part of this 
patch, is it? Do we have a tracking ticket? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLookupHandle.java
##########
@@ -46,52 +51,58 @@
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieKeyLookupHandle.class);
 
-  private final HoodieTableType tableType;
-
   private final BloomFilter bloomFilter;
-
   private final List<String> candidateRecordKeys;
-
+  private final boolean useMetadataTableIndex;
+  private Option<String> fileName = Option.empty();
   private long totalKeysChecked;
 
   public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, 
O> hoodieTable,
-                               Pair<String, String> partitionPathFilePair) {
-    super(config, null, hoodieTable, partitionPathFilePair);
-    this.tableType = hoodieTable.getMetaClient().getTableType();
+                               Pair<String, String> partitionPathFileIDPair) {
+    this(config, hoodieTable, partitionPathFileIDPair, Option.empty(), false);
+  }
+
+  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T, I, K, 
O> hoodieTable,
+                               Pair<String, String> partitionPathFileIDPair, 
Option<String> fileName,
+                               boolean useMetadataTableIndex) {
+    super(config, hoodieTable, partitionPathFileIDPair);
     this.candidateRecordKeys = new ArrayList<>();
     this.totalKeysChecked = 0;
-    HoodieTimer timer = new HoodieTimer().startTimer();
-
-    try {
-      this.bloomFilter = createNewFileReader().readBloomFilter();
-    } catch (IOException e) {
-      throw new HoodieIndexException(String.format("Error reading bloom filter 
from %s: %s", partitionPathFilePair, e));
+    if (fileName.isPresent()) {
+      
ValidationUtils.checkArgument(FSUtils.getFileId(fileName.get()).equals(getFileId()),
+          "File name '" + fileName.get() + "' doesn't match this lookup handle 
fileid '" + getFileId() + "'");
+      this.fileName = fileName;
     }
-    LOG.info(String.format("Read bloom filter from %s in %d ms", 
partitionPathFilePair, timer.endTimer()));
+    this.useMetadataTableIndex = useMetadataTableIndex;
+    this.bloomFilter = getBloomFilter();
   }
 
-  /**
-   * Given a list of row keys and one file, return only row keys existing in 
that file.
-   */
-  public List<String> checkCandidatesAgainstFile(Configuration configuration, 
List<String> candidateRecordKeys,
-                                                 Path filePath) throws 
HoodieIndexException {
-    List<String> foundRecordKeys = new ArrayList<>();
+  private BloomFilter getBloomFilter() {
+    BloomFilter bloomFilter = null;
+    HoodieTimer timer = new HoodieTimer().startTimer();
     try {
-      // Load all rowKeys from the file, to double-confirm
-      if (!candidateRecordKeys.isEmpty()) {
-        HoodieTimer timer = new HoodieTimer().startTimer();
-        Set<String> fileRowKeys = createNewFileReader().filterRowKeys(new 
HashSet<>(candidateRecordKeys));
-        foundRecordKeys.addAll(fileRowKeys);
-        LOG.info(String.format("Checked keys against file %s, in %d ms. 
#candidates (%d) #found (%d)", filePath,
-            timer.endTimer(), candidateRecordKeys.size(), 
foundRecordKeys.size()));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Keys matching for file " + filePath + " => " + 
foundRecordKeys);
+      if (this.useMetadataTableIndex) {
+        ValidationUtils.checkArgument(this.fileName.isPresent(),
+            "File name not available to fetch bloom filter from the metadata 
table index.");
+        Option<ByteBuffer> bloomFilterByteBuffer =
+            
hoodieTable.getMetadataTable().getBloomFilter(partitionPathFileIDPair.getLeft(),
 fileName.get());
+        if (!bloomFilterByteBuffer.isPresent()) {
+          throw new HoodieIndexException("BloomFilter missing for " + 
partitionPathFileIDPair.getRight());
+        }
+        bloomFilter =

Review comment:
       If someone has explicitly overridden the bloom filter type to simple, 
this will fail. Can we fix this? It should not take much time. 
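
   A minimal sketch of the kind of fix meant here: choose the filter 
implementation from a stored type code instead of hard-coding one class. The 
`TypeCode` enum, the `KeyFilter` interface and both filter classes are toy 
stand-ins invented for this example (exact sets, not real bloom filters), and 
it assumes the type code is available alongside the serialized payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class FilterTypeDispatchSketch {
  // Hypothetical type codes; the real enum in the code base may differ.
  enum TypeCode { SIMPLE, DYNAMIC }

  interface KeyFilter {
    TypeCode typeCode();

    boolean mightContain(String key);
  }

  // Toy filter backed by an exact set; a real bloom filter is probabilistic.
  static class SimpleFilter implements KeyFilter {
    final Set<String> keys;

    SimpleFilter(Set<String> keys) {
      this.keys = keys;
    }

    public TypeCode typeCode() {
      return TypeCode.SIMPLE;
    }

    public boolean mightContain(String key) {
      return keys.contains(key);
    }
  }

  static class DynamicFilter extends SimpleFilter {
    DynamicFilter(Set<String> keys) {
      super(keys);
    }

    @Override
    public TypeCode typeCode() {
      return TypeCode.DYNAMIC;
    }
  }

  // Pick the implementation from the stored type code, not a fixed class.
  static KeyFilter deserialize(String storedTypeCode, byte[] payload) {
    Set<String> keys = new HashSet<>(
        Arrays.asList(new String(payload, StandardCharsets.UTF_8).split(",")));
    switch (TypeCode.valueOf(storedTypeCode)) {
      case SIMPLE:
        return new SimpleFilter(keys);
      case DYNAMIC:
        return new DynamicFilter(keys);
      default:
        throw new IllegalArgumentException("Unknown filter type: " + storedTypeCode);
    }
  }
}
```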
   

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -145,22 +151,82 @@
       
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
           "File listing cannot be used for Metadata Table");
 
-      initRegistry();
       this.dataMetaClient =
           
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
+      enablePartitions();
+      initRegistry();
       initialize(engineContext, actionMetadata, inflightInstantTimestamp);
       initTableMetadata();
     } else {
       enabled = false;
-      this.metrics = Option.empty();
     }
   }
 
   public HoodieBackedTableMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
-      HoodieEngineContext engineContext) {
+                                         HoodieEngineContext engineContext) {
     this(hadoopConf, writeConfig, engineContext, Option.empty(), 
Option.empty());
   }
 
+  /**
+   * Enable metadata table partitions based on config.
+   */
+  private void enablePartitions() {
+    final HoodieMetadataConfig metadataConfig = 
dataWriteConfig.getMetadataConfig();
+    boolean isBootstrapCompleted = false;
+    Option<HoodieTableMetaClient> metaClient = Option.empty();
+    try {
+      isBootstrapCompleted = dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
+      if (isBootstrapCompleted) {
+        metaClient = 
Option.of(HoodieTableMetaClient.builder().setConf(hadoopConf.get())
+            .setBasePath(metadataWriteConfig.getBasePath()).build());
+      }
+    } catch (IOException e) {
+      throw new HoodieException("Failed to enable metadata partitions!", e);
+    }
+
+    enablePartition(MetadataPartitionType.FILES, metadataConfig, metaClient, 
isBootstrapCompleted);

Review comment:
       May I know why this is called enablePartition? I don't see it doing 
much to enable the partition as such. Can you shed some light on this?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +143,83 @@ public HoodieBloomIndex(HoodieWriteConfig config, 
BaseHoodieBloomIndexHelper blo
   /**
    * Load all involved files as <Partition, filename> pair List.
    */
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
       List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get BloomIndexFileInfo for all the latest base files for the requested 
partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved 
files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be 
loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  protected List<Pair<String, BloomIndexFileInfo>> 
loadColumnRangesFromMetaIndex(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices");
+
+    final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      List<Pair<String, String>> columnStatKeys = 
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,

Review comment:
       columnStatsPartitionKeys might be a better name

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +143,83 @@ public HoodieBloomIndex(HoodieWriteConfig config, 
BaseHoodieBloomIndexHelper blo
   /**
    * Load all involved files as <Partition, filename> pair List.
    */
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
       List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get BloomIndexFileInfo for all the latest base files for the requested 
partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved 
files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be 
loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  protected List<Pair<String, BloomIndexFileInfo>> 
loadColumnRangesFromMetaIndex(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices");
+
+    final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      List<Pair<String, String>> columnStatKeys = 
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
+              hoodieTable).stream().map(baseFile -> Pair.of(partitionName, 
baseFile.getFileName()))
+          .sorted()
+          .collect(toList());
+      if (columnStatKeys.isEmpty()) {
+        return Stream.empty();
+      }
+      try {
+        Map<Pair<String, String>, HoodieColumnStats> fileToColumnStatMap = 
hoodieTable

Review comment:
       minor. fileToColumnStat**s**Map

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in an iterative fashion. In each iteration,
+ * bloom filters are requested for a batch of partition files and the
+ * keys are checked against them.
+ */
+public class HoodieMetadataBloomIndexCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataBloomIndexCheckFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
+  }
+
+  private class BloomIndexLazyKeyCheckIterator extends 
LazyIterableIterator<Tuple2<String, HoodieKey>, List<HoodieKeyLookupResult>> {
+    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> 
tuple2Iterator) {
+      super(tuple2Iterator);
+    }
+
+    @Override
+    protected void start() {
+    }
+
+    @Override
+    protected List<HoodieKeyLookupResult> computeNext() {
+      // Partition path and file name pair to list of keys
+      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new 
HashMap<>();
+      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+      final List<HoodieKeyLookupResult> resultList = new ArrayList<>();
+
+      while (inputItr.hasNext()) {
+        Tuple2<String, HoodieKey> entry = inputItr.next();
+        final String partitionPath = entry._2.getPartitionPath();
+        final String fileId = entry._1;
+        if (!fileIDBaseFileMap.containsKey(fileId)) {
+          Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+          if (!baseFile.isPresent()) {
+            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+                + ", fileId: " + fileId);
+          }
+          fileIDBaseFileMap.put(fileId, baseFile.get());
+        }
+        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
+            k -> new ArrayList<>()).add(entry._2);
+        if (fileToKeysMap.size() > 
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+          break;
+        }
+      }
+      if (fileToKeysMap.isEmpty()) {
+        return Collections.emptyList();
+      }
+
+      List<Pair<String, String>> partitionNameFileNameList = 
fileToKeysMap.keySet()
+          .stream().collect(Collectors.toList());
+      Map<Pair<String, String>, ByteBuffer> fileIDToBloomFilterByteBufferMap =
+          
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
+
+      final AtomicInteger totalKeys = new AtomicInteger(0);
+      fileToKeysMap.forEach((partitionPathFileNamePair, hoodieKeyList) -> {
+        final String partitionPath = partitionPathFileNamePair.getLeft();
+        final String fileName = partitionPathFileNamePair.getRight();
+        final String fileId = FSUtils.getFileId(fileName);
+        ValidationUtils.checkState(!fileId.isEmpty());
+
+        if 
(!fileIDToBloomFilterByteBufferMap.containsKey(partitionPathFileNamePair)) {
+          throw new HoodieIndexException("Failed to get the bloom filter for " 
+ partitionPathFileNamePair);
+        }
+        final ByteBuffer fileBloomFilterByteBuffer = 
fileIDToBloomFilterByteBufferMap.get(partitionPathFileNamePair);
+
+        HoodieDynamicBoundedBloomFilter fileBloomFilter =

Review comment:
       Does the metadata table always use the dynamic bloom filter? 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
##########
@@ -70,26 +69,44 @@ public static SparkHoodieBloomIndexHelper getInstance() {
     JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
         HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
             .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
-    Map<String, Long> comparisonsPerFileGroup = computeComparisonsPerFileGroup(
-        config, recordsPerPartition, partitionToFileInfo, fileComparisonsRDD, 
context);
-    int inputParallelism =
-        
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
+
+    int inputParallelism = 
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
     int joinParallelism = Math.max(inputParallelism, 
config.getBloomIndexParallelism());
     LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: 
${"
         + config.getBloomIndexParallelism() + "}");
 
-    if (config.useBloomIndexBucketizedChecking()) {
+    JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+    if (config.isMetaIndexBloomFilterEnabled()) {
+      // Step 1: Sort by file id
+      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
+          fileComparisonsRDD.sortBy(entry -> entry._1, true, joinParallelism);

Review comment:
       Some of the earlier feedback does not appear to have been addressed. 
Can you please check all of it?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in an iterative fashion. In each iteration,
+ * bloom filters are requested for a batch of partition files and the
+ * keys are checked against them.
+ */
+public class HoodieMetadataBloomIndexCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataBloomIndexCheckFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;

Review comment:
       Can we make this configurable? 
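
   For illustration, a sketch of reading the batch size from configuration 
with the current value as the default. The property key and class name are 
made up for this example and are not existing Hudi configs; the 512 KB 
per-filter figure is the assumption stated in the code comment above.

```java
import java.util.Properties;

public class BatchSizeConfigSketch {
  // Hypothetical property key, invented for this sketch.
  static final String MAX_FILES_PER_BATCH_KEY = "example.bloom.check.max.files.per.batch";
  // Default mirrors the constant currently hard-coded in the patch.
  static final int DEFAULT_MAX_FILES_PER_BATCH = 256;

  // Returns the configured batch size, falling back to the default when unset.
  static int maxFilesPerBatch(Properties props) {
    String raw = props.getProperty(MAX_FILES_PER_BATCH_KEY);
    if (raw == null) {
      return DEFAULT_MAX_FILES_PER_BATCH;
    }
    int value = Integer.parseInt(raw.trim());
    if (value <= 0) {
      throw new IllegalArgumentException("Batch size must be positive, got " + value);
    }
    return value;
  }

  // Estimated memory for one batch, given an assumed per-file filter size.
  static long estimatedBatchBytes(int filesPerBatch, long bytesPerFilter) {
    return filesPerBatch * bytesPerFilter;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(MAX_FILES_PER_BATCH_KEY, "64");
    int files = maxFilesPerBatch(props);
    System.out.println(files + " files, ~" + estimatedBatchBytes(files, 512L * 1024) + " bytes");
  }
}
```

   With the default of 256 files and 512 KB per filter, the estimate works out 
to 256 * 512 KB = 128 MB, matching the budget in the comment.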

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in an iterative fashion. In each iteration,
+ * bloom filters are requested for a batch of partition files and the
+ * keys are checked against them.
+ */
+public class HoodieMetadataBloomIndexCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataBloomIndexCheckFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
+  }
+
+  private class BloomIndexLazyKeyCheckIterator extends 
LazyIterableIterator<Tuple2<String, HoodieKey>, List<HoodieKeyLookupResult>> {
+    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> 
tuple2Iterator) {
+      super(tuple2Iterator);
+    }
+
+    @Override
+    protected void start() {
+    }
+
+    @Override
+    protected List<HoodieKeyLookupResult> computeNext() {
+      // Partition path and file name pair to list of keys
+      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new 
HashMap<>();
+      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+      final List<HoodieKeyLookupResult> resultList = new ArrayList<>();
+
+      while (inputItr.hasNext()) {
+        Tuple2<String, HoodieKey> entry = inputItr.next();
+        final String partitionPath = entry._2.getPartitionPath();
+        final String fileId = entry._1;
+        if (!fileIDBaseFileMap.containsKey(fileId)) {
+          Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+          if (!baseFile.isPresent()) {
+            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+                + ", fileId: " + fileId);
+          }
+          fileIDBaseFileMap.put(fileId, baseFile.get());
+        }
+        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
+            k -> new ArrayList<>()).add(entry._2);
+        if (fileToKeysMap.size() > 
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+          break;
+        }
+      }
+      if (fileToKeysMap.isEmpty()) {
+        return Collections.emptyList();
+      }
+
+      List<Pair<String, String>> partitionNameFileNameList = 
fileToKeysMap.keySet()
+          .stream().collect(Collectors.toList());
+      Map<Pair<String, String>, ByteBuffer> fileIDToBloomFilterByteBufferMap =

Review comment:
       Naming: a fileIdToBloomFilter map would suffice. It's understood that the 
bloom filter is represented as a byte buffer. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Collections.emptyMap();
+    }
+    if (partitionNameFileNameList.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+              .concat(new 
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+          fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+        }
+    );
+
+    List<String> partitionIDFileIDStrings = new 
ArrayList<>(partitionIDFileIDSortedStrings);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
+        (timer.endTimer() / partitionIDFileIDStrings.size())));
+
+    Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new 
HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry 
: hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (bloomFilterMetadata.isPresent()) {
+          if (!bloomFilterMetadata.get().getIsDeleted()) {
+            
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+            
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), 
bloomFilterMetadata.get().getBloomFilter());
+          }
+        } else {
+          LOG.error("Meta index bloom filter missing for: " + 
fileToKeyMap.get(entry.getLeft()));
+        }
+      }
+    }
+    return partitionFileToBloomFilterMap;
+  }
+
+  @Override
+  public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final 
List<Pair<String, String>> partitionNameFileNameList, final String columnName)
+      throws HoodieMetadataException {
+    if (!isColumnStatsIndexEnabled) {
+      LOG.error("Metadata column stats index is disabled!");
+      return Collections.emptyMap();
+    }
+
+    Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new 
HashMap<>();
+    TreeSet<String> sortedKeys = new TreeSet<>();
+    final String columnIndexStr = new 
ColumnIndexID(columnName).asBase64EncodedString();
+    for (Pair<String, String> partitionNameFileNamePair : 
partitionNameFileNameList) {
+      final String columnStatIndexKey = columnIndexStr

Review comment:
       Naming suggestion: columnStatsPartitionKey

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.bloom.HoodieDynamicBoundedBloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+/**
+ * Spark Function2 implementation for checking bloom filters for the
+ * requested keys from the metadata table index. The bloom filter
+ * checking for keys and the actual file verification for the
+ * candidate keys is done in an iterative fashion. In each iteration,
+ * bloom filters are requested for a batch of partition files and the
+ * keys are checked against them.
+ */
+public class HoodieMetadataBloomIndexCheckFunction implements
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataBloomIndexCheckFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+  private final HoodieTable hoodieTable;
+  private final HoodieWriteConfig config;
+
+  public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
+    return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
+  }
+
+  private class BloomIndexLazyKeyCheckIterator extends 
LazyIterableIterator<Tuple2<String, HoodieKey>, List<HoodieKeyLookupResult>> {
+    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> 
tuple2Iterator) {
+      super(tuple2Iterator);
+    }
+
+    @Override
+    protected void start() {
+    }
+
+    @Override
+    protected List<HoodieKeyLookupResult> computeNext() {
+      // Partition path and file name pair to list of keys
+      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new 
HashMap<>();
+      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+      final List<HoodieKeyLookupResult> resultList = new ArrayList<>();
+
+      while (inputItr.hasNext()) {
+        Tuple2<String, HoodieKey> entry = inputItr.next();
+        final String partitionPath = entry._2.getPartitionPath();
+        final String fileId = entry._1;
+        if (!fileIDBaseFileMap.containsKey(fileId)) {
+          Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
+          if (!baseFile.isPresent()) {
+            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+                + ", fileId: " + fileId);
+          }
+          fileIDBaseFileMap.put(fileId, baseFile.get());
+        }
+        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
+            k -> new ArrayList<>()).add(entry._2);
+        if (fileToKeysMap.size() > 
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+          break;
+        }
+      }
+      if (fileToKeysMap.isEmpty()) {
+        return Collections.emptyList();
+      }
+
+      List<Pair<String, String>> partitionNameFileNameList = 
fileToKeysMap.keySet()

Review comment:
       We can simplify this:
   ```
         List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet());
   ```
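
A minimal, self-contained sketch of why the two forms are interchangeable here. The class name, method names, and sample map are hypothetical and only for illustration; plain `String` keys stand in for the `Pair<String, String>` keys used in the patch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class KeySetCopyExample {

  // Form used in the patch: stream the key set and collect it into a list.
  static <K, V> List<K> keysViaStream(Map<K, V> map) {
    return map.keySet().stream().collect(Collectors.toList());
  }

  // Suggested form: copy the key set with the ArrayList copy constructor.
  static <K, V> List<K> keysViaCopyConstructor(Map<K, V> map) {
    return new ArrayList<>(map.keySet());
  }

  public static void main(String[] args) {
    // Illustrative data only.
    Map<String, Integer> fileToKeyCount = new HashMap<>();
    fileToKeyCount.put("file-a", 3);
    fileToKeyCount.put("file-b", 5);

    List<String> viaStream = keysViaStream(fileToKeyCount);
    List<String> viaCopy = keysViaCopyConstructor(fileToKeyCount);

    // Both lists are built by iterating the same, unmodified key set.
    System.out.println(viaStream.equals(viaCopy));
  }
}
```

Both forms produce an independent list, so later changes to the map do not affect it; the copy constructor just avoids the stream pipeline.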

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Collections.emptyMap();
+    }
+    if (partitionNameFileNameList.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()

Review comment:
       Naming suggestion: bloomFilterPartitionKey

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
##########
@@ -133,30 +143,83 @@ public HoodieBloomIndex(HoodieWriteConfig config, 
BaseHoodieBloomIndexHelper blo
   /**
    * Load all involved files as <Partition, filename> pair List.
    */
-  List<Pair<String, BloomIndexFileInfo>> loadInvolvedFiles(
+  List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
       List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
     // Obtain the latest data files from all the partitions.
     List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
         .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
         .collect(toList());
 
-    if (config.getBloomIndexPruneByRanges()) {
-      // also obtain file ranges, if range pruning is enabled
-      context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
-      return context.map(partitionPathFileIDList, pf -> {
-        try {
-          HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
-          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
-        } catch (MetadataNotFoundException me) {
-          LOG.warn("Unable to find range metadata in file :" + pf);
-          return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+    context.setJobStatus(this.getClass().getName(), "Obtain key ranges for 
file slices (range pruning=on)");
+    return context.map(partitionPathFileIDList, pf -> {
+      try {
+        HoodieRangeInfoHandle rangeInfoHandle = new 
HoodieRangeInfoHandle(config, hoodieTable, pf);
+        String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue(), 
minMaxKeys[0], minMaxKeys[1]));
+      } catch (MetadataNotFoundException me) {
+        LOG.warn("Unable to find range metadata in file :" + pf);
+        return Pair.of(pf.getKey(), new BloomIndexFileInfo(pf.getValue()));
+      }
+    }, Math.max(partitionPathFileIDList.size(), 1));
+  }
+
+  /**
+   * Get BloomIndexFileInfo for all the latest base files for the requested 
partitions.
+   *
+   * @param partitions  - List of partitions to get the base files for
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie Table
+   * @return List of partition and file column range info pairs
+   */
+  private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    List<Pair<String, String>> partitionPathFileIDList = 
getLatestBaseFilesForAllPartitions(partitions, context,
+        hoodieTable).stream()
+        .map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
+        .collect(toList());
+    return partitionPathFileIDList.stream()
+        .map(pf -> Pair.of(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()))).collect(toList());
+  }
+
+  /**
+   * Load the column stats index as BloomIndexFileInfo for all the involved 
files in the partition.
+   *
+   * @param partitions  - List of partitions for which column stats need to be 
loaded
+   * @param context     - Engine context
+   * @param hoodieTable - Hoodie table
+   * @return List of partition and file column range info pairs
+   */
+  protected List<Pair<String, BloomIndexFileInfo>> 
loadColumnRangesFromMetaIndex(
+      List<String> partitions, final HoodieEngineContext context, final 
HoodieTable hoodieTable) {
+    // also obtain file ranges, if range pruning is enabled
+    context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices");
+
+    final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+    return context.flatMap(partitions, partitionName -> {
+      List<Pair<String, String>> columnStatKeys = 
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,

Review comment:
       Also, can you add a comment just above this line stating that the pair 
represents partition path and file name? In index code, it's usually easy to 
confuse fileId and fileName. 
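
To illustrate the distinction, here is a small sketch. It assumes the usual Hudi base file naming of `<fileId>_<writeToken>_<instantTime>.<extension>`; the class, the helper `fileIdOf`, and the sample file name are hypothetical and not part of this patch.

```java
public class FileIdVsFileName {

  // Hypothetical helper: extract the file ID, assuming the base file name
  // has the form <fileId>_<writeToken>_<instantTime>.<extension>.
  static String fileIdOf(String baseFileName) {
    int firstUnderscore = baseFileName.indexOf('_');
    if (firstUnderscore < 0) {
      throw new IllegalArgumentException("Unexpected base file name: " + baseFileName);
    }
    return baseFileName.substring(0, firstUnderscore);
  }

  public static void main(String[] args) {
    // Illustrative value only.
    String fileName = "a1b2c3d4-0_1-0-1_20220128103000.parquet";
    String fileId = fileIdOf(fileName);

    // A Pair<String, String> could hold either of these next to the
    // partition path, which is why an explicit comment helps.
    System.out.println("fileName = " + fileName);
    System.out.println("fileId   = " + fileId);
  }
}
```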




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

