This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e270924c5eb [HUDI-5534] Optimizing Bloom Index lookup when using Bloom 
Filters from Metadata Table (#7642)
e270924c5eb is described below

commit e270924c5eb1548d9022fe3e52d5dd64f61fbdc1
Author: Alexey Kudinkin <alexey.kudin...@gmail.com>
AuthorDate: Fri Jan 27 15:47:12 2023 -0800

    [HUDI-5534] Optimizing Bloom Index lookup when using Bloom Filters from 
Metadata Table (#7642)
    
    Recently, trying to use the Metadata Table in the Bloom Index was resulting in
    failures due to exhaustion of the S3 connection pool, no matter how large
    (within reason) we set the pool size (we tested up to 3k connections).
    
    This PR focuses on optimizing the Bloom Index lookup sequence in the case when
    it leverages the Bloom Filter partition of the Metadata Table. The premise of
    this change is based on the following observations:
    
    Increasing the size of the batches of requests to the MT amortizes the cost of
    processing them (the bigger the batch, the lower the per-request cost).
    
    Having too few partitions in the Bloom Index path, however, starts to hurt
    parallelism when we actually probe individual files to check whether they
    contain the target keys. The solution is to split these two steps into separate
    stages with drastically different parallelism levels: constrain parallelism
    when reading from the MT (tens of tasks) and keep it at the current level for
    probing individual files (hundreds of tasks).
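
    Below is a minimal, self-contained sketch of that two-stage split (an
    illustration only, not the actual Hudi code): plain Strings stand in for the
    file-group id and record-key types, and the probe methods are stubs. The point
    is the deliberately small task count for the MT read vs. the larger one for
    file probing.

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;
    import java.util.List;

    public class TwoStageLookupSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[4]", "two-stage-sketch");

        // (fileGroupId, recordKey) pairs to be checked against bloom filters
        List<Tuple2<String, String>> pairs = Arrays.asList(
            new Tuple2<>("fg-1", "key-001"), new Tuple2<>("fg-1", "key-002"),
            new Tuple2<>("fg-2", "key-003"), new Tuple2<>("fg-3", "key-004"));
        JavaPairRDD<String, String> fileGroupToKey = jsc.parallelizePairs(pairs, 100);

        // Stage 1: constrained parallelism (tens of tasks), standing in for the
        // batched bloom-filter reads from the MT
        JavaPairRDD<String, String> bloomCandidates = fileGroupToKey
            .coalesce(10)
            .filter(t -> mightContain(t._1, t._2));

        // Stage 2: re-expanded parallelism (hundreds of tasks) for probing the
        // actual base files
        List<Tuple2<String, String>> matches = bloomCandidates
            .repartition(100)
            .filter(t -> actuallyContains(t._1, t._2))
            .collect();

        System.out.println(matches);
        jsc.stop();
      }

      // Stubs for the bloom-filter check and the actual file probe
      private static boolean mightContain(String fileGroupId, String recordKey) {
        return true;
      }

      private static boolean actuallyContains(String fileGroupId, String recordKey) {
        return !recordKey.endsWith("4");
      }
    }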
    
    The current way of partitioning records (relying on Spark's default
    partitioner) meant that, with high likelihood, every Spark executor would be
    opening up (and processing) every file-group of the MT Bloom Filter partition.
    To alleviate that, the same hashing algorithm used by the MT should be used to
    partition records into Spark's partitions, so that every task opens no more
    than one file-group of the MT Bloom Filter partition.
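
    As an illustration of that idea, here is a hypothetical partitioner sketch (it
    is not the actual AffineBloomIndexFileGroupPartitioner; the hashing shown is a
    stand-in for whatever hash the MT applies when mapping a lookup key to a
    bloom-filter file-group):

    import org.apache.spark.Partitioner;

    public class AffineFileGroupPartitionerSketch extends Partitioner {

      // number of file-groups in the MT bloom-filter partition
      private final int numMetadataFileGroups;

      public AffineFileGroupPartitionerSketch(int numMetadataFileGroups) {
        this.numMetadataFileGroups = numMetadataFileGroups;
      }

      @Override
      public int numPartitions() {
        // one Spark partition per MT file-group keeps the mapping one-to-one
        return numMetadataFileGroups;
      }

      @Override
      public int getPartition(Object key) {
        // ASSUMPTION: the real code would delegate to the MT's own hashing utility
        // here so that the Spark partition and the MT file-group line up exactly;
        // String.hashCode() is only a placeholder.
        String lookupKey = (String) key;
        return Math.floorMod(lookupKey.hashCode(), numMetadataFileGroups);
      }
    }

    A PairRDD keyed by the lookup key could then be repartitioned with
    partitionBy(new AffineFileGroupPartitionerSketch(n)) before issuing the MT
    reads, so that each task touches at most one MT file-group.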
    
    To achieve that, the following changes are implemented in the Bloom Index
    sequence (when leveraging the MT):
    
    Bloom Filter probing and actual file probing are split into 2 separate
    operations (so that the parallelism of each can be controlled individually)
    Requests to the MT are replaced with invocations of its batch APIs
    A custom partitioner, AffineBloomIndexFileGroupPartitioner, is introduced,
    repartitioning the dataset of file names with corresponding record keys in a
    way that is affine with the MT Bloom Filters' partitioning (allowing us to open
    no more than a single file-group per Spark task)
    Additionally, this PR addresses some of the low-hanging performance
    optimizations that can considerably improve the Bloom Index lookup sequence,
    such as mapping file-comparison pairs to a PairRDD (where the key is the file
    name and the value is the record key) instead of an RDD, so that we can:
    
    Do in-partition sorting by file name (to make sure we check all records within
    a file all at once) within a single Spark partition instead of a global sort
    (reducing shuffling as well)
    Avoid the re-shuffling that re-mapping from an RDD to a PairRDD later would entail
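
    The sketch below (again with assumed, simplified types rather than the actual
    Hudi classes) shows that PairRDD shape: keys are file names, values are record
    keys, and sorting happens within each partition via
    repartitionAndSortWithinPartitions instead of a global sort.

    import org.apache.spark.HashPartitioner;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    import java.util.Arrays;

    public class InPartitionSortSketch {
      public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext("local[2]", "in-partition-sort");

        // key = file name, value = record key to be checked against that file
        JavaPairRDD<String, String> fileNameToRecordKey = jsc.parallelizePairs(
            Arrays.asList(
                new Tuple2<>("file-b.parquet", "key-2"),
                new Tuple2<>("file-a.parquet", "key-1"),
                new Tuple2<>("file-a.parquet", "key-3")));

        // Partition by file name and sort by key within each partition only: all
        // record keys for a given file arrive contiguously in one task, without a
        // global sort and without an extra shuffle later.
        JavaPairRDD<String, String> grouped = fileNameToRecordKey
            .repartitionAndSortWithinPartitions(new HashPartitioner(2));

        grouped.foreachPartition(iter ->
            iter.forEachRemaining(t -> System.out.println(t._1 + " -> " + t._2)));

        jsc.stop();
      }
    }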
---
 .../org/apache/hudi/index/HoodieIndexUtils.java    |   5 +-
 .../index/bloom/BaseHoodieBloomIndexHelper.java    |   5 +-
 .../apache/hudi/index/bloom/HoodieBloomIndex.java  |  77 +++----
 ...ion.java => HoodieBloomIndexCheckFunction.java} |  59 +++---
 .../hudi/index/bloom/HoodieGlobalBloomIndex.java   |  14 +-
 .../bloom/ListBasedHoodieBloomIndexHelper.java     |  26 +--
 .../index/bloom/TestFlinkHoodieBloomIndex.java     |   8 +-
 .../org/apache/hudi/data/HoodieJavaPairRDD.java    |   5 +
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |  14 +-
 .../bloom/BucketizedBloomCheckPartitioner.java     |  18 +-
 .../bloom/HoodieBloomFilterProbingResult.java      |  35 ++++
 .../index/bloom/HoodieBloomIndexCheckFunction.java | 120 -----------
 .../index/bloom/HoodieFileProbingFunction.java     | 143 +++++++++++++
 .../HoodieMetadataBloomFilterProbingFunction.java  | 157 ++++++++++++++
 .../HoodieMetadataBloomIndexCheckFunction.java     | 154 --------------
 .../index/bloom/SparkHoodieBloomIndexHelper.java   | 229 ++++++++++++++++++---
 .../org/apache/spark/sql/hudi/SparkAdapter.scala   |   4 +-
 .../bloom/TestBucketizedBloomCheckPartitioner.java |  43 ++--
 .../hudi/index/bloom/TestHoodieBloomIndex.java     |  10 +-
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |  10 +-
 .../org/apache/hudi/common/data/HoodieData.java    |  17 +-
 .../apache/hudi/common/data/HoodieListData.java    |  10 +
 .../hudi/common/data/HoodieListPairData.java       |  23 +++
 .../apache/hudi/common/data/HoodiePairData.java    |  11 +-
 .../ThrowingConsumer.java}                         |  31 +--
 .../hudi/common/model/HoodieAvroIndexedRecord.java |   2 +-
 .../hudi/common/table/HoodieTableMetaClient.java   |   4 +
 .../common/util/collection/FlatteningIterator.java |  55 +++++
 .../common/util/collection/MappingIterator.java    |   4 +-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  53 +++--
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  34 ++-
 .../metadata/HoodieMetadataFileSystemView.java     |   5 +-
 .../main/java/org/apache/hudi/util/Transient.java  | 108 ++++++++++
 .../hudi/common/util/collection/TestIterators.java |  48 +++++
 .../apache/spark/sql/adapter/Spark2Adapter.scala   |   4 +-
 35 files changed, 1038 insertions(+), 507 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
index 8a8a03e1b17..2ec46f19a43 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/HoodieIndexUtils.java
@@ -62,9 +62,8 @@ public class HoodieIndexUtils {
    * @param hoodieTable Instance of {@link HoodieTable} of interest
    * @return the list of {@link HoodieBaseFile}
    */
-  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
-      final String partition,
-      final HoodieTable hoodieTable) {
+  public static List<HoodieBaseFile> getLatestBaseFilesForPartition(String 
partition,
+                                                                    
HoodieTable hoodieTable) {
     Option<HoodieInstant> latestCommitTime = 
hoodieTable.getMetaClient().getCommitsTimeline()
         .filterCompletedInstants().lastInstant();
     if (latestCommitTime.isPresent()) {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java
index 9430d9bb5e5..f144540ed22 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/BaseHoodieBloomIndexHelper.java
@@ -19,12 +19,11 @@
 
 package org.apache.hudi.index.bloom;
 
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
@@ -51,7 +50,7 @@ public abstract class BaseHoodieBloomIndexHelper implements 
Serializable {
   public abstract HoodiePairData<HoodieKey, HoodieRecordLocation> 
findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable 
hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition);
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
index 57d9def9b42..cf067c3a991 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -35,7 +36,6 @@ import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.MetadataNotFoundException;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndexUtils;
@@ -45,9 +45,9 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.util.stream.Collectors.groupingBy;
@@ -127,7 +127,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, 
Object> {
 
     // Step 3: Obtain a HoodieData, for each incoming record, that already 
exists, with the file id,
     // that contains it.
-    HoodieData<Pair<String, HoodieKey>> fileComparisonPairs =
+    HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs =
         explodeRecordsWithFileComparisons(partitionToFileInfo, 
partitionRecordKeyPairs);
 
     return bloomIndexHelper.findMatchingFilesForRecordKeys(config, context, 
hoodieTable,
@@ -210,34 +210,35 @@ public class HoodieBloomIndex extends HoodieIndex<Object, 
Object> {
     // also obtain file ranges, if range pruning is enabled
     context.setJobStatus(this.getClass().getName(), "Load meta index key 
ranges for file slices: " + config.getTableName());
 
-    final String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
-    return context.flatMap(partitions, partitionName -> {
-      // Partition and file name pairs
-      List<Pair<String, String>> partitionFileNameList = 
HoodieIndexUtils.getLatestBaseFilesForPartition(partitionName,
-              hoodieTable).stream().map(baseFile -> Pair.of(partitionName, 
baseFile.getFileName()))
-          .sorted()
-          .collect(toList());
-      if (partitionFileNameList.isEmpty()) {
-        return Stream.empty();
-      }
-      try {
-        Map<Pair<String, String>, HoodieMetadataColumnStats> 
fileToColumnStatsMap =
-            
hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, keyField);
-        List<Pair<String, BloomIndexFileInfo>> result = new ArrayList<>();
-        for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry 
: fileToColumnStatsMap.entrySet()) {
-          result.add(Pair.of(entry.getKey().getLeft(),
-              new BloomIndexFileInfo(
-                  FSUtils.getFileId(entry.getKey().getRight()),
-                  // NOTE: Here we assume that the type of the primary key 
field is string
-                  (String) 
unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
-                  (String) 
unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
-              )));
-        }
-        return result.stream();
-      } catch (MetadataNotFoundException me) {
-        throw new HoodieMetadataException("Unable to find column range 
metadata for partition:" + partitionName, me);
-      }
-    }, Math.max(partitions.size(), 1));
+    String keyField = 
hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
+
+    // Partition and file name pairs
+    List<Pair<String, String>> partitionFileNameList =
+        HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, 
context, hoodieTable).stream()
+            .map(partitionBaseFilePair -> 
Pair.of(partitionBaseFilePair.getLeft(), 
partitionBaseFilePair.getRight().getFileName()))
+            .sorted()
+            .collect(toList());
+
+    if (partitionFileNameList.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    Map<Pair<String, String>, HoodieMetadataColumnStats> fileToColumnStatsMap =
+        hoodieTable.getMetadataTable().getColumnStats(partitionFileNameList, 
keyField);
+
+    List<Pair<String, BloomIndexFileInfo>> result = new 
ArrayList<>(fileToColumnStatsMap.size());
+
+    for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : 
fileToColumnStatsMap.entrySet()) {
+      result.add(Pair.of(entry.getKey().getLeft(),
+          new BloomIndexFileInfo(
+              FSUtils.getFileId(entry.getKey().getRight()),
+              // NOTE: Here we assume that the type of the primary key field 
is string
+              (String) 
unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
+              (String) 
unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
+          )));
+    }
+
+    return result;
   }
 
   @Override
@@ -278,7 +279,7 @@ public class HoodieBloomIndex extends HoodieIndex<Object, 
Object> {
    * Sub-partition to ensure the records can be looked up against files & also 
prune file<=>record comparisons based on
    * recordKey ranges in the index info.
    */
-  HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
+  HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       HoodiePairData<String, String> partitionRecordKeyPairs) {
     IndexFileFilter indexFileFilter =
@@ -289,11 +290,13 @@ public class HoodieBloomIndex extends HoodieIndex<Object, 
Object> {
       String recordKey = partitionRecordKeyPair.getRight();
       String partitionPath = partitionRecordKeyPair.getLeft();
 
-      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, 
recordKey).stream()
-          .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new 
ImmutablePair<>(partitionFileIdPair.getRight(),
-              new HoodieKey(recordKey, partitionPath)))
-          .collect(Collectors.toList());
-    }).flatMap(List::iterator);
+      return indexFileFilter.getMatchingFilesAndPartition(partitionPath, 
recordKey)
+          .stream()
+          .map(partitionFileIdPair ->
+              new ImmutablePair<>(
+                  new HoodieFileGroupId(partitionFileIdPair.getLeft(), 
partitionFileIdPair.getRight()), recordKey));
+    })
+        .flatMapToPair(Stream::iterator);
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
similarity index 61%
rename from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java
rename to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
index 80031f4e8f0..52b504e9ab1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBaseBloomIndexCheckFunction.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -19,7 +19,8 @@
 package org.apache.hudi.index.bloom;
 
 import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.function.SerializableFunction;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -28,53 +29,67 @@ import org.apache.hudi.io.HoodieKeyLookupHandle;
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.table.HoodieTable;
 
-import java.util.function.Function;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.function.Function;
 
 /**
- * Function performing actual checking of list containing (fileId, hoodieKeys) 
against the actual files.
+ * Function accepting a tuple of {@link HoodieFileGroupId} and a record key 
and producing
+ * a list of {@link HoodieKeyLookupResult} for every file identified by the 
file-group ids
+ *
+ * @param <I> type of the tuple of {@code (HoodieFileGroupId, <record-key>)}. 
Note that this is
+ *           parameterized as generic such that this code could be reused for 
Spark as well
  */
-public class HoodieBaseBloomIndexCheckFunction
-    implements Function<Iterator<Pair<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
+public class HoodieBloomIndexCheckFunction<I> 
+    implements Function<Iterator<I>, Iterator<List<HoodieKeyLookupResult>>>, 
Serializable {
 
   private final HoodieTable hoodieTable;
 
   private final HoodieWriteConfig config;
 
-  public HoodieBaseBloomIndexCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+  private final SerializableFunction<I, HoodieFileGroupId> 
fileGroupIdExtractor;
+  private final SerializableFunction<I, String> recordKeyExtractor;
+
+  public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable,
+                                       HoodieWriteConfig config,
+                                       SerializableFunction<I, 
HoodieFileGroupId> fileGroupIdExtractor,
+                                       SerializableFunction<I, String> 
recordKeyExtractor) {
     this.hoodieTable = hoodieTable;
     this.config = config;
+    this.fileGroupIdExtractor = fileGroupIdExtractor;
+    this.recordKeyExtractor = recordKeyExtractor;
   }
 
   @Override
-  public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<Pair<String, 
HoodieKey>> filePartitionRecordKeyTripletItr) {
-    return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
+  public Iterator<List<HoodieKeyLookupResult>> apply(Iterator<I> 
fileGroupIdRecordKeyPairIterator) {
+    return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
   }
 
-  class LazyKeyCheckIterator extends LazyIterableIterator<Pair<String, 
HoodieKey>, List<HoodieKeyLookupResult>> {
+  protected class LazyKeyCheckIterator extends LazyIterableIterator<I, 
List<HoodieKeyLookupResult>> {
 
     private HoodieKeyLookupHandle keyLookupHandle;
 
-    LazyKeyCheckIterator(Iterator<Pair<String, HoodieKey>> 
filePartitionRecordKeyTripletItr) {
+    LazyKeyCheckIterator(Iterator<I> filePartitionRecordKeyTripletItr) {
       super(filePartitionRecordKeyTripletItr);
     }
 
-    @Override
-    protected void start() {
-    }
-
     @Override
     protected List<HoodieKeyLookupResult> computeNext() {
+
       List<HoodieKeyLookupResult> ret = new ArrayList<>();
       try {
         // process one file in each go.
         while (inputItr.hasNext()) {
-          Pair<String, HoodieKey> currentTuple = inputItr.next();
-          String fileId = currentTuple.getLeft();
-          String partitionPath = currentTuple.getRight().getPartitionPath();
-          String recordKey = currentTuple.getRight().getRecordKey();
+          I tuple = inputItr.next();
+
+          HoodieFileGroupId fileGroupId = fileGroupIdExtractor.apply(tuple);
+          String recordKey = recordKeyExtractor.apply(tuple);
+
+          String fileId = fileGroupId.getFileId();
+          String partitionPath = fileGroupId.getPartitionPath();
+
           Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, 
fileId);
 
           // lazily init state
@@ -100,15 +115,13 @@ public class HoodieBaseBloomIndexCheckFunction
         }
       } catch (Throwable e) {
         if (e instanceof HoodieException) {
-          throw e;
+          throw (HoodieException) e;
         }
+
         throw new HoodieIndexException("Error checking bloom filter index. ", 
e);
       }
-      return ret;
-    }
 
-    @Override
-    protected void end() {
+      return ret;
     }
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
index 5f2007ea536..b5604312d3f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieGlobalBloomIndex.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -41,7 +42,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * This filter will only work with hoodie table since it will only load 
partitions
@@ -74,7 +75,7 @@ public class HoodieGlobalBloomIndex extends HoodieBloomIndex {
    */
 
   @Override
-  HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
+  HoodiePairData<HoodieFileGroupId, String> explodeRecordsWithFileComparisons(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
       HoodiePairData<String, String> partitionRecordKeyPairs) {
 
@@ -87,10 +88,11 @@ public class HoodieGlobalBloomIndex extends 
HoodieBloomIndex {
       String partitionPath = partitionRecordKeyPair.getLeft();
 
       return indexFileFilter.getMatchingFilesAndPartition(partitionPath, 
recordKey).stream()
-          .map(partitionFileIdPair -> (Pair<String, HoodieKey>) new 
ImmutablePair<>(partitionFileIdPair.getRight(),
-              new HoodieKey(recordKey, partitionFileIdPair.getLeft())))
-          .collect(Collectors.toList());
-    }).flatMap(List::iterator);
+          .map(partitionFileIdPair ->
+              new ImmutablePair<>(
+                  new HoodieFileGroupId(partitionFileIdPair.getLeft(), 
partitionFileIdPair.getRight()), recordKey));
+    })
+        .flatMapToPair(Stream::iterator);
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
index cffee5ee740..b47f5cf066c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/ListBasedHoodieBloomIndexHelper.java
@@ -19,20 +19,20 @@
 
 package org.apache.hudi.index.bloom;
 
-import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.collection.ImmutablePair;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.table.HoodieTable;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -56,21 +56,21 @@ public class ListBasedHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper
   public HoodiePairData<HoodieKey, HoodieRecordLocation> 
findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable 
hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo, Map<String, 
Long> recordsPerPartition) {
-    List<Pair<String, HoodieKey>> fileComparisonPairList =
+    List<Pair<HoodieFileGroupId, String>> fileComparisonPairList =
         fileComparisonPairs.collectAsList().stream()
             .sorted(Comparator.comparing(Pair::getLeft)).collect(toList());
 
-    List<HoodieKeyLookupResult> keyLookupResults = new ArrayList<>();
-    Iterator<List<HoodieKeyLookupResult>> iterator = new 
HoodieBaseBloomIndexCheckFunction(
-        hoodieTable, config).apply(fileComparisonPairList.iterator());
-    while (iterator.hasNext()) {
-      keyLookupResults.addAll(iterator.next());
-    }
+    List<HoodieKeyLookupResult> keyLookupResults =
+        CollectionUtils.toStream(
+          new HoodieBloomIndexCheckFunction<Pair<HoodieFileGroupId, 
String>>(hoodieTable, config, Pair::getLeft, Pair::getRight)
+              .apply(fileComparisonPairList.iterator())
+        )
+            .flatMap(Collection::stream)
+            .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
+            .collect(toList());
 
-    keyLookupResults = keyLookupResults.stream().filter(
-        lr -> lr.getMatchingRecordKeys().size() > 0).collect(toList());
     return context.parallelize(keyLookupResults).flatMap(lookupResult ->
         lookupResult.getMatchingRecordKeys().stream()
             .map(recordKey -> new ImmutablePair<>(lookupResult, 
recordKey)).iterator()
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index 6dd5a1c27e2..d4b4007bedb 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.data.HoodieListPairData;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -186,11 +187,14 @@ public class TestFlinkHoodieBloomIndex extends 
HoodieFlinkClientTestHarness {
           partitionRecordKeyMap.put(t.getLeft(), recordKeyList);
         });
 
-    List<Pair<String, HoodieKey>> comparisonKeyList = 
index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, 
HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();
+    List<Pair<HoodieFileGroupId, String>> comparisonKeyList =
+        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, 
HoodieListPairData.lazy(partitionRecordKeyMap)).collectAsList();
 
     assertEquals(10, comparisonKeyList.size());
     java.util.Map<String, List<String>> recordKeyToFileComps = 
comparisonKeyList.stream()
-        .collect(java.util.stream.Collectors.groupingBy(t -> 
t.getRight().getRecordKey(), java.util.stream.Collectors.mapping(t -> 
t.getLeft(), java.util.stream.Collectors.toList())));
+        .collect(
+            java.util.stream.Collectors.groupingBy(t -> t.getRight(),
+              java.util.stream.Collectors.mapping(t -> 
t.getLeft().getFileId(), java.util.stream.Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new java.util.HashSet<>(asList("f1", "f3", "f4")), new 
java.util.HashSet<>(recordKeyToFileComps.get("002")));
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
index 9ec3c4cf715..9019fb43ff0 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaPairRDD.java
@@ -121,6 +121,11 @@ public class HoodieJavaPairRDD<K, V> implements 
HoodiePairData<K, V> {
         tuple -> func.apply(new ImmutablePair<>(tuple._1, tuple._2))));
   }
 
+  @Override
+  public <W> HoodiePairData<K, W> mapValues(SerializableFunction<V, W> func) {
+    return HoodieJavaPairRDD.of(pairRDDData.mapValues(func::apply));
+  }
+
   @Override
   public <L, W> HoodiePairData<L, W> 
mapToPair(SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc) {
     return HoodieJavaPairRDD.of(pairRDDData.mapToPair(pair -> {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index ed9613bc15f..6ed3a854962 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -24,17 +24,16 @@ import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
+import org.apache.hudi.common.util.collection.MappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
-
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.storage.StorageLevel;
+import scala.Tuple2;
 
 import java.util.Iterator;
 import java.util.List;
 
-import scala.Tuple2;
-
 /**
  * Holds a {@link JavaRDD} of objects.
  *
@@ -119,9 +118,18 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
 
   @Override
   public <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func) {
+    // NOTE: Unrolling this lambda into a method reference results in 
[[ClassCastException]]
+    //       due to weird interop b/w Scala and Java
     return HoodieJavaRDD.of(rddData.flatMap(e -> func.apply(e)));
   }
 
+  @Override
+  public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, 
Iterator<? extends Pair<K, V>>> func) {
+    return HoodieJavaPairRDD.of(
+        rddData.flatMapToPair(e ->
+            new MappingIterator<>(func.apply(e), p -> new Tuple2<>(p.getKey(), 
p.getValue()))));
+  }
+
   @Override
   public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, 
V> func) {
     return HoodieJavaPairRDD.of(rddData.mapToPair(input -> {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
index 36710dc02bb..48099220dbf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/BucketizedBloomCheckPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.index.bloom;
 
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -63,7 +64,7 @@ public class BucketizedBloomCheckPartitioner extends 
Partitioner {
   /**
    * Stores the final mapping of a file group to a list of partitions for its 
keys.
    */
-  private Map<String, List<Integer>> fileGroupToPartitions;
+  private Map<HoodieFileGroupId, List<Integer>> fileGroupToPartitions;
 
   /**
    * Create a partitioner that computes a plan based on provided workload 
characteristics.
@@ -72,11 +73,11 @@ public class BucketizedBloomCheckPartitioner extends 
Partitioner {
    * @param fileGroupToComparisons number of expected comparisons per file 
group
    * @param keysPerBucket maximum number of keys to pack in a single bucket
    */
-  public BucketizedBloomCheckPartitioner(int targetPartitions, Map<String, 
Long> fileGroupToComparisons,
+  public BucketizedBloomCheckPartitioner(int targetPartitions, 
Map<HoodieFileGroupId, Long> fileGroupToComparisons,
       int keysPerBucket) {
     this.fileGroupToPartitions = new HashMap<>();
 
-    Map<String, Integer> bucketsPerFileGroup = new HashMap<>();
+    Map<HoodieFileGroupId, Integer> bucketsPerFileGroup = new HashMap<>();
     // Compute the buckets needed per file group, using simple uniform 
distribution
     fileGroupToComparisons.forEach((f, c) -> bucketsPerFileGroup.put(f, (int) 
Math.ceil((c * 1.0) / keysPerBucket)));
     int totalBuckets = bucketsPerFileGroup.values().stream().mapToInt(i -> 
i).sum();
@@ -90,9 +91,9 @@ public class BucketizedBloomCheckPartitioner extends 
Partitioner {
     int minBucketsPerPartition = Math.max((int) Math.floor((1.0 * 
totalBuckets) / partitions), 1);
     LOG.info(String.format("TotalBuckets %d, min_buckets/partition %d", 
totalBuckets, minBucketsPerPartition));
     int[] bucketsFilled = new int[partitions];
-    Map<String, AtomicInteger> bucketsFilledPerFileGroup = new HashMap<>();
+    Map<HoodieFileGroupId, AtomicInteger> bucketsFilledPerFileGroup = new 
HashMap<>();
     int partitionIndex = 0;
-    for (Map.Entry<String, Integer> e : bucketsPerFileGroup.entrySet()) {
+    for (Map.Entry<HoodieFileGroupId, Integer> e : 
bucketsPerFileGroup.entrySet()) {
       for (int b = 0; b < Math.max(1, e.getValue() - 1); b++) {
         // keep filled counts upto date
         bucketsFilled[partitionIndex]++;
@@ -115,7 +116,7 @@ public class BucketizedBloomCheckPartitioner extends 
Partitioner {
     // PHASE 2 : for remaining unassigned buckets, round robin over partitions 
once. Since we withheld 1 bucket from
     // each file group uniformly, this remaining is also an uniform mix across 
file groups. We just round robin to
     // optimize for goal 2.
-    for (Map.Entry<String, Integer> e : bucketsPerFileGroup.entrySet()) {
+    for (Map.Entry<HoodieFileGroupId, Integer> e : 
bucketsPerFileGroup.entrySet()) {
       int remaining = e.getValue() - 
bucketsFilledPerFileGroup.get(e.getKey()).intValue();
       for (int r = 0; r < remaining; r++) {
         // mark this partition against the file group
@@ -142,7 +143,8 @@ public class BucketizedBloomCheckPartitioner extends 
Partitioner {
 
   @Override
   public int getPartition(Object key) {
-    final Pair<String, String> parts = (Pair<String, String>) key;
+    final Pair<HoodieFileGroupId, String> parts = (Pair<HoodieFileGroupId, 
String>) key;
+    // TODO replace w/ more performant hash
     final long hashOfKey = NumericUtils.getMessageDigestHash("MD5", 
parts.getRight());
     final List<Integer> candidatePartitions = 
fileGroupToPartitions.get(parts.getLeft());
     final int idx = (int) Math.floorMod((int) hashOfKey, 
candidatePartitions.size());
@@ -150,7 +152,7 @@ public class BucketizedBloomCheckPartitioner extends 
Partitioner {
     return candidatePartitions.get(idx);
   }
 
-  Map<String, List<Integer>> getFileGroupToPartitions() {
+  Map<HoodieFileGroupId, List<Integer>> getFileGroupToPartitions() {
     return fileGroupToPartitions;
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
new file mode 100644
index 00000000000..c124f8b27b8
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomFilterProbingResult.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import java.util.List;
+
+class HoodieBloomFilterProbingResult {
+
+  private final List<String> candidateKeys;
+
+  HoodieBloomFilterProbingResult(List<String> candidateKeys) {
+    this.candidateKeys = candidateKeys;
+  }
+
+  public List<String> getCandidateKeys() {
+    return candidateKeys;
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
deleted file mode 100644
index e19a429ea72..00000000000
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndexCheckFunction.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.index.bloom;
-
-import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.io.HoodieKeyLookupHandle;
-import org.apache.hudi.io.HoodieKeyLookupResult;
-import org.apache.hudi.table.HoodieTable;
-
-import org.apache.spark.api.java.function.Function2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-import scala.Tuple2;
-
-/**
- * Function performing actual checking of RDD partition containing (fileId, 
hoodieKeys) against the actual files.
- */
-public class HoodieBloomIndexCheckFunction
-    implements Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
-
-  private final HoodieTable hoodieTable;
-
-  private final HoodieWriteConfig config;
-
-  public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
-    this.hoodieTable = hoodieTable;
-    this.config = config;
-  }
-
-  @Override
-  public Iterator<List<HoodieKeyLookupResult>> call(Integer partition,
-                                                    Iterator<Tuple2<String, 
HoodieKey>> filePartitionRecordKeyTripletItr) {
-    return new LazyKeyCheckIterator(filePartitionRecordKeyTripletItr);
-  }
-
-  class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, 
HoodieKey>, List<HoodieKeyLookupResult>> {
-
-    private HoodieKeyLookupHandle keyLookupHandle;
-
-    LazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> 
filePartitionRecordKeyTripletItr) {
-      super(filePartitionRecordKeyTripletItr);
-    }
-
-    @Override
-    protected void start() {
-    }
-
-    @Override
-    protected List<HoodieKeyLookupResult> computeNext() {
-
-      List<HoodieKeyLookupResult> ret = new ArrayList<>();
-      try {
-        // process one file in each go.
-        while (inputItr.hasNext()) {
-          Tuple2<String, HoodieKey> currentTuple = inputItr.next();
-          String fileId = currentTuple._1;
-          String partitionPath = currentTuple._2.getPartitionPath();
-          String recordKey = currentTuple._2.getRecordKey();
-          Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, 
fileId);
-
-          // lazily init state
-          if (keyLookupHandle == null) {
-            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, 
partitionPathFilePair);
-          }
-
-          // if continue on current file
-          if 
(keyLookupHandle.getPartitionPathFileIDPair().equals(partitionPathFilePair)) {
-            keyLookupHandle.addKey(recordKey);
-          } else {
-            // do the actual checking of file & break out
-            ret.add(keyLookupHandle.getLookupResult());
-            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, 
partitionPathFilePair);
-            keyLookupHandle.addKey(recordKey);
-            break;
-          }
-        }
-
-        // handle case, where we ran out of input, close pending work, update 
return val
-        if (!inputItr.hasNext()) {
-          ret.add(keyLookupHandle.getLookupResult());
-        }
-      } catch (Throwable e) {
-        if (e instanceof HoodieException) {
-          throw e;
-        }
-        throw new HoodieIndexException("Error checking bloom filter index. ", 
e);
-      }
-
-      return ret;
-    }
-
-    @Override
-    protected void end() {
-    }
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
new file mode 100644
index 00000000000..0809042c9fb
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.index.HoodieIndexUtils;
+import org.apache.hudi.io.HoodieKeyLookupResult;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of the function probing filtered in candidate keys provided 
in
+ * {@link HoodieBloomFilterProbingResult} w/in corresponding files identified 
by {@link HoodieFileGroupId}
+ * to validate whether the record w/ the provided key is indeed persisted in it
+ */
+public class HoodieFileProbingFunction implements
+    FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>>, List<HoodieKeyLookupResult>> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieFileProbingFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+
+  private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;
+  private final SerializableConfiguration hadoopConf;
+
+  public HoodieFileProbingFunction(Broadcast<HoodieTableFileSystemView> 
baseFileOnlyViewBroadcast,
+                                   SerializableConfiguration hadoopConf) {
+    this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
+    this.hadoopConf = hadoopConf;
+  }
+
+  @Override
+  public Iterator<List<HoodieKeyLookupResult>> 
call(Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> 
tuple2Iterator) throws Exception {
+    return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
+  }
+
+  private class BloomIndexLazyKeyCheckIterator
+      extends LazyIterableIterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>, List<HoodieKeyLookupResult>> {
+
+    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>> tuple2Iterator) {
+      super(tuple2Iterator);
+    }
+
+    @Override
+    protected List<HoodieKeyLookupResult> computeNext() {
+      // Partition path and file name pair to list of keys
+      final Map<Pair<String, String>, HoodieBloomFilterProbingResult> 
fileToLookupResults = new HashMap<>();
+      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+
+      while (inputItr.hasNext()) {
+        Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult> entry = 
inputItr.next();
+        final String partitionPath = entry._1.getPartitionPath();
+        final String fileId = entry._1.getFileId();
+
+        if (!fileIDBaseFileMap.containsKey(fileId)) {
+          Option<HoodieBaseFile> baseFile =
+              
baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId);
+          if (!baseFile.isPresent()) {
+            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+                + ", fileId: " + fileId);
+          }
+
+          fileIDBaseFileMap.put(fileId, baseFile.get());
+        }
+
+        fileToLookupResults.putIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()), entry._2);
+
+        if (fileToLookupResults.size() > 
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+          break;
+        }
+      }
+
+      if (fileToLookupResults.isEmpty()) {
+        return Collections.emptyList();
+      }
+
+      return fileToLookupResults.entrySet().stream()
+          .map(entry -> {
+            Pair<String, String> partitionPathFileNamePair = entry.getKey();
+            HoodieBloomFilterProbingResult bloomFilterKeyLookupResult = 
entry.getValue();
+
+            final String partitionPath = partitionPathFileNamePair.getLeft();
+            final String fileName = partitionPathFileNamePair.getRight();
+            final String fileId = FSUtils.getFileId(fileName);
+            ValidationUtils.checkState(!fileId.isEmpty());
+
+            List<String> candidateRecordKeys = 
bloomFilterKeyLookupResult.getCandidateKeys();
+
+            // TODO add assertion that file is checked only once
+
+            final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
+            List<String> matchingKeys = 
HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()),
+                candidateRecordKeys, hadoopConf.get());
+
+            LOG.debug(
+                String.format("Bloom filter candidates (%d) / false positives 
(%d), actual matches (%d)",
+                    candidateRecordKeys.size(), candidateRecordKeys.size() - 
matchingKeys.size(), matchingKeys.size()));
+
+            return new HoodieKeyLookupResult(fileId, partitionPath, 
dataFile.getCommitTime(), matchingKeys);
+          })
+          .collect(Collectors.toList());
+    }
+
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
new file mode 100644
index 00000000000..406be816500
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.index.bloom;
+
+import org.apache.hudi.client.utils.LazyIterableIterator;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.FlatteningIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIndexException;
+import org.apache.hudi.metadata.HoodieBackedTableMetadata;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.function.PairFlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of the function that probing Bloom Filters of individual 
files verifying
+ * whether particular record key could be stored in the latest file-slice of 
the file-group
+ * identified by the {@link HoodieFileGroupId}
+ */
+public class HoodieMetadataBloomFilterProbingFunction implements
+    PairFlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>, 
HoodieFileGroupId, HoodieBloomFilterProbingResult> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataBloomFilterProbingFunction.class);
+
+  // Assuming each file bloom filter takes up 512K, sizing the max file count
+  // per batch so that the total fetched bloom filters would not cross 128 MB.
+  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
+  private final HoodieTable hoodieTable;
+
+  private final Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast;
+
+  /**
+   * NOTE: It's critical for this ctor to accept {@link HoodieTable} to make 
sure that it uses
+   *       broadcast-ed instance of {@link HoodieBackedTableMetadata} 
internally, instead of
+   *       one being serialized and deserialized for _every_ task individually
+   *
+   * NOTE: We pass in broadcasted {@link HoodieTableFileSystemView} to make 
sure it's materialized
+   *       on executor once
+   */
+  public 
HoodieMetadataBloomFilterProbingFunction(Broadcast<HoodieTableFileSystemView> 
baseFileOnlyViewBroadcast,
+                                                  HoodieTable hoodieTable) {
+    this.baseFileOnlyViewBroadcast = baseFileOnlyViewBroadcast;
+    this.hoodieTable = hoodieTable;
+  }
+
+  @Override
+  public Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> 
call(Iterator<Tuple2<HoodieFileGroupId, String>> tuple2Iterator) throws 
Exception {
+    return new FlatteningIterator<>(new 
BloomIndexLazyKeyCheckIterator(tuple2Iterator));
+  }
+
+  private class BloomIndexLazyKeyCheckIterator
+      extends LazyIterableIterator<Tuple2<HoodieFileGroupId, String>, 
Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>>> {
+
+    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, 
String>> tuple2Iterator) {
+      super(tuple2Iterator);
+    }
+
+    @Override
+    protected Iterator<Tuple2<HoodieFileGroupId, 
HoodieBloomFilterProbingResult>> computeNext() {
+      // Partition path and file name pair to list of keys
+      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new 
HashMap<>();
+      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
+
+      while (inputItr.hasNext()) {
+        Tuple2<HoodieFileGroupId, String> entry = inputItr.next();
+        String partitionPath = entry._1.getPartitionPath();
+        String fileId = entry._1.getFileId();
+
+        if (!fileIDBaseFileMap.containsKey(fileId)) {
+          Option<HoodieBaseFile> baseFile = 
baseFileOnlyViewBroadcast.getValue().getLatestBaseFile(partitionPath, fileId);
+          if (!baseFile.isPresent()) {
+            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
+                + ", fileId: " + fileId);
+          }
+          fileIDBaseFileMap.put(fileId, baseFile.get());
+        }
+
+        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
+            k -> new ArrayList<>()).add(new HoodieKey(entry._2, 
partitionPath));
+
+        if (fileToKeysMap.size() > 
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
+          break;
+        }
+      }
+
+      if (fileToKeysMap.isEmpty()) {
+        return Collections.emptyIterator();
+      }
+
+      List<Pair<String, String>> partitionNameFileNameList = new 
ArrayList<>(fileToKeysMap.keySet());
+      Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
+          
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
+
+      return fileToKeysMap.entrySet().stream()
+          .map(entry -> {
+            Pair<String, String> partitionPathFileNamePair = entry.getKey();
+            List<HoodieKey> hoodieKeyList = entry.getValue();
+
+            final String partitionPath = partitionPathFileNamePair.getLeft();
+            final String fileName = partitionPathFileNamePair.getRight();
+            final String fileId = FSUtils.getFileId(fileName);
+            ValidationUtils.checkState(!fileId.isEmpty());
+
+            if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
+              throw new HoodieIndexException("Failed to get the bloom filter 
for " + partitionPathFileNamePair);
+            }
+            final BloomFilter fileBloomFilter = 
fileToBloomFilterMap.get(partitionPathFileNamePair);
+
+            List<String> candidateRecordKeys = new ArrayList<>();
+            hoodieKeyList.forEach(hoodieKey -> {
+              if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) {
+                candidateRecordKeys.add(hoodieKey.getRecordKey());
+              }
+            });
+
+            LOG.debug(String.format("Total records (%d), bloom filter 
candidates (%d)",
+                hoodieKeyList.size(), candidateRecordKeys.size()));
+
+            return Tuple2.apply(new HoodieFileGroupId(partitionPath, fileId), 
new HoodieBloomFilterProbingResult(candidateRecordKeys));
+          })
+          .iterator();
+    }
+  }
+}
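
As a rough illustration of the batching pattern implemented above (group record keys per file,
fetch all Bloom Filters for the batch in a single call, then probe each key), here is a simplified,
hypothetical sketch; the batch-lookup method and the Predicate standing in for a per-file Bloom
Filter are stand-ins for illustration only, not Hudi APIs:

    import java.util.*;
    import java.util.function.Predicate;

    class BatchedProbingSketch {
      // Stand-in for the Metadata Table's batched Bloom Filter lookup
      static Map<String, Predicate<String>> fetchFiltersInBatch(Set<String> fileNames) {
        Map<String, Predicate<String>> filters = new HashMap<>();
        fileNames.forEach(f -> filters.put(f, key -> (key.hashCode() & 1) == 0)); // dummy filter
        return filters;
      }

      static Map<String, List<String>> candidateKeysPerFile(Map<String, List<String>> keysByFile) {
        // One batched request for the whole set of files, instead of one request per file
        Map<String, Predicate<String>> filters = fetchFiltersInBatch(keysByFile.keySet());

        Map<String, List<String>> candidates = new HashMap<>();
        keysByFile.forEach((file, keys) -> {
          Predicate<String> mightContain = filters.get(file);
          List<String> passing = new ArrayList<>();
          keys.forEach(key -> {
            if (mightContain.test(key)) {
              passing.add(key);
            }
          });
          candidates.put(file, passing);
        });
        return candidates;
      }
    }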
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
deleted file mode 100644
index 8a2958eab9d..00000000000
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomIndexCheckFunction.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.index.bloom;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.client.utils.LazyIterableIterator;
-import org.apache.hudi.common.bloom.BloomFilter;
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.index.HoodieIndexUtils;
-import org.apache.hudi.io.HoodieKeyLookupResult;
-import org.apache.hudi.table.HoodieTable;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.api.java.function.Function2;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Spark Function2 implementation for checking bloom filters for the
- * requested keys from the metadata table index. The bloom filter
- * checking for keys and the actual file verification for the
- * candidate keys is done in an iterative fashion. In each iteration,
- * bloom filters are requested for a batch of partition files and the
- * keys are checked against them.
- */
-public class HoodieMetadataBloomIndexCheckFunction implements
-    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<HoodieKeyLookupResult>>> {
-
-  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataBloomIndexCheckFunction.class);
-
-  // Assuming each file bloom filter takes up 512K, sizing the max file count
-  // per batch so that the total fetched bloom filters would not cross 128 MB.
-  private static final long BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH = 256;
-  private final HoodieTable hoodieTable;
-
-  public HoodieMetadataBloomIndexCheckFunction(HoodieTable hoodieTable) {
-    this.hoodieTable = hoodieTable;
-  }
-
-  @Override
-  public Iterator<List<HoodieKeyLookupResult>> call(Integer integer, 
Iterator<Tuple2<String, HoodieKey>> tuple2Iterator) throws Exception {
-    return new BloomIndexLazyKeyCheckIterator(tuple2Iterator);
-  }
-
-  private class BloomIndexLazyKeyCheckIterator extends 
LazyIterableIterator<Tuple2<String, HoodieKey>, List<HoodieKeyLookupResult>> {
-    public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<String, HoodieKey>> 
tuple2Iterator) {
-      super(tuple2Iterator);
-    }
-
-    @Override
-    protected void start() {
-    }
-
-    @Override
-    protected List<HoodieKeyLookupResult> computeNext() {
-      // Partition path and file name pair to list of keys
-      final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new 
HashMap<>();
-      final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();
-      final List<HoodieKeyLookupResult> resultList = new ArrayList<>();
-
-      while (inputItr.hasNext()) {
-        Tuple2<String, HoodieKey> entry = inputItr.next();
-        final String partitionPath = entry._2.getPartitionPath();
-        final String fileId = entry._1;
-        if (!fileIDBaseFileMap.containsKey(fileId)) {
-          Option<HoodieBaseFile> baseFile = 
hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId);
-          if (!baseFile.isPresent()) {
-            throw new HoodieIndexException("Failed to find the base file for 
partition: " + partitionPath
-                + ", fileId: " + fileId);
-          }
-          fileIDBaseFileMap.put(fileId, baseFile.get());
-        }
-        fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, 
fileIDBaseFileMap.get(fileId).getFileName()),
-            k -> new ArrayList<>()).add(entry._2);
-        if (fileToKeysMap.size() > 
BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
-          break;
-        }
-      }
-      if (fileToKeysMap.isEmpty()) {
-        return Collections.emptyList();
-      }
-
-      List<Pair<String, String>> partitionNameFileNameList = new 
ArrayList<>(fileToKeysMap.keySet());
-      Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
-          
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);
-
-      final AtomicInteger totalKeys = new AtomicInteger(0);
-      fileToKeysMap.forEach((partitionPathFileNamePair, hoodieKeyList) -> {
-        final String partitionPath = partitionPathFileNamePair.getLeft();
-        final String fileName = partitionPathFileNamePair.getRight();
-        final String fileId = FSUtils.getFileId(fileName);
-        ValidationUtils.checkState(!fileId.isEmpty());
-
-        if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
-          throw new HoodieIndexException("Failed to get the bloom filter for " 
+ partitionPathFileNamePair);
-        }
-        final BloomFilter fileBloomFilter = 
fileToBloomFilterMap.get(partitionPathFileNamePair);
-
-        List<String> candidateRecordKeys = new ArrayList<>();
-        hoodieKeyList.forEach(hoodieKey -> {
-          totalKeys.incrementAndGet();
-          if (fileBloomFilter.mightContain(hoodieKey.getRecordKey())) {
-            candidateRecordKeys.add(hoodieKey.getRecordKey());
-          }
-        });
-
-        final HoodieBaseFile dataFile = fileIDBaseFileMap.get(fileId);
-        List<String> matchingKeys =
-            HoodieIndexUtils.filterKeysFromFile(new Path(dataFile.getPath()), 
candidateRecordKeys,
-                hoodieTable.getHadoopConf());
-        LOG.debug(
-            String.format("Total records (%d), bloom filter candidates 
(%d)/fp(%d), actual matches (%d)",
-                hoodieKeyList.size(), candidateRecordKeys.size(),
-                candidateRecordKeys.size() - matchingKeys.size(), 
matchingKeys.size()));
-
-        resultList.add(new HoodieKeyLookupResult(fileId, partitionPath, 
dataFile.getCommitTime(), matchingKeys));
-      });
-      return resultList;
-    }
-
-    @Override
-    protected void end() {
-    }
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
index 5736024dc24..265b0507768 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/SparkHoodieBloomIndexHelper.java
@@ -19,30 +19,47 @@
 
 package org.apache.hudi.index.bloom;
 
-import org.apache.hudi.common.data.HoodieData;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodiePairData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.common.util.hash.FileIndexID;
+import org.apache.hudi.common.util.hash.PartitionIndexID;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.HoodieKeyLookupResult;
 import org.apache.hudi.table.HoodieTable;
-
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.broadcast.Broadcast;
+import scala.Tuple2;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import scala.Tuple2;
-
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.getBloomFilterIndexKey;
+import static 
org.apache.hudi.metadata.HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex;
 import static org.apache.hudi.metadata.MetadataPartitionType.BLOOM_FILTERS;
 
 /**
@@ -55,8 +72,7 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
   private static final SparkHoodieBloomIndexHelper SINGLETON_INSTANCE =
       new SparkHoodieBloomIndexHelper();
 
-  private SparkHoodieBloomIndexHelper() {
-  }
+  private SparkHoodieBloomIndexHelper() {}
 
   public static SparkHoodieBloomIndexHelper getInstance() {
     return SINGLETON_INSTANCE;
@@ -66,42 +82,92 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
   public HoodiePairData<HoodieKey, HoodieRecordLocation> 
findMatchingFilesForRecordKeys(
       HoodieWriteConfig config, HoodieEngineContext context, HoodieTable 
hoodieTable,
       HoodiePairData<String, String> partitionRecordKeyPairs,
-      HoodieData<Pair<String, HoodieKey>> fileComparisonPairs,
+      HoodiePairData<HoodieFileGroupId, String> fileComparisonPairs,
       Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
       Map<String, Long> recordsPerPartition) {
-    JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
-        HoodieJavaRDD.getJavaRDD(fileComparisonPairs)
-            .map(pair -> new Tuple2<>(pair.getLeft(), pair.getRight()));
 
-    int inputParallelism = 
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).partitions().size();
-    int joinParallelism = Math.max(inputParallelism, 
config.getBloomIndexParallelism());
-    LOG.info("InputParallelism: ${" + inputParallelism + "}, IndexParallelism: 
${"
-        + config.getBloomIndexParallelism() + "}");
+    int inputParallelism = 
HoodieJavaPairRDD.getJavaPairRDD(partitionRecordKeyPairs).getNumPartitions();
+    int configuredBloomIndexParallelism = config.getBloomIndexParallelism();
 
+    // NOTE: Target parallelism could be overridden by the config
+    int targetParallelism =
+        configuredBloomIndexParallelism > 0 ? configuredBloomIndexParallelism 
: inputParallelism;
+
+    LOG.info(String.format("Input parallelism: %d, Index parallelism: %d", 
inputParallelism, targetParallelism));
+
+    JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD = 
HoodieJavaRDD.getJavaRDD(fileComparisonPairs);
     JavaRDD<List<HoodieKeyLookupResult>> keyLookupResultRDD;
+
     if (config.getBloomIndexUseMetadata()
         && hoodieTable.getMetaClient().getTableConfig().getMetadataPartitions()
         .contains(BLOOM_FILTERS.getPartitionPath())) {
-      // Step 1: Sort by file id
-      JavaRDD<Tuple2<String, HoodieKey>> sortedFileIdAndKeyPairs =
-          fileComparisonsRDD.sortBy(Tuple2::_1, true, joinParallelism);
+      SerializableConfiguration hadoopConf = new 
SerializableConfiguration(hoodieTable.getHadoopConf());
+
+      HoodieTableFileSystemView baseFileOnlyView =
+          getBaseFileOnlyView(hoodieTable, partitionToFileInfo.keySet());
+
+      Broadcast<HoodieTableFileSystemView> baseFileOnlyViewBroadcast =
+          ((HoodieSparkEngineContext) 
context).getJavaSparkContext().broadcast(baseFileOnlyView);
+
+      // When leveraging MT we're aiming for the following goals:
+      //    - (G1) All requests to MT are made in batch (ie we're trying to 
fetch all the values
+      //      for corresponding keys at once)
+      //    - (G2) Each task reads no more than just _one_ file-group from the 
MT Bloom Filters
+      //    partition
+      //
+      // To achieve G2, the following invariant has to be maintained: Spark partitions have to be
+      // affine w/ Metadata Table's file-groups, meaning that each Spark partition holds records
+      // belonging to one and only one file-group in the MT Bloom Filters partition. To provide for
+      // that we need to make sure that
+      //    - The [[Partitioner]] used by Spark employs the same hashing function as the Metadata
+      //      Table (as well as being applied to the same keys as the MT one)
+      //    - The # of Spark partitions is a multiple of the # of file-groups in the MT Bloom
+      //      Filters partition
+      //
+      //    The last provision is necessary so that for every key it's the case that
+      //
+      //        (hash(key) % N) % M = hash(key) % M, iff N % M = 0
+      //
+      //    Let's take an example of N = 8 and M = 4 (the default # of file-groups in the Bloom
+      //    Filter partition). In that case Spark partitions 0 and 4 (ie keys for which
+      //    `hash(key) % N` is either 0 or 4) will map to the same (first) file-group in MT
+      int bloomFilterPartitionFileGroupCount =
+          config.getMetadataConfig().getBloomFilterIndexFileGroupCount();
+      int adjustedTargetParallelism =
+          targetParallelism % bloomFilterPartitionFileGroupCount == 0
+              ? targetParallelism
+              // NOTE: We add 1 to make sure the adjusted parallelism a) always stays positive
+              //       and b) {@code targetParallelism <= adjustedTargetParallelism}
+              : (targetParallelism / bloomFilterPartitionFileGroupCount + 1) * 
bloomFilterPartitionFileGroupCount;
+
+      AffineBloomIndexFileGroupPartitioner partitioner =
+          new AffineBloomIndexFileGroupPartitioner(baseFileOnlyViewBroadcast, 
adjustedTargetParallelism);
+
+      // First, we need to repartition and sort records using 
[[AffineBloomIndexFileGroupPartitioner]]
+      // to make sure every Spark task accesses no more than just a single 
file-group in MT (allows
+      // us to achieve G2).
+      //
+      // NOTE: Sorting records w/in individual partitions is required to make 
sure that we cluster
+      //       together keys co-located w/in the MT files (sorted by keys)
+      keyLookupResultRDD = 
fileComparisonsRDD.repartitionAndSortWithinPartitions(partitioner)
+          .mapPartitionsToPair(new 
HoodieMetadataBloomFilterProbingFunction(baseFileOnlyViewBroadcast, 
hoodieTable))
+          // Second, we use [[HoodieFileProbingFunction]] to open the actual file and check
+          // whether it contains the records with candidate keys that were filtered in by
+          // the Bloom Filter
+          .mapPartitions(new 
HoodieFileProbingFunction(baseFileOnlyViewBroadcast, hadoopConf), true);
 
-      // Step 2: Use bloom filter to filter and the actual log file to get the 
record location
-      keyLookupResultRDD = sortedFileIdAndKeyPairs.mapPartitionsWithIndex(
-          new HoodieMetadataBloomIndexCheckFunction(hoodieTable), true);
     } else if (config.useBloomIndexBucketizedChecking()) {
-      Map<String, Long> comparisonsPerFileGroup = 
computeComparisonsPerFileGroup(
+      Map<HoodieFileGroupId, Long> comparisonsPerFileGroup = 
computeComparisonsPerFileGroup(
           config, recordsPerPartition, partitionToFileInfo, 
fileComparisonsRDD, context);
-      Partitioner partitioner = new 
BucketizedBloomCheckPartitioner(joinParallelism, comparisonsPerFileGroup,
+      Partitioner partitioner = new 
BucketizedBloomCheckPartitioner(targetParallelism, comparisonsPerFileGroup,
           config.getBloomIndexKeysPerBucket());
 
-      keyLookupResultRDD = fileComparisonsRDD.mapToPair(t -> new 
Tuple2<>(Pair.of(t._1, t._2.getRecordKey()), t))
+      keyLookupResultRDD = fileComparisonsRDD.mapToPair(t -> new 
Tuple2<>(Pair.of(t._1, t._2), t))
           .repartitionAndSortWithinPartitions(partitioner)
           .map(Tuple2::_2)
-          .mapPartitionsWithIndex(new 
HoodieBloomIndexCheckFunction(hoodieTable, config), true);
+          .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, 
config), true);
     } else {
-      keyLookupResultRDD = fileComparisonsRDD.sortBy(Tuple2::_1, true, 
joinParallelism)
-          .mapPartitionsWithIndex(new 
HoodieBloomIndexCheckFunction(hoodieTable, config), true);
+      keyLookupResultRDD = fileComparisonsRDD.sortByKey(true, 
targetParallelism)
+          .mapPartitions(new HoodieSparkBloomIndexCheckFunction(hoodieTable, 
config), true);
     }
 
     return HoodieJavaPairRDD.of(keyLookupResultRDD.flatMap(List::iterator)
@@ -115,27 +181,124 @@ public class SparkHoodieBloomIndexHelper extends 
BaseHoodieBloomIndexHelper {
   /**
    * Compute the estimated number of bloom filter comparisons to be performed 
on each file group.
    */
-  private Map<String, Long> computeComparisonsPerFileGroup(
+  private Map<HoodieFileGroupId, Long> computeComparisonsPerFileGroup(
       final HoodieWriteConfig config,
       final Map<String, Long> recordsPerPartition,
       final Map<String, List<BloomIndexFileInfo>> partitionToFileInfo,
-      final JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD,
+      final JavaPairRDD<HoodieFileGroupId, String> fileComparisonsRDD,
       final HoodieEngineContext context) {
-    Map<String, Long> fileToComparisons;
+    Map<HoodieFileGroupId, Long> fileToComparisons;
     if (config.getBloomIndexPruneByRanges()) {
       // we will just try exploding the input and then count to determine 
comparisons
       // FIX(vc): Only do sampling here and extrapolate?
       context.setJobStatus(this.getClass().getSimpleName(), "Compute all 
comparisons needed between records and files: " + config.getTableName());
-      fileToComparisons = fileComparisonsRDD.mapToPair(t -> t).countByKey();
+      fileToComparisons = fileComparisonsRDD.countByKey();
     } else {
       fileToComparisons = new HashMap<>();
-      partitionToFileInfo.forEach((key, value) -> {
-        for (BloomIndexFileInfo fileInfo : value) {
+      partitionToFileInfo.forEach((partitionPath, fileInfos) -> {
+        for (BloomIndexFileInfo fileInfo : fileInfos) {
           // each file needs to be compared against all the records coming 
into the partition
-          fileToComparisons.put(fileInfo.getFileId(), 
recordsPerPartition.get(key));
+          fileToComparisons.put(
+              new HoodieFileGroupId(partitionPath, fileInfo.getFileId()), 
recordsPerPartition.get(partitionPath));
         }
       });
     }
     return fileToComparisons;
   }
+
+  private static HoodieTableFileSystemView getBaseFileOnlyView(HoodieTable<?, 
?, ?, ?> hoodieTable, Collection<String> partitionPaths) {
+    try {
+      List<String> fullPartitionPaths = partitionPaths.stream()
+          .map(partitionPath ->
+              String.format("%s/%s", 
hoodieTable.getMetaClient().getBasePathV2(), partitionPath))
+          .collect(Collectors.toList());
+
+      FileStatus[] allFiles =
+          
hoodieTable.getMetadataTable().getAllFilesInPartitions(fullPartitionPaths).values().stream()
+              .flatMap(Arrays::stream)
+              .toArray(FileStatus[]::new);
+
+      return new HoodieTableFileSystemView(hoodieTable.getMetaClient(), 
hoodieTable.getActiveTimeline(), allFiles);
+    } catch (IOException e) {
+      LOG.error(String.format("Failed to fetch all files for partitions (%s)", 
partitionPaths));
+      throw new HoodieIOException("Failed to fetch all files for partitions", 
e);
+    }
+  }
+
+  static class AffineBloomIndexFileGroupPartitioner extends Partitioner {
+
+    private final Broadcast<HoodieTableFileSystemView> 
latestBaseFilesBroadcast;
+
+    // TODO(HUDI-5619) remove when addressed
+    private final Map<String, Map<String, String>> cachedLatestBaseFileNames =
+        new HashMap<>(16);
+
+    private final int targetPartitions;
+
+    AffineBloomIndexFileGroupPartitioner(Broadcast<HoodieTableFileSystemView> 
baseFileOnlyViewBroadcast,
+                                         int targetPartitions) {
+      this.targetPartitions = targetPartitions;
+      this.latestBaseFilesBroadcast = baseFileOnlyViewBroadcast;
+    }
+
+    @Override
+    public int numPartitions() {
+      return targetPartitions;
+    }
+
+    @Override
+    public int getPartition(Object key) {
+      HoodieFileGroupId partitionFileGroupId = (HoodieFileGroupId) key;
+      String partitionPath = partitionFileGroupId.getPartitionPath();
+      String fileGroupId = partitionFileGroupId.getFileId();
+
+      /*
+      // TODO(HUDI-5619) uncomment when addressed
+      String baseFileName =
+          latestBaseFilesBroadcast.getValue()
+              .getLatestBaseFile(partitionPath, fileGroupId)
+              .orElseThrow(() -> new HoodieException(
+                  String.format("File from file-group (%s) not found in 
partition path (%s)", fileGroupId, partitionPath)))
+              .getFileName();
+       */
+
+      // NOTE: This is a workaround to alleviate the performance impact of needing to process
+      //       the whole partition for every file-group being looked up.
+      //       See HUDI-5619 for more details
+      String baseFileName = 
cachedLatestBaseFileNames.computeIfAbsent(partitionPath, ignored ->
+              latestBaseFilesBroadcast.getValue()
+                  .getLatestBaseFiles(partitionPath)
+                  .collect(
+                      Collectors.toMap(HoodieBaseFile::getFileId, 
BaseFile::getFileName)
+                  )
+          )
+          .get(fileGroupId);
+
+      if (baseFileName == null) {
+        throw new HoodieException(
+            String.format("File from file-group (%s) not found in partition 
path (%s)", fileGroupId, partitionPath));
+      }
+
+      String bloomIndexEncodedKey =
+          getBloomFilterIndexKey(new PartitionIndexID(partitionPath), new 
FileIndexID(baseFileName));
+
+      // NOTE: It's crucial that [[targetPartitions]] be congruent w/ the 
number of
+      //       actual file-groups in the Bloom Index in MT
+      return mapRecordKeyToFileGroupIndex(bloomIndexEncodedKey, 
targetPartitions);
+    }
+  }
+
+  public static class HoodieSparkBloomIndexCheckFunction extends 
HoodieBloomIndexCheckFunction<Tuple2<HoodieFileGroupId, String>>
+      implements FlatMapFunction<Iterator<Tuple2<HoodieFileGroupId, String>>, 
List<HoodieKeyLookupResult>> {
+
+    public HoodieSparkBloomIndexCheckFunction(HoodieTable hoodieTable,
+                                              HoodieWriteConfig config) {
+      super(hoodieTable, config, t -> t._1, t -> t._2);
+    }
+
+    @Override
+    public Iterator<List<HoodieKeyLookupResult>> 
call(Iterator<Tuple2<HoodieFileGroupId, String>> 
fileGroupIdRecordKeyPairIterator) {
+      return new LazyKeyCheckIterator(fileGroupIdRecordKeyPairIterator);
+    }
+  }
 }
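
To make the partition-affinity argument from the comments above concrete, here is a minimal,
standalone sketch (not part of the patch; plain Java, assuming non-negative hash values) checking
that (hash % N) % M equals hash % M exactly when N is a multiple of M, with the N = 8, M = 4
example used in the comment:

    import java.util.stream.IntStream;

    public class AffinityInvariantSketch {
      public static void main(String[] args) {
        int n = 8; // Spark partitions (adjusted target parallelism)
        int m = 4; // file-groups in the MT Bloom Filters partition

        // N % M == 0, so every Spark partition maps onto exactly one MT file-group
        boolean holds = IntStream.range(0, 1_000_000)
            .allMatch(hash -> (hash % n) % m == hash % m);
        System.out.println(holds); // true

        // With N = 10 (not a multiple of M = 4) the invariant breaks,
        // e.g. hash = 12: (12 % 10) % 4 == 2, while 12 % 4 == 0
      }
    }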
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
index f4edf7fb942..5853b4eb8a8 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/SparkAdapter.scala
@@ -23,17 +23,17 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.util.TablePathUtils
+import org.apache.spark.sql._
 import org.apache.spark.sql.avro.{HoodieAvroDeserializer, 
HoodieAvroSchemaConverters, HoodieAvroSerializer}
-import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
Expression, InterpretedPredicate}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan, 
SubqueryAlias}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql._
 import org.apache.spark.storage.StorageLevel
 
 import java.util.Locale
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
index e946450c904..fe178839c0d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestBucketizedBloomCheckPartitioner.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.index.bloom;
 
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.util.collection.Pair;
 
 import org.junit.jupiter.api.Test;
@@ -36,33 +37,37 @@ public class TestBucketizedBloomCheckPartitioner {
 
   @Test
   public void testAssignmentCorrectness() {
-    Map<String, Long> fileToComparisons = new HashMap<String, Long>() {
+    HoodieFileGroupId fg1 = new HoodieFileGroupId("p1", "f1");
+    HoodieFileGroupId fg2 = new HoodieFileGroupId("p1", "f2");
+    HoodieFileGroupId fg3 = new HoodieFileGroupId("p1", "f3");
+
+    Map<HoodieFileGroupId, Long> fileToComparisons = new 
HashMap<HoodieFileGroupId, Long>() {
       {
-        put("f1", 40L);
-        put("f2", 35L);
-        put("f3", 20L);
+        put(fg1, 40L);
+        put(fg2, 35L);
+        put(fg3, 20L);
       }
     };
     BucketizedBloomCheckPartitioner p = new BucketizedBloomCheckPartitioner(4, 
fileToComparisons, 10);
-    Map<String, List<Integer>> assignments = p.getFileGroupToPartitions();
-    assertEquals(4, assignments.get("f1").size(), "f1 should have 4 buckets");
-    assertEquals(4, assignments.get("f2").size(), "f2 should have 4 buckets");
-    assertEquals(2, assignments.get("f3").size(), "f3 should have 2 buckets");
-    assertArrayEquals(new Integer[] {0, 0, 1, 3}, 
assignments.get("f1").toArray(), "f1 spread across 3 partitions");
-    assertArrayEquals(new Integer[] {1, 2, 2, 0}, 
assignments.get("f2").toArray(), "f2 spread across 3 partitions");
-    assertArrayEquals(new Integer[] {3, 1}, assignments.get("f3").toArray(), 
"f3 spread across 2 partitions");
+    Map<HoodieFileGroupId, List<Integer>> assignments = 
p.getFileGroupToPartitions();
+    assertEquals(4, assignments.get(fg1).size(), "f1 should have 4 buckets");
+    assertEquals(4, assignments.get(fg2).size(), "f2 should have 4 buckets");
+    assertEquals(2, assignments.get(fg3).size(), "f3 should have 2 buckets");
+    assertArrayEquals(new Integer[] {0, 0, 1, 3}, 
assignments.get(fg1).toArray(), "f1 spread across 3 partitions");
+    assertArrayEquals(new Integer[] {2, 2, 3, 1}, 
assignments.get(fg2).toArray(), "f2 spread across 3 partitions");
+    assertArrayEquals(new Integer[] {1, 0}, assignments.get(fg3).toArray(), 
"f3 spread across 2 partitions");
   }
 
   @Test
   public void testUniformPacking() {
     // evenly distribute 10 buckets/file across 100 partitions
-    Map<String, Long> comparisons1 = new HashMap<String, Long>() {
+    Map<HoodieFileGroupId, Long> comparisons1 = new HashMap<HoodieFileGroupId, 
Long>() {
       {
-        IntStream.range(0, 10).forEach(f -> put("f" + f, 100L));
+        IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1", 
"f" + f), 100L));
       }
     };
     BucketizedBloomCheckPartitioner partitioner = new 
BucketizedBloomCheckPartitioner(100, comparisons1, 10);
-    Map<String, List<Integer>> assignments = 
partitioner.getFileGroupToPartitions();
+    Map<HoodieFileGroupId, List<Integer>> assignments = 
partitioner.getFileGroupToPartitions();
     assignments.forEach((key, value) -> assertEquals(10, value.size()));
     Map<Integer, Long> partitionToNumBuckets =
         assignments.entrySet().stream().flatMap(e -> 
e.getValue().stream().map(p -> Pair.of(p, e.getKey())))
@@ -72,9 +77,9 @@ public class TestBucketizedBloomCheckPartitioner {
 
   @Test
   public void testNumPartitions() {
-    Map<String, Long> comparisons1 = new HashMap<String, Long>() {
+    Map<HoodieFileGroupId, Long> comparisons1 = new HashMap<HoodieFileGroupId, 
Long>() {
       {
-        IntStream.range(0, 10).forEach(f -> put("f" + f, 100L));
+        IntStream.range(0, 10).forEach(f -> put(new HoodieFileGroupId("p1", 
"f" + f), 100L));
       }
     };
     BucketizedBloomCheckPartitioner p = new 
BucketizedBloomCheckPartitioner(10000, comparisons1, 10);
@@ -83,15 +88,15 @@ public class TestBucketizedBloomCheckPartitioner {
 
   @Test
   public void testGetPartitions() {
-    Map<String, Long> comparisons1 = new HashMap<String, Long>() {
+    Map<HoodieFileGroupId, Long> comparisons1 = new HashMap<HoodieFileGroupId, 
Long>() {
       {
-        IntStream.range(0, 100000).forEach(f -> put("f" + f, 100L));
+        IntStream.range(0, 100000).forEach(f -> put(new 
HoodieFileGroupId("p1", "f" + f), 100L));
       }
     };
     BucketizedBloomCheckPartitioner p = new 
BucketizedBloomCheckPartitioner(1000, comparisons1, 10);
 
     IntStream.range(0, 100000).forEach(f -> {
-      int partition = p.getPartition(Pair.of("f" + f, "value"));
+      int partition = p.getPartition(Pair.of(new HoodieFileGroupId("p1", "f" + 
f), "value"));
       assertTrue(0 <= partition && partition <= 1000, "partition is out of 
range: " + partition);
     });
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
index 5be4e4ce624..3c906062c16 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieBloomIndex.java
@@ -24,6 +24,7 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -35,7 +36,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
-import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.HoodieIndexUtils;
 import org.apache.hudi.metadata.SparkHoodieBackedTableMetadataWriter;
@@ -259,12 +259,14 @@ public class TestHoodieBloomIndex extends 
TestHoodieMetadataBase {
         jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/22", "003"), new 
Tuple2<>("2017/10/22", "002"),
             new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/22", 
"004"))).mapToPair(t -> t);
 
-    List<Pair<String, HoodieKey>> comparisonKeyList = HoodieJavaRDD.getJavaRDD(
-        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, 
HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))).collect();
+    List<Pair<HoodieFileGroupId, String>> comparisonKeyList =
+        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, 
HoodieJavaPairRDD.of(partitionRecordKeyPairRDD)).collectAsList();
 
     assertEquals(10, comparisonKeyList.size());
     Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
-        .collect(Collectors.groupingBy(t -> t.getRight().getRecordKey(), 
Collectors.mapping(Pair::getLeft, Collectors.toList())));
+        .collect(
+            Collectors.groupingBy(t -> t.getRight(),
+                Collectors.mapping(t -> t.getLeft().getFileId(), 
Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new HashSet<>(Arrays.asList("f1", "f3", "f4")), new 
HashSet<>(recordKeyToFileComps.get("002")));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
index 3ad8952feea..2577f9ba284 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -21,6 +21,7 @@ package org.apache.hudi.index.bloom;
 import org.apache.hudi.client.functional.TestHoodieMetadataBase;
 import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -29,7 +30,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaPairRDD;
-import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -201,9 +201,9 @@ public class TestHoodieGlobalBloomIndex extends 
TestHoodieMetadataBase {
         jsc.parallelize(Arrays.asList(new Tuple2<>("2017/10/21", "003"), new 
Tuple2<>("2017/10/22", "002"),
             new Tuple2<>("2017/10/22", "005"), new Tuple2<>("2017/10/23", 
"004"))).mapToPair(t -> t);
 
-    List<Pair<String, HoodieKey>> comparisonKeyList = HoodieJavaRDD.getJavaRDD(
-        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo,
-            HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))).collect();
+    List<Pair<HoodieFileGroupId, String>> comparisonKeyList =
+        index.explodeRecordsWithFileComparisons(partitionToFileIndexInfo, 
HoodieJavaPairRDD.of(partitionRecordKeyPairRDD))
+            .collectAsList();
 
     /*
      * expecting: f4, HoodieKey { recordKey=003 partitionPath=2017/10/23} f1, 
HoodieKey { recordKey=003
@@ -216,7 +216,7 @@ public class TestHoodieGlobalBloomIndex extends 
TestHoodieMetadataBase {
     assertEquals(10, comparisonKeyList.size());
 
     Map<String, List<String>> recordKeyToFileComps = comparisonKeyList.stream()
-        .collect(Collectors.groupingBy(t -> t.getRight().getRecordKey(), 
Collectors.mapping(Pair::getKey, Collectors.toList())));
+        .collect(Collectors.groupingBy(t -> t.getRight(), Collectors.mapping(t 
-> t.getLeft().getFileId(), Collectors.toList())));
 
     assertEquals(4, recordKeyToFileComps.size());
     assertEquals(new HashSet<>(Arrays.asList("f4", "f1", "f3")), new 
HashSet<>(recordKeyToFileComps.get("002")));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index 1d56e63fad9..9ce0947883f 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -105,9 +105,9 @@ public interface HoodieData<T> extends Serializable {
       Iterator<O>> func, boolean preservesPartitioning);
 
   /**
-   * Maps every element in the collection into a collection of the new 
elements (provided by
-   * {@link Iterator}) using provided mapping {@code func}, subsequently 
flattening the result
-   * (by concatenating) into a single collection
+   * Maps every element in the collection into a collection of the new 
elements using provided
+   * mapping {@code func}, subsequently flattening the result (by 
concatenating) into a single
+   * collection
    *
    * This is an intermediate operation
    *
@@ -117,6 +117,17 @@ public interface HoodieData<T> extends Serializable {
    */
   <O> HoodieData<O> flatMap(SerializableFunction<T, Iterator<O>> func);
 
+  /**
+   * Maps every element in the collection into a collection of {@link Pair}s of new elements
+   * using the provided mapping {@code func}, subsequently flattening the result (by concatenating)
+   * into a single collection
+   *
+   * NOTE: This operation will convert the container from {@link HoodieData} to {@link HoodiePairData}
+   *
+   * This is an intermediate operation
+   */
+  <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, Iterator<? 
extends Pair<K, V>>> func);
+
   /**
    * Maps every element in the collection using provided mapping {@code func} 
into a {@link Pair<K, V>}
    * of elements {@code K} and {@code V}
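
A hedged usage sketch for the new flatMapToPair API follows; `partitionPaths` (a HoodieData<String>)
and the helper `listFileNames` are hypothetical and only illustrate the shape of the call:

    import org.apache.hudi.common.util.collection.Pair;

    // Expand every partition path into (partitionPath, fileName) pairs, turning the
    // HoodieData<String> container into a HoodiePairData<String, String> one
    HoodiePairData<String, String> partitionToFileName =
        partitionPaths.flatMapToPair(partitionPath ->
            listFileNames(partitionPath).stream()            // hypothetical helper
                .map(fileName -> Pair.of(partitionPath, fileName))
                .iterator());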
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index b2a503a85b3..c6287a744e0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -125,6 +125,16 @@ public class HoodieListData<T> extends 
HoodieBaseListData<T> implements HoodieDa
     return new HoodieListData<>(mappedStream, lazy);
   }
 
+  @Override
+  public <K, V> HoodiePairData<K, V> flatMapToPair(SerializableFunction<T, 
Iterator<? extends Pair<K, V>>> func) {
+    Function<T, Iterator<? extends Pair<K, V>>> mapper = 
throwingMapWrapper(func);
+    Stream<Pair<K, V>> mappedStream = asStream().flatMap(e ->
+        StreamSupport.stream(
+            Spliterators.spliteratorUnknownSize(mapper.apply(e), 
Spliterator.ORDERED), true));
+
+    return new HoodieListPairData<>(mappedStream, lazy);
+  }
+
   @Override
   public <K, V> HoodiePairData<K, V> mapToPair(SerializablePairFunction<T, K, 
V> func) {
     Function<T, Pair<K, V>> throwableMapToPairFunc = 
throwingMapToPairWrapper(func);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
index a389649548e..39ce1411575 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListPairData.java
@@ -23,15 +23,20 @@ import org.apache.hudi.common.function.SerializableFunction;
 import org.apache.hudi.common.function.SerializablePairFunction;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.MappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Spliterator;
+import java.util.Spliterators;
 import java.util.function.Function;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapToPairWrapper;
 import static 
org.apache.hudi.common.function.FunctionWrapper.throwingMapWrapper;
@@ -136,6 +141,24 @@ public class HoodieListPairData<K, V> extends 
HoodieBaseListData<Pair<K, V>> imp
     return new HoodieListData<>(asStream().map(uncheckedMapper), lazy);
   }
 
+  @Override
+  public <W> HoodiePairData<K, W> mapValues(SerializableFunction<V, W> func) {
+    Function<V, W> uncheckedMapper = throwingMapWrapper(func);
+    return new HoodieListPairData<>(asStream().map(p -> Pair.of(p.getKey(), 
uncheckedMapper.apply(p.getValue()))), lazy);
+  }
+
+  public <W> HoodiePairData<K, W> flatMapValues(SerializableFunction<V, 
Iterator<W>> func) {
+    Function<V, Iterator<W>> uncheckedMapper = throwingMapWrapper(func);
+    return new HoodieListPairData<>(asStream().flatMap(p -> {
+      Iterator<W> mappedValuesIterator = uncheckedMapper.apply(p.getValue());
+      Iterator<Pair<K, W>> mappedPairsIterator =
+          new MappingIterator<>(mappedValuesIterator, w -> Pair.of(p.getKey(), 
w));
+
+      return StreamSupport.stream(
+          Spliterators.spliteratorUnknownSize(mappedPairsIterator, 
Spliterator.ORDERED), true);
+    }), lazy);
+  }
+
   @Override
   public <L, W> HoodiePairData<L, W> 
mapToPair(SerializablePairFunction<Pair<K, V>, L, W> mapToPairFunc) {
     return new HoodieListPairData<>(asStream().map(p -> 
throwingMapToPairWrapper(mapToPairFunc).apply(p)), lazy);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
index 49fa7174da9..1d3622786fd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodiePairData.java
@@ -90,12 +90,17 @@ public interface HoodiePairData<K, V> extends Serializable {
   HoodiePairData<K, V> reduceByKey(SerializableBiFunction<V, V, V> combiner, 
int parallelism);
 
   /**
-   * @param func serializable map function.
-   * @param <O>  output object type.
-   * @return {@link HoodieData<O>} containing the result. Actual execution may 
be deferred.
+   * Maps key-value pairs of this {@link HoodiePairData} container leveraging the provided mapper
+   *
+   * NOTE: This returns {@link HoodieData} and not {@link HoodiePairData}
    */
   <O> HoodieData<O> map(SerializableFunction<Pair<K, V>, O> func);
 
+  /**
+   * Maps values of this {@link HoodiePairData} container leveraging the provided mapper
+   */
+  <W> HoodiePairData<K, W> mapValues(SerializableFunction<V, W> func);
+
   /**
    * @param mapToPairFunc serializable map function to generate another pair.
    * @param <L>           new key type.
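
Similarly, a small hypothetical sketch of mapValues, which transforms only the values while keeping
the keys (and hence the pairing) intact; `fileSizeInBytesByPartition` is assumed to be an existing
HoodiePairData<String, Long>:

    // Convert per-partition file sizes from bytes to megabytes; keys stay untouched
    HoodiePairData<String, Double> fileSizeInMbByPartition =
        fileSizeInBytesByPartition.mapValues(sizeInBytes -> sizeInBytes / (1024.0 * 1024.0));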
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/function/ThrowingConsumer.java
similarity index 59%
copy from 
hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
copy to 
hudi-common/src/main/java/org/apache/hudi/common/function/ThrowingConsumer.java
index a4655764a97..1cbae113fcb 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/function/ThrowingConsumer.java
@@ -16,29 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.common.util.collection;
+package org.apache.hudi.common.function;
 
-import java.util.Iterator;
-import java.util.function.Function;
-
-// TODO java-docs
-public class MappingIterator<I, O> implements Iterator<O> {
-
-  protected final Iterator<I> source;
-  private final Function<I, O> mapper;
-
-  public MappingIterator(Iterator<I> source, Function<I, O> mapper) {
-    this.source = source;
-    this.mapper = mapper;
-  }
+/**
+ * Throwing counterpart of {@link java.util.function.Consumer}
+ */
+@FunctionalInterface
+public interface ThrowingConsumer<T> {
 
-  @Override
-  public boolean hasNext() {
-    return source.hasNext();
-  }
+  void accept(T t) throws Exception;
 
-  @Override
-  public O next() {
-    return mapper.apply(source.next());
-  }
-}
+}
\ No newline at end of file
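
The point of ThrowingConsumer is that lambdas and method references throwing checked exceptions can
be passed directly, which java.util.function.Consumer does not allow; a minimal illustrative sketch:

    import java.nio.file.Files;
    import java.nio.file.Path;

    // Files.delete throws the checked IOException, so it cannot be a plain Consumer<Path>,
    // but it fits ThrowingConsumer<Path> since accept(..) is declared to throw Exception
    ThrowingConsumer<Path> deleteFile = Files::delete;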
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
index 7d497408e64..a8c695240cd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieAvroIndexedRecord.java
@@ -113,7 +113,7 @@ public class HoodieAvroIndexedRecord extends 
HoodieRecord<IndexedRecord> {
 
   @Override
   public HoodieRecord rewriteRecord(Schema recordSchema, Properties props, 
Schema targetSchema) throws IOException {
-    GenericRecord record = HoodieAvroUtils.rewriteRecord((GenericRecord) data, 
targetSchema);
+    GenericRecord record = HoodieAvroUtils.rewriteRecordWithNewSchema(data, 
targetSchema);
     return new HoodieAvroIndexedRecord(key, record, operation, metaData);
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index e7c749a2911..00fa1b97db0 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -338,6 +338,10 @@ public class HoodieTableMetaClient implements Serializable 
{
     return hadoopConf.get();
   }
 
+  public SerializableConfiguration getSerializableHadoopConf() {
+    return hadoopConf;
+  }
+
   /**
    * Get the active instants as a timeline.
    *
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatteningIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatteningIterator.java
new file mode 100644
index 00000000000..4adf6b5c91f
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/FlatteningIterator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Iterator flattening a source {@link Iterator} that holds other {@link Iterator}s
+ */
+public final class FlatteningIterator<T, I extends Iterator<T>> implements 
Iterator<T> {
+
+  private final Iterator<I> sourceIterator;
+  private Iterator<T> innerSourceIterator;
+
+  public FlatteningIterator(Iterator<I> source) {
+    this.sourceIterator = source;
+  }
+
+  public boolean hasNext() {
+    while (innerSourceIterator == null || !innerSourceIterator.hasNext()) {
+      if (sourceIterator.hasNext()) {
+        innerSourceIterator = sourceIterator.next();
+      } else {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
+  public T next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException();
+    }
+
+    return innerSourceIterator.next();
+  }
+}
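
A short usage sketch for FlatteningIterator (illustrative only): it walks an iterator of iterators
and yields the inner elements in order, skipping empty inner iterators as the hasNext() loop above
implies:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;

    List<Iterator<Integer>> nested = Arrays.asList(
        Arrays.asList(1, 2).iterator(),
        Collections.<Integer>emptyIterator(),   // empty inner iterators are skipped
        Arrays.asList(3).iterator());

    Iterator<Integer> flat =
        new FlatteningIterator<Integer, Iterator<Integer>>(nested.iterator());
    // flat yields 1, 2, 3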
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
index a4655764a97..24b0961470b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/MappingIterator.java
@@ -21,7 +21,9 @@ package org.apache.hudi.common.util.collection;
 import java.util.Iterator;
 import java.util.function.Function;
 
-// TODO java-docs
+/**
+ * Iterator mapping elements of the provided source {@link Iterator} from 
{@code I} to {@code O}
+ */
 public class MappingIterator<I, O> implements Iterator<O> {
 
   protected final Iterator<I> source;
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 1034aa31255..f343d6437cf 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -19,12 +19,12 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.model.HoodieMetadataBloomFilter;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -73,11 +73,12 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
 
   private static final Logger LOG = 
LogManager.getLogger(BaseTableMetadata.class);
 
-  public static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
-  public static final int BUFFER_SIZE = 10 * 1024 * 1024;
+  protected static final long MAX_MEMORY_SIZE_IN_BYTES = 1024 * 1024 * 1024;
+  // NOTE: Buffer-size is deliberately set pretty low, since MT internally relies
+  //       on HFile (serving as a persisted binary key-value mapping) to do caching
+  protected static final int BUFFER_SIZE = 10 * 1024; // 10 KB
 
   protected final transient HoodieEngineContext engineContext;
-  protected final SerializableConfiguration hadoopConf;
   protected final SerializablePath dataBasePath;
   protected final HoodieTableMetaClient dataMetaClient;
   protected final Option<HoodieMetadataMetrics> metrics;
@@ -92,9 +93,11 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
   protected BaseTableMetadata(HoodieEngineContext engineContext, 
HoodieMetadataConfig metadataConfig,
                               String dataBasePath, String 
spillableMapDirectory) {
     this.engineContext = engineContext;
-    this.hadoopConf = new 
SerializableConfiguration(engineContext.getHadoopConf());
     this.dataBasePath = new SerializablePath(new CachingPath(dataBasePath));
-    this.dataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(dataBasePath).build();
+    this.dataMetaClient = HoodieTableMetaClient.builder()
+        .setConf(engineContext.getHadoopConf().get())
+        .setBasePath(dataBasePath)
+        .build();
     this.spillableMapDirectory = spillableMapDirectory;
     this.metadataConfig = metadataConfig;
 
@@ -123,8 +126,10 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
         throw new HoodieMetadataException("Failed to retrieve list of 
partition from metadata", e);
       }
     }
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, 
dataBasePath.toString(),
-        metadataConfig.shouldAssumeDatePartitioning()).getAllPartitionPaths();
+
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
+        createFileSystemBackedTableMetadata();
+    return fileSystemBackedTableMetadata.getAllPartitionPaths();
   }
 
   /**
@@ -148,8 +153,9 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
       }
     }
 
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, 
dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning())
-        .getAllFilesInPartition(partitionPath);
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
+        createFileSystemBackedTableMetadata();
+    return fileSystemBackedTableMetadata.getAllFilesInPartition(partitionPath);
   }
 
   @Override
@@ -168,8 +174,9 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
       }
     }
 
-    return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, 
dataBasePath.toString(), metadataConfig.shouldAssumeDatePartitioning())
-        .getAllFilesInPartitions(partitions);
+    FileSystemBackedTableMetadata fileSystemBackedTableMetadata =
+        createFileSystemBackedTableMetadata();
+    return fileSystemBackedTableMetadata.getAllFilesInPartitions(partitions);
   }
 
   @Override
@@ -205,6 +212,7 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
     HoodieTimer timer = HoodieTimer.start();
     Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
     Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    // TODO simplify (no sorting is required)
     partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
           final String bloomFilterIndexKey = 
HoodieMetadataPayload.getBloomFilterIndexKey(
               new PartitionIndexID(partitionNameFileNamePair.getLeft()), new 
FileIndexID(partitionNameFileNamePair.getRight()));
@@ -227,7 +235,11 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
         if (bloomFilterMetadata.isPresent()) {
           if (!bloomFilterMetadata.get().getIsDeleted()) {
             
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
-            final ByteBuffer bloomFilterByteBuffer = 
bloomFilterMetadata.get().getBloomFilter();
+            // NOTE: We have to duplicate the [[ByteBuffer]] object here since:
+            //        - Reading out [[ByteBuffer]] mutates its state
+            //        - [[BloomFilterMetadata]] could be re-used, and hence has to stay immutable
+            final ByteBuffer bloomFilterByteBuffer =
+                bloomFilterMetadata.get().getBloomFilter().duplicate();
             final String bloomFilterType = bloomFilterMetadata.get().getType();
             final BloomFilter bloomFilter = BloomFilterFactory.fromString(
                 
StandardCharsets.UTF_8.decode(bloomFilterByteBuffer).toString(), 
bloomFilterType);
@@ -332,7 +344,7 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
       HoodieMetadataPayload metadataPayload = record.getData();
       checkForSpuriousDeletes(metadataPayload, recordKey);
       try {
-        return metadataPayload.getFileStatuses(hadoopConf.get(), 
partitionPath);
+        return metadataPayload.getFileStatuses(getHadoopConf(), partitionPath);
       } catch (IOException e) {
         throw new HoodieIOException("Failed to extract file-statuses from the 
payload", e);
       }
@@ -358,7 +370,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
        getRecordsByKeys(new ArrayList<>(partitionIdToPathMap.keySet()), MetadataPartitionType.FILES.getPartitionPath());
    metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
 
-    FileSystem fs = partitionPaths.get(0).getFileSystem(hadoopConf.get());
+    FileSystem fs = partitionPaths.get(0).getFileSystem(getHadoopConf());
 
     Map<String, FileStatus[]> partitionPathToFilesMap = partitionIdRecordPairs.parallelStream()
         .map(pair -> {
@@ -399,18 +411,27 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
     }
   }
 
+  private FileSystemBackedTableMetadata createFileSystemBackedTableMetadata() {
+    return new FileSystemBackedTableMetadata(getEngineContext(), dataMetaClient.getSerializableHadoopConf(), dataBasePath.toString(),
+        metadataConfig.shouldAssumeDatePartitioning());
+  }
+
  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String key, String partitionName);
 
  public abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> key, String partitionName);
 
   protected HoodieEngineContext getEngineContext() {
-    return engineContext != null ? engineContext : new HoodieLocalEngineContext(hadoopConf.get());
+    return engineContext != null ? engineContext : new HoodieLocalEngineContext(getHadoopConf());
   }
 
   public HoodieMetadataConfig getMetadataConfig() {
     return metadataConfig;
   }
 
+  protected Configuration getHadoopConf() {
+    return dataMetaClient.getHadoopConf();
+  }
+
   protected String getLatestDataInstantTime() {
    return dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant()
         .map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
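
Note on the ByteBuffer.duplicate() change above: Charset#decode consumes the buffer it is
given (its position is advanced to the limit), so decoding the ByteBuffer cached inside a
BloomFilterMetadata record a second time would yield an empty string. duplicate() creates a
view with an independent position/limit over the same bytes, which is what keeps the cached
record effectively immutable. A minimal standalone sketch of that JDK behaviour (plain Java,
not Hudi code; the class name is made up for illustration):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class ByteBufferDuplicateDemo {
      public static void main(String[] args) {
        // Stand-in for the buffer returned by BloomFilterMetadata#getBloomFilter()
        ByteBuffer shared = ByteBuffer.wrap("serialized-bloom-filter".getBytes(StandardCharsets.UTF_8));

        // Decoding duplicates only advances the duplicates' positions, never the shared buffer's
        String first = StandardCharsets.UTF_8.decode(shared.duplicate()).toString();
        String second = StandardCharsets.UTF_8.decode(shared.duplicate()).toString();

        System.out.println(first.equals(second)); // true
        System.out.println(shared.remaining());   // still the full payload length
      }
    }
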
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 09c6b35309a..ecb0da8792d 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -24,7 +24,6 @@ import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -53,6 +52,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.exception.TableNotFoundException;
+import org.apache.hudi.util.Transient;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.io.storage.HoodieSeekingFileReader;
 
@@ -90,19 +90,18 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
   private static final Logger LOG = LogManager.getLogger(HoodieBackedTableMetadata.class);
 
-  private static final Schema METADATA_RECORD_SCHEMA = HoodieMetadataRecord.getClassSchema();
+  private final String metadataBasePath;
 
-  private String metadataBasePath;
-  // Metadata table's timeline and metaclient
   private HoodieTableMetaClient metadataMetaClient;
   private HoodieTableConfig metadataTableConfig;
+
   private HoodieTableFileSystemView metadataFileSystemView;
   // should we reuse the open file handles, across calls
   private final boolean reuse;
 
   // Readers for the latest file slice corresponding to file groups in the metadata partition
-  private final Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>> partitionReaders =
-      new ConcurrentHashMap<>();
+  private final Transient<Map<Pair<String, String>, Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader>>> partitionReaders =
+      Transient.lazy(ConcurrentHashMap::new);
 
   public HoodieBackedTableMetadata(HoodieEngineContext engineContext, HoodieMetadataConfig metadataConfig,
                                    String datasetBasePath, String spillableMapDirectory) {
@@ -113,18 +112,19 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
                                    String datasetBasePath, String spillableMapDirectory, boolean reuse) {
     super(engineContext, metadataConfig, datasetBasePath, spillableMapDirectory);
     this.reuse = reuse;
+    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString());
+
     initIfNeeded();
   }
 
   private void initIfNeeded() {
-    this.metadataBasePath = HoodieTableMetadata.getMetadataTableBasePath(dataBasePath.toString());
     if (!isMetadataTableEnabled) {
       if (!HoodieTableMetadata.isMetadataTable(metadataBasePath)) {
         LOG.info("Metadata table is disabled.");
       }
     } else if (this.metadataMetaClient == null) {
       try {
-        this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataBasePath).build();
+        this.metadataMetaClient = HoodieTableMetaClient.builder().setConf(getHadoopConf()).setBasePath(metadataBasePath).build();
         this.metadataFileSystemView = getFileSystemView(metadataMetaClient);
         this.metadataTableConfig = metadataMetaClient.getTableConfig();
        this.isBloomFilterIndexEnabled = metadataConfig.isBloomFilterIndexEnabled();
@@ -213,14 +213,14 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
               return mergedRecords.stream()
                 .map(keyRecordPair -> keyRecordPair.getValue().orElse(null))
+                .filter(Objects::nonNull)
                 .iterator();
             } catch (IOException ioe) {
               throw new HoodieIOException("Error merging records from metadata 
table for  " + sortedKeyPrefixes.size() + " key : ", ioe);
             } finally {
               closeReader(readers);
             }
-          })
-        .filter(Objects::nonNull);
+          });
   }
 
   @Override
@@ -425,7 +425,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   private Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> getOrCreateReaders(String partitionName, FileSlice slice) {
     if (reuse) {
       Pair<String, String> key = Pair.of(partitionName, slice.getFileId());
-      return partitionReaders.computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
+      return partitionReaders.get().computeIfAbsent(key, ignored -> openReaders(partitionName, slice));
     } else {
       return openReaders(partitionName, slice);
     }
@@ -462,7 +462,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     if (basefile.isPresent()) {
       String baseFilePath = basefile.get().getPath();
      baseFileReader = (HoodieSeekingFileReader<?>) HoodieFileReaderFactory.getReaderFactory(HoodieRecordType.AVRO)
-          .getFileReader(hadoopConf.get(), new Path(baseFilePath));
+          .getFileReader(getHadoopConf(), new Path(baseFilePath));
       baseFileOpenMs = timer.endTimer();
       LOG.info(String.format("Opened metadata base file from %s at instant %s 
in %d ms", baseFilePath,
           basefile.get().getCommitTime(), baseFileOpenMs));
@@ -596,7 +596,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    */
  private synchronized void close(Pair<String, String> partitionFileSlicePair) {
     Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers =
-        partitionReaders.remove(partitionFileSlicePair);
+        partitionReaders.get().remove(partitionFileSlicePair);
     closeReader(readers);
   }
 
@@ -604,10 +604,10 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
    * Close and clear all the partitions readers.
    */
   private void closePartitionReaders() {
-    for (Pair<String, String> partitionFileSlicePair : partitionReaders.keySet()) {
+    for (Pair<String, String> partitionFileSlicePair : partitionReaders.get().keySet()) {
       close(partitionFileSlicePair);
     }
-    partitionReaders.clear();
+    partitionReaders.get().clear();
   }
 
   private void closeReader(Pair<HoodieSeekingFileReader<?>, HoodieMetadataLogRecordReader> readers) {
@@ -629,10 +629,6 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     return isMetadataTableEnabled;
   }
 
-  public SerializableConfiguration getHadoopConf() {
-    return hadoopConf;
-  }
-
   public HoodieTableMetaClient getMetadataMetaClient() {
     return metadataMetaClient;
   }
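
Note on the @@ -213,14 +213,14 @@ hunk above: moving the Objects::nonNull filter inside the
per-partition closure drops the placeholders for absent keys before the iterator is handed
back, instead of filtering the resulting HoodieData afterwards. A sketch of the pattern in
plain Java (hypothetical names, not the Hudi API):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Objects;
    import java.util.Optional;

    public class InPartitionNullFilter {
      // Stand-in for the per-partition merge result, where some keys have no record
      static Iterator<String> mergedRecordsIterator(List<Optional<String>> merged) {
        return merged.stream()
            .map(kv -> kv.orElse(null))
            .filter(Objects::nonNull) // filter before returning the iterator, not afterwards
            .iterator();
      }

      public static void main(String[] args) {
        Iterator<String> it = mergedRecordsIterator(
            Arrays.asList(Optional.of("a"), Optional.empty(), Optional.of("b")));
        it.forEachRemaining(System.out::println); // prints a, b
      }
    }
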
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
index 76b341609de..4d1db7e81d4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataFileSystemView.java
@@ -54,9 +54,8 @@ public class HoodieMetadataFileSystemView extends HoodieTableFileSystemView {
                                       HoodieTableMetaClient metaClient,
                                       HoodieTimeline visibleActiveTimeline,
                                       HoodieMetadataConfig metadataConfig) {
-    super(metaClient, visibleActiveTimeline);
-    this.tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, metaClient.getBasePath(),
-        FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true);
+    this(metaClient, visibleActiveTimeline, HoodieTableMetadata.create(engineContext, metadataConfig,
+        metaClient.getBasePath(), FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true));
   }
 
   /**
diff --git a/hudi-common/src/main/java/org/apache/hudi/util/Transient.java b/hudi-common/src/main/java/org/apache/hudi/util/Transient.java
new file mode 100644
index 00000000000..0d8f6ad6565
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/util/Transient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.util;
+
+import org.apache.hudi.common.function.SerializableSupplier;
+import org.apache.hudi.common.function.ThrowingConsumer;
+
+import javax.annotation.concurrent.ThreadSafe;
+import java.io.Serializable;
+
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
+/**
+ * {@link Serializable} counterpart of {@link Lazy}
+ *
+ * @param <T> type of the object being held by {@link Transient}
+ */
+@ThreadSafe
+public class Transient<T> implements Serializable {
+
+  private SerializableSupplier<T> initializer;
+
+  private transient boolean initialized;
+  private transient T ref;
+
+  private Transient(SerializableSupplier<T> initializer) {
+    checkArgument(initializer != null);
+
+    this.initializer = initializer;
+    this.ref = null;
+    this.initialized = false;
+  }
+
+  private Transient(T value, SerializableSupplier<T> initializer) {
+    checkArgument(value != null);
+    checkArgument(initializer != null);
+
+    this.initializer = initializer;
+    this.ref = value;
+    this.initialized = true;
+  }
+
+  public T get() {
+    if (!initialized) {
+      synchronized (this) {
+        if (!initialized) {
+          this.ref = initializer.get();
+          initialized = true;
+        }
+      }
+    }
+
+    return ref;
+  }
+
+  public void reset() {
+    synchronized (this) {
+      this.ref = null;
+      this.initialized = false;
+    }
+  }
+
+  public void destroy(ThrowingConsumer<T> cleaner) throws Exception {
+    synchronized (this) {
+      if (initialized) {
+        cleaner.accept(ref);
+      }
+
+      this.ref = null;
+      this.initialized = false;
+      this.initializer = null;
+    }
+  }
+
+  /**
+   * Creates instance of {@link Transient} by lazily executing provided {@code initializer},
+   * to instantiate value of type {@link T}. Same initializer will be used to re-instantiate
+   * the value after original one being dropped during serialization/deserialization cycle
+   */
+  public static <T> Transient<T> lazy(SerializableSupplier<T> initializer) {
+    return new Transient<>(initializer);
+  }
+
+  /**
+   * Creates instance of {@link Transient} by eagerly setting it to provided {@code value},
+   * while given {@code initializer} will be used to re-instantiate the value after original
+   * one being dropped during serialization/deserialization cycle
+   */
+  public static <T> Transient<T> eager(T value, SerializableSupplier<T> initializer) {
+    return new Transient<>(value, initializer);
+  }
+}
\ No newline at end of file
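
The Transient utility above exists so that per-executor state like partitionReaders in
HoodieBackedTableMetadata can survive serialization: only the SerializableSupplier is written
out, and the wrapped value is rebuilt lazily on first access after deserialization. A hedged
usage sketch (ReaderCache is a made-up class, purely for illustration):

    import org.apache.hudi.util.Transient;

    import java.io.Serializable;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ReaderCache implements Serializable {
      // Only the ConcurrentHashMap::new initializer is serialized; the map itself is
      // transient and re-created the first time get() is called after deserialization.
      private final Transient<Map<String, AutoCloseable>> readers =
          Transient.lazy(ConcurrentHashMap::new);

      public AutoCloseable getOrOpen(String fileId) {
        return readers.get().computeIfAbsent(fileId, ignored -> openReader(fileId));
      }

      private AutoCloseable openReader(String fileId) {
        return () -> { }; // stand-in for opening a real file/log reader
      }
    }
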
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestIterators.java b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestIterators.java
new file mode 100644
index 00000000000..6e9d43388b1
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/collection/TestIterators.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.collection;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestIterators {
+
+  @Test
+  public void testFlatteningIterator() {
+    List<List<Integer>> listOfList =
+        Arrays.asList(
+            Arrays.asList(0),
+            Arrays.asList(1, 2),
+            Collections.emptyList(),
+            Arrays.asList(3, 4, 5)
+        );
+
+    List<Integer> flattenedList =
+        toStream(new FlatteningIterator<>(new MappingIterator<>(listOfList.iterator(), List::iterator)))
+            .collect(Collectors.toList());
+
+    assertEquals(Arrays.asList(0, 1, 2, 3, 4, 5), flattenedList);
+  }
+}
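
For readers unfamiliar with these helpers: the composition under test is a lazy, iterator-level
flatMap, where MappingIterator turns each inner list into its iterator and FlatteningIterator
concatenates them. The eager Stream equivalent of the expected result would be (plain JDK,
shown only for comparison):

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.stream.Collectors;

    public class FlattenWithStreams {
      public static void main(String[] args) {
        List<List<Integer>> listOfList = Arrays.asList(
            Arrays.asList(0), Arrays.asList(1, 2), Collections.emptyList(), Arrays.asList(3, 4, 5));

        // Same elements, same order as the MappingIterator + FlatteningIterator version
        List<Integer> flattened = listOfList.stream()
            .flatMap(List::stream)
            .collect(Collectors.toList());

        System.out.println(flattened); // [0, 1, 2, 3, 4, 5]
      }
    }
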
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
index da53f00e697..c4d7c1ff5b0 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/adapter/Spark2Adapter.scala
@@ -33,13 +33,11 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.{Command, DeleteFromTable, LogicalPlan}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Spark24HoodieParquetFileFormat}
+import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
 import org.apache.spark.sql.hudi.SparkAdapter
 import org.apache.spark.sql.hudi.parser.HoodieSpark2ExtendedSqlParser
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.{DataType, StructType}
-import org.apache.spark.sql._
-import org.apache.spark.sql.execution.vectorized.MutableColumnarRow
-import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel._
 
