nsivabalan commented on a change in pull request #4352:
URL: https://github.com/apache/hudi/pull/4352#discussion_r795049484



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
##########
@@ -199,6 +260,21 @@ public Builder enable(boolean enable) {
       return this;
     }
 
+    public Builder withMetadataIndexBloomFilter(boolean enable) {
+      metadataConfig.setValue(ENABLE_METADATA_INDEX_BLOOM_FILTER, String.valueOf(enable));
+      return this;
+    }
+
+    public Builder withMetadataIndexColumnStats(boolean enable) {
+      metadataConfig.setValue(ENABLE_METADATA_INDEX_COLUMN_STATS, String.valueOf(enable));
+      return this;
+    }
+
+    public Builder withMetadataIndexForAllColumns(boolean enable) {

Review comment:
       I don't see setter methods for the file group count here. Should we add them as well?
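   For example, a rough sketch of what I have in mind (the config keys and method names below are just placeholders, assuming we expose file group count configs for the new index partitions):

      public Builder withMetadataIndexBloomFilterFileGroups(int fileGroupCount) {
        // hypothetical config property for the bloom filter index file group count
        metadataConfig.setValue(METADATA_INDEX_BLOOM_FILTER_FILE_GROUP_COUNT, String.valueOf(fileGroupCount));
        return this;
      }

      public Builder withMetadataIndexColumnStatsFileGroups(int fileGroupCount) {
        // hypothetical config property for the column stats index file group count
        metadataConfig.setValue(METADATA_INDEX_COLUMN_STATS_FILE_GROUP_COUNT, String.valueOf(fileGroupCount));
        return this;
      }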

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -125,30 +128,43 @@ private void initIfNeeded() {
     return recordsByKeys.size() == 0 ? Option.empty() : recordsByKeys.get(0).getValue();
   }
 
-  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName) {
-    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = openReadersIfNeeded(keys.get(0), partitionName);
-    try {
-      List<Long> timings = new ArrayList<>();
-      HoodieFileReader baseFileReader = readers.getKey();
-      HoodieMetadataMergedLogRecordReader logRecordScanner = readers.getRight();
+  @Override
+  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys,
+                                                                                             String partitionName) {
+    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = getPartitionFileSlices(partitionName, keys);

Review comment:
       I have added comments down below, but getPartitionFileSlices should follow the same semantics as openReadersIfNeeded: maybe we should try to cache the file slices and reuse them where possible. I see that we don't do that as of now in this patch; prior to this patch, this call was made from within openReadersIfNeeded.
   Or do you think there are any challenges with that?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type, 
Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L, false)));
 
-    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.getPartitionPath());
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files 
within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have 
been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have 
been added to this partition
    * @param filesDeleted List of files which have been deleted from this 
partition
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
-                                                                               
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               
Option<Map<String, Long>> filesAdded,
+                                                                               
Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L, true))));
 
-    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.getPartitionPath());
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param partitionName - Partition name
+   * @param baseFileName  - Base file name for which the bloom filter needs to 
persisted
+   * @param timestamp     - Instant timestamp responsible for this record
+   * @param bloomFilter   - Bloom filter for the File
+   * @param isDeleted     - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createBloomFilterMetadataRecord(final String partitionName,
+                                                                               
     final String baseFileName,
+                                                                               
     final String timestamp,
+                                                                               
     final ByteBuffer bloomFilter,
+                                                                               
     final boolean isDeleted) {
+    ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+            && FSUtils.isBaseFile(new Path(baseFileName)),
+        "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+    final String bloomFilterKey = new 
PartitionIndexID(partitionName).asBase64EncodedString()
+        .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+
+    // TODO: Get the bloom filter type from the file
+    HoodieMetadataBloomFilter metadataBloomFilter =
+        new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+            timestamp, bloomFilter, isDeleted);
+    HoodieMetadataPayload metadataPayload = new 
HoodieMetadataPayload(key.getRecordKey(),
+        HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+    return new HoodieRecord<>(key, metadataPayload);
+  }
+
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload 
previousRecord) {
     ValidationUtils.checkArgument(previousRecord.type == type,
-        "Cannot combine " + previousRecord.type  + " with " + type);
-
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+        "Cannot combine " + previousRecord.type + " with " + type);
 
     switch (type) {
-      case PARTITION_LIST:
-      case FILE_LIST:
-        combinedFileInfo = combineFilesystemMetadata(previousRecord);
-        break;
+      case METADATA_TYPE_PARTITION_LIST:
+      case METADATA_TYPE_FILE_LIST:
+        Map<String, HoodieMetadataFileInfo> combinedFileInfo = 
combineFilesystemMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combinedFileInfo);
+      case METADATA_TYPE_BLOOM_FILTER:
+        HoodieMetadataBloomFilter combineBloomFilterMetadata = 
combineBloomFilterMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, 
combineBloomFilterMetadata);
+      case METADATA_TYPE_COLUMN_STATS:
+        return new HoodieMetadataPayload(key, type, 
combineColumnStatsMetadatat(previousRecord));
       default:
         throw new HoodieMetadataException("Unknown type of 
HoodieMetadataPayload: " + type);
     }
+  }
+
+  private HoodieMetadataBloomFilter 
combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
+    return this.bloomFilterMetadata;
+  }
 
-    return new HoodieMetadataPayload(key, type, combinedFileInfo);
+  private HoodieColumnStats combineColumnStatsMetadatat(HoodieMetadataPayload 
previousRecord) {
+    return this.columnStatMetadata;

Review comment:
       Just so we are on the same page:
   For the bloom filter partition, I understand a file is either added or deleted and there are no mutations as such. But for column stats, if a file is updated, we fetch the column stats from the latest file, so we should be fine to completely ignore the older version of the record?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +249,69 @@ private void initIfNeeded() {
   }
 
   /**
-   * Returns a new pair of readers to the base and log files.
+   * Get the file slice details for the given key in a partition.
+   *
+   * @param partitionName - Metadata partition name
+   * @param key           - Key to get the file slice for
+   * @return Partition and file slice pair for the given key
    */
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
openReadersIfNeeded(String key, String partitionName) {
-    return partitionReaders.computeIfAbsent(partitionName, k -> {
-      try {
-        final long baseFileOpenMs;
-        final long logScannerOpenMs;
-        HoodieFileReader baseFileReader = null;
-        HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+  private Pair<String, FileSlice> getPartitionFileSlice(final String 
partitionName, final String key) {
+    // Metadata is in sync till the latest completed instant on the dataset
+    List<FileSlice> latestFileSlices =
+        
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
+
+    final FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+        latestFileSlices.size()));
+    return Pair.of(partitionName, slice);
+  }
+
+  /**
+   * Get the latest file slices for the interested keys in a given partition.
+   *
+   * @param partitionName - Partition to get the file slices from
+   * @param keys          - Interested keys
+   * @return FileSlices for the keys
+   */
+  private Map<Pair<String, FileSlice>, List<String>> 
getPartitionFileSlices(final String partitionName, final List<String> keys) {
+    // Metadata is in sync till the latest completed instant on the dataset
+    List<FileSlice> latestFileSlices =
+        
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
+
+    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = 
new HashMap<>();
+    for (String key : keys) {
+      final FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+          latestFileSlices.size()));
+      final Pair<String, FileSlice> keyFileSlicePair = Pair.of(partitionName, 
slice);
+      partitionFileSliceToKeysMap.computeIfAbsent(keyFileSlicePair, k -> new 
ArrayList<>()).add(key);
+    }
+    return partitionFileSliceToKeysMap;
+  }
 
-        // Metadata is in sync till the latest completed instant on the dataset
+  /**
+   * Create a file reader and the record scanner for a given partition and 
file slice
+   * if readers are not already available.
+   *
+   * @param partitionName - Partition name
+   * @param slice         - The file slice to open readers for
+   * @return File reader and the record scanner pair for the requested file 
slice
+   */
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
openReadersIfNeeded(String partitionName, FileSlice slice) {
+    return partitionReaders.computeIfAbsent(Pair.of(partitionName, 
slice.getFileId()), k -> {
+      try {
         HoodieTimer timer = new HoodieTimer().startTimer();
-        List<FileSlice> latestFileSlices = 
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
-        if (latestFileSlices.size() == 0) {
-          // empty partition
-          return Pair.of(null, null);
-        }
-        ValidationUtils.checkArgument(latestFileSlices.size() == 1, 
String.format("Invalid number of file slices: found=%d, required=%d", 
latestFileSlices.size(), 1));
-        final FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key, 
latestFileSlices.size()));
 
         // Open base file reader
         Pair<HoodieFileReader, Long> baseFileReaderOpenTimePair = 
getBaseFileReader(slice, timer);
-        baseFileReader = baseFileReaderOpenTimePair.getKey();
-        baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
+        HoodieFileReader baseFileReader = baseFileReaderOpenTimePair.getKey();
+        final long baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
 
         // Open the log record scanner using the log files from the latest 
file slice
-        Pair<HoodieMetadataMergedLogRecordReader, Long> 
logRecordScannerOpenTimePair = getLogRecordScanner(slice,
-            partitionName);
-        logRecordScanner = logRecordScannerOpenTimePair.getKey();
-        logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
+        Pair<HoodieMetadataMergedLogRecordReader, Long> 
logRecordScannerOpenTimePair = getLogRecordScanner(slice, partitionName);
+        HoodieMetadataMergedLogRecordReader logRecordScanner = 
logRecordScannerOpenTimePair.getKey();
+        final long logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
 
-        metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR, baseFileOpenMs + 
logScannerOpenMs));
+        metrics.ifPresent(metrics -> 
metrics.updateMetrics(HoodieMetadataMetrics.SCAN_STR,

Review comment:
       Minor: can we fix the name of the metric (HoodieMetadataMetrics.SCAN_STR)? This metric represents just the time to open the readers, not the entire scan time.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -94,13 +136,60 @@ public HoodieMetadataPayload(Option<GenericRecord> record) 
{
           filesystemMetadata.put(k.toString(), new 
HoodieMetadataFileInfo((Long) v.get("size"), (Boolean) v.get("isDeleted")));
         });
       }
+
+      if (type == METADATA_TYPE_BLOOM_FILTER) {
+        final GenericRecord metadataRecord = (GenericRecord) 
record.get().get(SCHEMA_FIELD_ID_BLOOM_FILTER);
+        if (metadataRecord == null) {
+          throw new HoodieMetadataException("Valid " + 
SCHEMA_FIELD_ID_BLOOM_FILTER + " record expected for type: " + 
METADATA_TYPE_BLOOM_FILTER);
+        }
+        bloomFilterMetadata = new HoodieMetadataBloomFilter(
+            (String) metadataRecord.get(BLOOM_FILTER_FIELD_TYPE),
+            (String) metadataRecord.get(BLOOM_FILTER_FIELD_TIMESTAMP),
+            (ByteBuffer) metadataRecord.get(BLOOM_FILTER_FIELD_BLOOM_FILTER),
+            (Boolean) metadataRecord.get(BLOOM_FILTER_FIELD_IS_DELETED)
+        );
+      }
+
+      if (type == METADATA_TYPE_COLUMN_STATS) {
+        GenericRecord v = (GenericRecord) 
record.get().get(SCHEMA_FIELD_ID_COLUMN_STATS);
+        if (v == null) {
+          throw new HoodieMetadataException("Valid " + 
SCHEMA_FIELD_ID_COLUMN_STATS + " record expected for type: " + 
METADATA_TYPE_COLUMN_STATS);
+        }
+        columnStatMetadata = new HoodieColumnStats(
+            (String) v.get(COLUMN_STATS_FIELD_RESOURCE_NAME),
+            (String) v.get(COLUMN_STATS_FIELD_MIN_VALUE),
+            (String) v.get(COLUMN_STATS_FIELD_MAX_VALUE),
+            (Long) v.get(COLUMN_STATS_FIELD_NULL_COUNT),
+            (Long) v.get(COLUMN_STATS_FIELD_VALUE_COUNT),
+            (Long) v.get(COLUMN_STATS_FIELD_TOTAL_SIZE),
+            (Long) v.get(COLUMN_STATS_FIELD_TOTAL_UNCOMPRESSED_SIZE),
+            (Boolean) v.get(COLUMN_STATS_FIELD_IS_DELETED)
+        );
+      }
     }
   }
 
   private HoodieMetadataPayload(String key, int type, Map<String, 
HoodieMetadataFileInfo> filesystemMetadata) {
+    this(key, type, filesystemMetadata, null, null);
+  }
+
+  private HoodieMetadataPayload(String key, int type, 
HoodieMetadataBloomFilter metadataBloomFilter) {
+    this(key, type, null, metadataBloomFilter, null);
+  }
+
+  private HoodieMetadataPayload(String key, int type, HoodieColumnStats 
columnStats) {

Review comment:
       Do you think we should name this HoodieMetadataColumnStats, to be in line with HoodieMetadataBloomFilter?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -134,22 +139,41 @@ public BloomFilter readBloomFilter() {
   }
 
   @Override
-  public Set<String> filterRowKeys(Set candidateRowKeys) {
-    // Current implementation reads all records and filters them. In certain 
cases, it many be better to:
-    //  1. Scan a limited subset of keys (min/max range of candidateRowKeys)
-    //  2. Lookup keys individually (if the size of candidateRowKeys is much 
less than the total keys in file)
-    try {
-      List<Pair<String, R>> allRecords = readAllRecords();
-      Set<String> rowKeys = new HashSet<>();
-      allRecords.forEach(t -> {
-        if (candidateRowKeys.contains(t.getFirst())) {
-          rowKeys.add(t.getFirst());
-        }
-      });
-      return rowKeys;
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to read row keys from " + path, e);
+  public Set<String> filterRowKeys(Set<String> candidateRowKeys) {
+    return candidateRowKeys.stream().filter(k -> {
+      try {
+        return isKeyAvailable(k);

Review comment:
       I see that prior to this patch, we were reading all records and then looking up the candidateRowKeys. Here we are doing a point lookup for each key, one at a time. Is this based on some perf tuning? Will sync up w/ you f2f to understand more. Can you add a line to the javadoc saying that this method expects the candidate keys to be sorted? That way, anyone who builds on this method in the future will know its contract.
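   Something along these lines, just as a sketch of the javadoc note I am asking for (wording is only a suggestion):

      /**
       * Filters the given candidate row keys against the keys present in this file.
       * <p>
       * NOTE: {@code candidateRowKeys} is expected to be sorted, since each key is
       * looked up individually against the keys stored in the HFile.
       */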

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -125,30 +128,43 @@ private void initIfNeeded() {
     return recordsByKeys.size() == 0 ? Option.empty() : 
recordsByKeys.get(0).getValue();
   }
 
-  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys, String partitionName) {
-    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = 
openReadersIfNeeded(keys.get(0), partitionName);
-    try {
-      List<Long> timings = new ArrayList<>();
-      HoodieFileReader baseFileReader = readers.getKey();
-      HoodieMetadataMergedLogRecordReader logRecordScanner = 
readers.getRight();
+  @Override
+  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys,
+                                                                               
              String partitionName) {
+    Map<Pair<String, FileSlice>, List<String>> partitionFileSliceToKeysMap = 
getPartitionFileSlices(partitionName, keys);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = 
new ArrayList<>();
+    AtomicInteger fileSlicesKeysCount = new AtomicInteger();
+    partitionFileSliceToKeysMap.forEach((partitionFileSlicePair, 
fileSliceKeys) -> {
+      Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = 
openReadersIfNeeded(partitionName,
+          partitionFileSlicePair.getRight());
+      try {
+        List<Long> timings = new ArrayList<>();
+        HoodieFileReader baseFileReader = readers.getKey();
+        HoodieMetadataMergedLogRecordReader logRecordScanner = 
readers.getRight();
 
-      if (baseFileReader == null && logRecordScanner == null) {
-        return Collections.emptyList();
-      }
+        if (baseFileReader == null && logRecordScanner == null) {
+          return;
+        }
 
-      // local map to assist in merging with base file records
-      Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = 
readLogRecords(logRecordScanner, keys, timings);
-      List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = 
readFromBaseAndMergeWithLogRecords(
-          baseFileReader, keys, logRecords, timings, partitionName);
-      LOG.info(String.format("Metadata read for %s keys took [baseFileRead, 
logMerge] %s ms", keys.size(), timings));
-      return result;
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table 
for  " + keys.size() + " key : ", ioe);
-    } finally {
-      if (!reuse) {
-        close(partitionName);
+        // local map to assist in merging with base file records
+        Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = 
readLogRecords(logRecordScanner,
+            fileSliceKeys, timings);
+        result.addAll(readFromBaseAndMergeWithLogRecords(baseFileReader, 
fileSliceKeys, logRecords,
+            timings, partitionName));
+        LOG.debug(String.format("Metadata read for %s keys took [baseFileRead, 
logMerge] %s ms",
+            fileSliceKeys.size(), timings));
+        fileSlicesKeysCount.addAndGet(fileSliceKeys.size());
+      } catch (IOException ioe) {
+        throw new HoodieIOException("Error merging records from metadata table 
for  " + keys.size() + " key : ", ioe);
+      } finally {
+        if (!reuse) {
+          close(Pair.of(partitionFileSlicePair.getLeft(), 
partitionFileSlicePair.getRight().getFileId()));
+        }
       }
-    }
+    });
+
+    ValidationUtils.checkState(keys.size() == fileSlicesKeysCount.get());

Review comment:
       Are there chances that a key gets deleted (let's say a file is deleted) and getRecordsByKeys() is then called with that invalid/deleted key? Wouldn't readFromBaseAndMergeWithLogRecords return empty for it, so the total incoming keys might differ from the result list size?
   Or do we at least return Option.empty, so the total entries should still match here in such cases?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +249,69 @@ private void initIfNeeded() {
   }
 
   /**
-   * Returns a new pair of readers to the base and log files.
+   * Get the file slice details for the given key in a partition.
+   *
+   * @param partitionName - Metadata partition name
+   * @param key           - Key to get the file slice for
+   * @return Partition and file slice pair for the given key
    */
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
openReadersIfNeeded(String key, String partitionName) {
-    return partitionReaders.computeIfAbsent(partitionName, k -> {
-      try {
-        final long baseFileOpenMs;
-        final long logScannerOpenMs;
-        HoodieFileReader baseFileReader = null;
-        HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+  private Pair<String, FileSlice> getPartitionFileSlice(final String 
partitionName, final String key) {
+    // Metadata is in sync till the latest completed instant on the dataset
+    List<FileSlice> latestFileSlices =
+        
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
+
+    final FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+        latestFileSlices.size()));
+    return Pair.of(partitionName, slice);
+  }
+
+  /**
+   * Get the latest file slices for the interested keys in a given partition.
+   *
+   * @param partitionName - Partition to get the file slices from
+   * @param keys          - Interested keys
+   * @return FileSlices for the keys
+   */
+  private Map<Pair<String, FileSlice>, List<String>> 
getPartitionFileSlices(final String partitionName, final List<String> keys) {

Review comment:
       Can we find a better name for this method? It is essentially mapping keys to file slices, but the return value is mapped the reverse way, and getPartitionFileSlices() does not convey that.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -79,14 +98,53 @@ public static void deleteMetadataTable(String basePath, 
HoodieEngineContext cont
     }
   }
 
+  /**
+   * Convert commit action to metadata records for the enabled partition types.
+   *
+   * @param commitMetadata                      - Commit action metadata
+   * @param dataMetaClient                      - Meta client for the data 
table
+   * @param isMetaIndexColumnStatsForAllColumns - Do all columns need meta 
indexing?
+   * @param instantTime                         - Action instant time
+   * @return Map of partition to metadata records for the commit action
+   */
+  public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
convertMetadataToRecords(
+      HoodieEngineContext context, List<MetadataPartitionType> 
enabledPartitionTypes,
+      HoodieCommitMetadata commitMetadata, HoodieTableMetaClient 
dataMetaClient,
+      boolean isMetaIndexColumnStatsForAllColumns, String instantTime) {
+    final Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordsMap = new HashMap<>();
+    final HoodieData<HoodieRecord> filesPartitionRecordsRDD = 
context.parallelize(
+        convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 
1);
+    partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecordsRDD);
+
+    if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
+      final List<HoodieRecord> metadataBloomFilterRecords = 
convertMetadataToBloomFilterRecords(commitMetadata,
+          dataMetaClient, instantTime);
+      if (!metadataBloomFilterRecords.isEmpty()) {
+        final HoodieData<HoodieRecord> metadataBloomFilterRecordsRDD = 
context.parallelize(metadataBloomFilterRecords, 1);
+        partitionToRecordsMap.put(MetadataPartitionType.BLOOM_FILTERS, 
metadataBloomFilterRecordsRDD);
+      }
+    }
+
+    if (enabledPartitionTypes.contains(MetadataPartitionType.COLUMN_STATS)) {
+      final List<HoodieRecord> metadataColumnStats = 
convertMetadataToColumnStatsRecords(commitMetadata, context,

Review comment:
       same comment as above. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type, 
Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L, false)));
 
-    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.getPartitionPath());
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files 
within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have 
been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have 
been added to this partition
    * @param filesDeleted List of files which have been deleted from this 
partition
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
-                                                                               
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               
Option<Map<String, Long>> filesAdded,
+                                                                               
Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L, true))));
 
-    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.getPartitionPath());
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param partitionName - Partition name
+   * @param baseFileName  - Base file name for which the bloom filter needs to 
persisted
+   * @param timestamp     - Instant timestamp responsible for this record
+   * @param bloomFilter   - Bloom filter for the File
+   * @param isDeleted     - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createBloomFilterMetadataRecord(final String partitionName,
+                                                                               
     final String baseFileName,
+                                                                               
     final String timestamp,
+                                                                               
     final ByteBuffer bloomFilter,
+                                                                               
     final boolean isDeleted) {
+    ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+            && FSUtils.isBaseFile(new Path(baseFileName)),
+        "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+    final String bloomFilterKey = new 
PartitionIndexID(partitionName).asBase64EncodedString()
+        .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+
+    // TODO: Get the bloom filter type from the file
+    HoodieMetadataBloomFilter metadataBloomFilter =
+        new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),

Review comment:
       If someone has explicitly set SIMPLE as the bloom filter type for the data table, this might crash, right? Probably good to fix in this patch itself.
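   A rough sketch of what I mean, assuming the BloomFilter interface exposes its type code and we thread it through (the extra type parameter on createBloomFilterMetadataRecord is hypothetical):

      // at the caller, where the bloom filter is read from the base file
      BloomFilter fileBloomFilter = fileReader.readBloomFilter();
      ByteBuffer bloomByteBuffer = ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
      HoodieRecord record = HoodieMetadataPayload.createBloomFilterMetadataRecord(
          partition, appendedFile, instantTime, fileBloomFilter.getBloomFilterTypeCode().name(),
          bloomByteBuffer, false);

      // and inside createBloomFilterMetadataRecord, use the passed-in type instead of hardcoding DYNAMIC_V0
      HoodieMetadataBloomFilter metadataBloomFilter =
          new HoodieMetadataBloomFilter(bloomFilterTypeCode, timestamp, bloomFilter, isDeleted);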

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Collections.emptyMap();
+    }
+    if (partitionNameFileNameList.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+              .concat(new 
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+          fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+        }
+    );
+
+    List<String> partitionIDFileIDStrings = new 
ArrayList<>(partitionIDFileIDSortedStrings);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
+        (timer.endTimer() / partitionIDFileIDStrings.size())));
+
+    Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new 
HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry 
: hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (bloomFilterMetadata.isPresent()) {
+          if (!bloomFilterMetadata.get().getIsDeleted()) {
+            
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+            
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), 
bloomFilterMetadata.get().getBloomFilter());
+          }
+        } else {
+          LOG.error("Meta index bloom filter missing for: " + 
fileToKeyMap.get(entry.getLeft()));
+        }
+      }
+    }
+    return partitionFileToBloomFilterMap;
+  }
+
+  @Override
+  public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final 
List<Pair<String, String>> partitionNameFileNameList, final String columnName)
+      throws HoodieMetadataException {
+    if (!isColumnStatsIndexEnabled) {
+      LOG.error("Metadata column stats index is disabled!");
+      return Collections.emptyMap();
+    }
+
+    Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new 
HashMap<>();
+    TreeSet<String> sortedKeys = new TreeSet<>();
+    final String columnIndexStr = new 
ColumnIndexID(columnName).asBase64EncodedString();
+    for (Pair<String, String> partitionNameFileNamePair : 
partitionNameFileNameList) {
+      final String columnStatIndexKey = columnIndexStr
+          .concat(new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString())

Review comment:
       Same comment as above. I guess we have a static method in HoodieMetadataPayload to construct the key; can we use the same one here?
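   For example, a single static helper on HoodieMetadataPayload that both the write path and this read path call (names are just suggestions; the column stats variant assumes the key also ends with the file index, which is cut off in the hunk above):

      public static String getBloomFilterIndexKey(PartitionIndexID partitionIndexID, FileIndexID fileIndexID) {
        return partitionIndexID.asBase64EncodedString().concat(fileIndexID.asBase64EncodedString());
      }

      public static String getColumnStatsIndexKey(ColumnIndexID columnIndexID, PartitionIndexID partitionIndexID, FileIndexID fileIndexID) {
        return columnIndexID.asBase64EncodedString()
            .concat(partitionIndexID.asBase64EncodedString())
            .concat(fileIndexID.asBase64EncodedString());
      }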

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieColumnRangeMetadata.java
##########
@@ -30,16 +28,21 @@
   private final String columnName;
   private final T minValue;
   private final T maxValue;
-  private final long numNulls;
-  private final PrimitiveStringifier stringifier;
+  private final long nullCount;
+  private final long valueCount;
+  private final long totalSize;
+  private final long totalUncompressedSize;
 
-  public HoodieColumnRangeMetadata(final String filePath, final String 
columnName, final T minValue, final T maxValue, final long numNulls, final 
PrimitiveStringifier stringifier) {
+  public HoodieColumnRangeMetadata(final String filePath, final String 
columnName, final T minValue, final T maxValue,

Review comment:
       Should we rename this class to HoodieColumnStatsMetadata or something similar?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +249,69 @@ private void initIfNeeded() {
   }
 
   /**
-   * Returns a new pair of readers to the base and log files.
+   * Get the file slice details for the given key in a partition.
+   *
+   * @param partitionName - Metadata partition name
+   * @param key           - Key to get the file slice for
+   * @return Partition and file slice pair for the given key
    */
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
openReadersIfNeeded(String key, String partitionName) {
-    return partitionReaders.computeIfAbsent(partitionName, k -> {
-      try {
-        final long baseFileOpenMs;
-        final long logScannerOpenMs;
-        HoodieFileReader baseFileReader = null;
-        HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+  private Pair<String, FileSlice> getPartitionFileSlice(final String 
partitionName, final String key) {
+    // Metadata is in sync till the latest completed instant on the dataset
+    List<FileSlice> latestFileSlices =
+        
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
+
+    final FileSlice slice = 
latestFileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(key,
+        latestFileSlices.size()));
+    return Pair.of(partitionName, slice);
+  }
+
+  /**
+   * Get the latest file slices for the interested keys in a given partition.
+   *
+   * @param partitionName - Partition to get the file slices from
+   * @param keys          - Interested keys
+   * @return FileSlices for the keys
+   */
+  private Map<Pair<String, FileSlice>, List<String>> 
getPartitionFileSlices(final String partitionName, final List<String> keys) {
+    // Metadata is in sync till the latest completed instant on the dataset
+    List<FileSlice> latestFileSlices =

Review comment:
       Something to follow up after this patch, maybe:
   I see we call getPartitionLatestMergedFileSlices and getPartitionFileSlices in a few places, and the calls could be repeated as well. Can we cache the return value based on the latest instant? If the latest instant has not changed, then the latest file slices are not going to change, so we might as well use a cached copy if we have one.
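   A minimal sketch of the kind of caching I mean (the cache field and method name are just for illustration; exact invalidation would need more thought):

      // cached per metadata partition, keyed by the latest completed instant at the time of caching
      private final Map<String, Pair<String, List<FileSlice>>> latestFileSlicesCache = new ConcurrentHashMap<>();

      private List<FileSlice> getLatestMergedFileSlices(String partitionName) {
        Option<HoodieInstant> lastInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
        String latestInstant = lastInstant.isPresent() ? lastInstant.get().getTimestamp() : "";
        Pair<String, List<FileSlice>> cached = latestFileSlicesCache.get(partitionName);
        if (cached == null || !cached.getLeft().equals(latestInstant)) {
          cached = Pair.of(latestInstant,
              HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName));
          latestFileSlicesCache.put(partitionName, cached);
        }
        return cached.getRight();
      }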

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -79,14 +98,53 @@ public static void deleteMetadataTable(String basePath, 
HoodieEngineContext cont
     }
   }
 
+  /**
+   * Convert commit action to metadata records for the enabled partition types.
+   *
+   * @param commitMetadata                      - Commit action metadata
+   * @param dataMetaClient                      - Meta client for the data 
table
+   * @param isMetaIndexColumnStatsForAllColumns - Do all columns need meta 
indexing?
+   * @param instantTime                         - Action instant time
+   * @return Map of partition to metadata records for the commit action
+   */
+  public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
convertMetadataToRecords(
+      HoodieEngineContext context, List<MetadataPartitionType> 
enabledPartitionTypes,
+      HoodieCommitMetadata commitMetadata, HoodieTableMetaClient 
dataMetaClient,
+      boolean isMetaIndexColumnStatsForAllColumns, String instantTime) {
+    final Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
partitionToRecordsMap = new HashMap<>();
+    final HoodieData<HoodieRecord> filesPartitionRecordsRDD = 
context.parallelize(
+        convertMetadataToFilesPartitionRecords(commitMetadata, instantTime), 
1);
+    partitionToRecordsMap.put(MetadataPartitionType.FILES, 
filesPartitionRecordsRDD);
+
+    if (enabledPartitionTypes.contains(MetadataPartitionType.BLOOM_FILTERS)) {
+      final List<HoodieRecord> metadataBloomFilterRecords = 
convertMetadataToBloomFilterRecords(commitMetadata,

Review comment:
       If this is happening in the driver, can we try to parallelize it across executors? Reading bloom filters could add some latency if there are too many files.
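   For example, roughly the same flatMap pattern this patch already uses for column stats: collect the (partition, base file) pairs on the driver and read the bloom filters on the executors. This assumes the HoodieEngineContext is passed down into this method; readBloomFilterRecord below is a hypothetical helper wrapping the per-file read and returning a Stream of at most one record:

      List<Pair<String, String>> partitionBaseFilePairs = commitMetadata.getPartitionToWriteStats().entrySet().stream()
          .flatMap(entry -> entry.getValue().stream()
              .map(writeStat -> Pair.of(entry.getKey(), new Path(writeStat.getPath()).getName())))
          .filter(pair -> FSUtils.isBaseFile(new Path(pair.getRight())))
          .collect(Collectors.toList());
      if (partitionBaseFilePairs.isEmpty()) {
        return Collections.emptyList();
      }
      return engineContext.flatMap(partitionBaseFilePairs,
          pair -> readBloomFilterRecord(pair.getLeft(), pair.getRight(), dataMetaClient, instantTime),
          partitionBaseFilePairs.size());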

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type, 
Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L, false)));
 
-    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.getPartitionPath());
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files 
within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have 
been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have 
been added to this partition
    * @param filesDeleted List of files which have been deleted from this 
partition
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
-                                                                               
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               
Option<Map<String, Long>> filesAdded,
+                                                                               
Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L, true))));
 
-    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.getPartitionPath());
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param partitionName - Partition name
+   * @param baseFileName  - Base file name for which the bloom filter needs to 
persisted
+   * @param timestamp     - Instant timestamp responsible for this record
+   * @param bloomFilter   - Bloom filter for the File
+   * @param isDeleted     - Is the bloom filter no more valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createBloomFilterMetadataRecord(final String partitionName,
+                                                                               
     final String baseFileName,
+                                                                               
     final String timestamp,
+                                                                               
     final ByteBuffer bloomFilter,
+                                                                               
     final boolean isDeleted) {
+    ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+            && FSUtils.isBaseFile(new Path(baseFileName)),
+        "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+    final String bloomFilterKey = new 
PartitionIndexID(partitionName).asBase64EncodedString()

Review comment:
       Can we use the same name everywhere, maybe bloomFilterIndexKey?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
##########
@@ -233,38 +249,69 @@ private void initIfNeeded() {
   }
 
   /**
-   * Returns a new pair of readers to the base and log files.
+   * Get the file slice details for the given key in a partition.
+   *
+   * @param partitionName - Metadata partition name
+   * @param key           - Key to get the file slice for
+   * @return Partition and file slice pair for the given key
    */
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
openReadersIfNeeded(String key, String partitionName) {
-    return partitionReaders.computeIfAbsent(partitionName, k -> {
-      try {
-        final long baseFileOpenMs;
-        final long logScannerOpenMs;
-        HoodieFileReader baseFileReader = null;
-        HoodieMetadataMergedLogRecordReader logRecordScanner = null;
+  private Pair<String, FileSlice> getPartitionFileSlice(final String 
partitionName, final String key) {

Review comment:
       I guess this method is not used anywhere; can you check?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -124,14 +182,111 @@ public static void deleteMetadataTable(String basePath, 
HoodieEngineContext cont
     return records;
   }
 
+  /**
+   * Convert commit action metadata to bloom filter records.
+   *
+   * @param commitMetadata - Commit action metadata
+   * @param dataMetaClient - Meta client for the data table
+   * @param instantTime    - Action instant time
+   * @return List of metadata table records
+   */
+  public static List<HoodieRecord> 
convertMetadataToBloomFilterRecords(HoodieCommitMetadata commitMetadata,
+                                                                       
HoodieTableMetaClient dataMetaClient,
+                                                                       String 
instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    commitMetadata.getPartitionToWriteStats().forEach((partitionStatName, 
writeStats) -> {

Review comment:
       If the commit metadata has only log files, we won't return anything from this method, right? Can you confirm that our writes still go through? Or do we have any check that the returned list should match the number of files in the commit metadata, or at least that it is not empty?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -319,9 +616,88 @@ private static void 
processRollbackMetadata(HoodieActiveTimeline metadataTableTi
     return records;
   }
 
+  /**
+   * Convert rollback action metadata to bloom filter index records.
+   */
+  private static List<HoodieRecord> 
convertFilesToBloomFilterRecords(HoodieEngineContext engineContext,
+                                                                     
HoodieTableMetaClient dataMetaClient,
+                                                                     
Map<String, List<String>> partitionToDeletedFiles,
+                                                                     
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String 
instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> 
deletedFileList.forEach(deletedFile -> {
+      if (!FSUtils.isBaseFile(new Path(deletedFile))) {
+        return;
+      }
+
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+      records.add(HoodieMetadataPayload.createBloomFilterMetadataRecord(
+          partition, deletedFile, instantTime, ByteBuffer.allocate(0), true));
+    }));
+
+    partitionToAppendedFiles.forEach((partitionName, appendedFileMap) -> {
+      final String partition = partitionName.equals(EMPTY_PARTITION_NAME) ? 
NON_PARTITIONED_NAME : partitionName;
+      appendedFileMap.forEach((appendedFile, length) -> {
+        if (!FSUtils.isBaseFile(new Path(appendedFile))) {
+          return;
+        }
+        final String pathWithPartition = partitionName + "/" + appendedFile;
+        final Path appendedFilePath = new Path(dataMetaClient.getBasePath(), 
pathWithPartition);
+        try {
+          HoodieFileReader<IndexedRecord> fileReader =
+              
HoodieFileReaderFactory.getFileReader(dataMetaClient.getHadoopConf(), 
appendedFilePath);
+          final BloomFilter fileBloomFilter = fileReader.readBloomFilter();
+          if (fileBloomFilter == null) {
+            LOG.error("Failed to read bloom filter for " + appendedFilePath);
+            return;
+          }
+          ByteBuffer bloomByteBuffer = 
ByteBuffer.wrap(fileBloomFilter.serializeToString().getBytes());
+          HoodieRecord record = 
HoodieMetadataPayload.createBloomFilterMetadataRecord(
+              partition, appendedFile, instantTime, bloomByteBuffer, false);
+          records.add(record);
+          fileReader.close();
+        } catch (IOException e) {
+          LOG.error("Failed to get bloom filter for file: " + 
appendedFilePath);
+        }
+      });
+    });
+    return records;
+  }
+
+  /**
+   * Convert rollback action metadata to column stats index records.
+   */
+  private static List<HoodieRecord> 
convertFilesToColumnStatsRecords(HoodieEngineContext engineContext,
+                                                                     
HoodieTableMetaClient datasetMetaClient,
+                                                                     
Map<String, List<String>> partitionToDeletedFiles,
+                                                                     
Map<String, Map<String, Long>> partitionToAppendedFiles,
+                                                                     String 
instantTime) {
+    List<HoodieRecord> records = new LinkedList<>();
+    List<String> latestColumns = getLatestColumns(datasetMetaClient);
+    partitionToDeletedFiles.forEach((partitionName, deletedFileList) -> 
deletedFileList.forEach(deletedFile -> {

Review comment:
       I did leave this comment for the bloom filter as well. Since reading column stats could be latency sensitive when there is a large number of files in the commit metadata, is there a possibility to parallelize the reads across the different files?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -399,4 +775,116 @@ public static int mapRecordKeyToFileGroupIndex(String 
recordKey, int numFileGrou
     return fileSliceStream.sorted((s1, s2) -> 
s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
   }
 
+  public static List<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
+                                                                       
HoodieEngineContext engineContext,
+                                                                       
HoodieTableMetaClient dataMetaClient,
+                                                                       boolean 
isMetaIndexColumnStatsForAllColumns,
+                                                                       String 
instantTime) {
+
+    try {
+      List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+      return 
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, 
dataMetaClient, allWriteStats,
+          isMetaIndexColumnStatsForAllColumns);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate column stats records for 
metadata table ", e);
+    }
+  }
+
+  /**
+   * Create column stats from write status.
+   *
+   * @param engineContext                       - Enging context
+   * @param datasetMetaClient                   - Dataset meta client
+   * @param allWriteStats                       - Write status to convert
+   * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for 
indexing
+   */
+  public static List<HoodieRecord> 
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+                                                                   
HoodieTableMetaClient datasetMetaClient,
+                                                                   
List<HoodieWriteStat> allWriteStats,
+                                                                   boolean 
isMetaIndexColumnStatsForAllColumns) throws Exception {
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<HoodieWriteStat> prunedWriteStats = 
allWriteStats.stream().filter(writeStat -> {
+      return !(writeStat instanceof HoodieDeltaWriteStat);
+    }).collect(Collectors.toList());
+    if (prunedWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    return engineContext.flatMap(prunedWriteStats,
+        writeStat -> translateWriteStatToColumnStats(writeStat, 
datasetMetaClient,
+            getLatestColumns(datasetMetaClient, 
isMetaIndexColumnStatsForAllColumns)),
+        prunedWriteStats.size());
+  }
+
+  /**
+   * Get the latest columns for the table for column stats indexing.
+   *
+   * @param datasetMetaClient                   - Data table meta client
+   * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing 
enabled for all columns
+   */
+  private static List<String> getLatestColumns(HoodieTableMetaClient 
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+    if (!isMetaIndexColumnStatsForAllColumns
+        || 
datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
 < 1) {
+      return 
Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp());
+    }
+
+    TableSchemaResolver schemaResolver = new 
TableSchemaResolver(datasetMetaClient);
+    // consider nested fields as well. if column stats is enabled only for a 
subset of columns,
+    // directly use them instead of all columns from the latest table schema
+    try {
+      return schemaResolver.getTableAvroSchema().getFields().stream()
+          .map(entry -> entry.name()).collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new HoodieException("Failed to get latest columns for " + 
datasetMetaClient.getBasePath());
+    }
+  }
+
+  private static List<String> getLatestColumns(HoodieTableMetaClient 
datasetMetaClient) {
+    return getLatestColumns(datasetMetaClient, false);
+  }
+
+  public static Stream<HoodieRecord> 
translateWriteStatToColumnStats(HoodieWriteStat writeStat,
+                                                                     
HoodieTableMetaClient datasetMetaClient,
+                                                                     
List<String> latestColumns) {
+    return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), 
datasetMetaClient, latestColumns, false);
+
+  }
+
+  public static Stream<HoodieRecord> getColumnStats(final String 
partitionPath, final String filePathWithPartition,

Review comment:
       Can you please revisit the access specifiers for the newly added methods? IntelliJ 
   might flag them in yellow; I don't think these need to be public. 
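   For example, a minimal sketch of the visibility change (assuming this method is only 
   called from within HoodieTableMetadataUtil; the body is unchanged):
   ```java
   // Package-private (no `public`) is enough if the only callers live in this class/package.
   static Stream<HoodieRecord> translateWriteStatToColumnStats(HoodieWriteStat writeStat,
                                                               HoodieTableMetaClient datasetMetaClient,
                                                               List<String> latestColumns) {
     return getColumnStats(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient,
         latestColumns, false);
   }
   ```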
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
##########
@@ -399,4 +775,116 @@ public static int mapRecordKeyToFileGroupIndex(String 
recordKey, int numFileGrou
     return fileSliceStream.sorted((s1, s2) -> 
s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
   }
 
+  public static List<HoodieRecord> 
convertMetadataToColumnStatsRecords(HoodieCommitMetadata commitMetadata,
+                                                                       
HoodieEngineContext engineContext,
+                                                                       
HoodieTableMetaClient dataMetaClient,
+                                                                       boolean 
isMetaIndexColumnStatsForAllColumns,
+                                                                       String 
instantTime) {
+
+    try {
+      List<HoodieWriteStat> allWriteStats = 
commitMetadata.getPartitionToWriteStats().values().stream()
+          .flatMap(entry -> entry.stream()).collect(Collectors.toList());
+      return 
HoodieTableMetadataUtil.createColumnStatsFromWriteStats(engineContext, 
dataMetaClient, allWriteStats,
+          isMetaIndexColumnStatsForAllColumns);
+    } catch (Exception e) {
+      throw new HoodieException("Failed to generate column stats records for 
metadata table ", e);
+    }
+  }
+
+  /**
+   * Create column stats from write status.
+   *
+   * @param engineContext                       - Engine context
+   * @param datasetMetaClient                   - Dataset meta client
+   * @param allWriteStats                       - Write status to convert
+   * @param isMetaIndexColumnStatsForAllColumns - Are all columns enabled for 
indexing
+   */
+  public static List<HoodieRecord> 
createColumnStatsFromWriteStats(HoodieEngineContext engineContext,
+                                                                   
HoodieTableMetaClient datasetMetaClient,
+                                                                   
List<HoodieWriteStat> allWriteStats,
+                                                                   boolean 
isMetaIndexColumnStatsForAllColumns) throws Exception {
+    if (allWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<HoodieWriteStat> prunedWriteStats = 
allWriteStats.stream().filter(writeStat -> {
+      return !(writeStat instanceof HoodieDeltaWriteStat);
+    }).collect(Collectors.toList());
+    if (prunedWriteStats.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    return engineContext.flatMap(prunedWriteStats,
+        writeStat -> translateWriteStatToColumnStats(writeStat, 
datasetMetaClient,
+            getLatestColumns(datasetMetaClient, 
isMetaIndexColumnStatsForAllColumns)),
+        prunedWriteStats.size());
+  }
+
+  /**
+   * Get the latest columns for the table for column stats indexing.
+   *
+   * @param datasetMetaClient                   - Data table meta client
+   * @param isMetaIndexColumnStatsForAllColumns - Is column stats indexing 
enabled for all columns
+   */
+  private static List<String> getLatestColumns(HoodieTableMetaClient 
datasetMetaClient, boolean isMetaIndexColumnStatsForAllColumns) {
+    if (!isMetaIndexColumnStatsForAllColumns
+        || 
datasetMetaClient.getCommitsTimeline().filterCompletedInstants().countInstants()
 < 1) {
+      return 
Collections.singletonList(datasetMetaClient.getTableConfig().getRecordKeyFieldProp());
+    }
+
+    TableSchemaResolver schemaResolver = new 
TableSchemaResolver(datasetMetaClient);
+    // consider nested fields as well. if column stats is enabled only for a 
subset of columns,

Review comment:
       Do we have a tracking JIRA for this?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Collections.emptyMap();
+    }
+    if (partitionNameFileNameList.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()
+              .concat(new 
FileIndexID(partitionNameFileNamePair.getRight()).asBase64EncodedString());
+          partitionIDFileIDSortedStrings.add(bloomKey);
+          fileToKeyMap.put(bloomKey, partitionNameFileNamePair);
+        }
+    );
+
+    List<String> partitionIDFileIDStrings = new 
ArrayList<>(partitionIDFileIDSortedStrings);
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
hoodieRecordList =
+        getRecordsByKeys(partitionIDFileIDStrings, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_BLOOM_FILTERS_METADATA_STR,
+        (timer.endTimer() / partitionIDFileIDStrings.size())));
+
+    Map<Pair<String, String>, ByteBuffer> partitionFileToBloomFilterMap = new 
HashMap<>();
+    for (final Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry 
: hoodieRecordList) {
+      if (entry.getRight().isPresent()) {
+        final Option<HoodieMetadataBloomFilter> bloomFilterMetadata =
+            entry.getRight().get().getData().getBloomFilterMetadata();
+        if (bloomFilterMetadata.isPresent()) {
+          if (!bloomFilterMetadata.get().getIsDeleted()) {
+            
ValidationUtils.checkState(fileToKeyMap.containsKey(entry.getLeft()));
+            
partitionFileToBloomFilterMap.put(fileToKeyMap.get(entry.getLeft()), 
bloomFilterMetadata.get().getBloomFilter());
+          }
+        } else {
+          LOG.error("Meta index bloom filter missing for: " + 
fileToKeyMap.get(entry.getLeft()));
+        }
+      }
+    }
+    return partitionFileToBloomFilterMap;
+  }
+
+  @Override
+  public Map<Pair<String, String>, HoodieColumnStats> getColumnStats(final 
List<Pair<String, String>> partitionNameFileNameList, final String columnName)
+      throws HoodieMetadataException {
+    if (!isColumnStatsIndexEnabled) {
+      LOG.error("Metadata column stats index is disabled!");
+      return Collections.emptyMap();
+    }
+
+    Map<String, Pair<String, String>> columnStatKeyToFileNameMap = new 
HashMap<>();
+    TreeSet<String> sortedKeys = new TreeSet<>();
+    final String columnIndexStr = new 
ColumnIndexID(columnName).asBase64EncodedString();
+    for (Pair<String, String> partitionNameFileNamePair : 
partitionNameFileNameList) {
+      final String columnStatIndexKey = columnIndexStr

Review comment:
       Guess we are using "Index" everywhere, so can we add an "s" to the name 
   (columnStat**s**IndexKey)?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Collections.emptyMap();
+    }
+    if (partitionNameFileNameList.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()

Review comment:
       Can we use the static method getBloomFilterIndexKey() in 
   HoodieMetadataPayload to construct the key wherever it is needed?
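   Rough sketch of what that could look like (the signature below is illustrative, not 
   the final API; parameter types are assumed from the existing PartitionIndexID/FileIndexID usage):
   ```java
   // Hypothetical static helper in HoodieMetadataPayload
   public static String getBloomFilterIndexKey(PartitionIndexID partitionIndexID, FileIndexID fileIndexID) {
     return partitionIndexID.asBase64EncodedString()
         .concat(fileIndexID.asBase64EncodedString());
   }

   // The call site here (BaseTableMetadata#getBloomFilters) would then become:
   final String bloomKey = HoodieMetadataPayload.getBloomFilterIndexKey(
       new PartitionIndexID(partitionNameFileNamePair.getLeft()),
       new FileIndexID(partitionNameFileNamePair.getRight()));
   ```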

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
##########
@@ -110,55 +199,97 @@ private HoodieMetadataPayload(String key, int type, 
Map<String, HoodieMetadataFi
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionListRecord(List<String> partitions) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
-    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L,  false)));
+    partitions.forEach(partition -> fileInfo.put(partition, new 
HoodieMetadataFileInfo(0L, false)));
 
-    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), PARTITION_LIST, fileInfo);
+    HoodieKey key = new HoodieKey(RECORDKEY_PARTITION_LIST, 
MetadataPartitionType.FILES.getPartitionPath());
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_PARTITION_LIST,
+        fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
   /**
    * Create and return a {@code HoodieMetadataPayload} to save list of files 
within a partition.
    *
-   * @param partition The name of the partition
-   * @param filesAdded Mapping of files to their sizes for files which have 
been added to this partition
+   * @param partition    The name of the partition
+   * @param filesAdded   Mapping of files to their sizes for files which have 
been added to this partition
    * @param filesDeleted List of files which have been deleted from this 
partition
    */
   public static HoodieRecord<HoodieMetadataPayload> 
createPartitionFilesRecord(String partition,
-                                                                               
Option<Map<String, Long>> filesAdded, Option<List<String>> filesDeleted) {
+                                                                               
Option<Map<String, Long>> filesAdded,
+                                                                               
Option<List<String>> filesDeleted) {
     Map<String, HoodieMetadataFileInfo> fileInfo = new HashMap<>();
     filesAdded.ifPresent(
         m -> m.forEach((filename, size) -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(size, false))));
     filesDeleted.ifPresent(
-        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L,  true))));
+        m -> m.forEach(filename -> fileInfo.put(filename, new 
HoodieMetadataFileInfo(0L, true))));
 
-    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.partitionPath());
-    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), FILE_LIST, fileInfo);
+    HoodieKey key = new HoodieKey(partition, 
MetadataPartitionType.FILES.getPartitionPath());
+    HoodieMetadataPayload payload = new 
HoodieMetadataPayload(key.getRecordKey(), METADATA_TYPE_FILE_LIST, fileInfo);
     return new HoodieRecord<>(key, payload);
   }
 
+  /**
+   * Create bloom filter metadata record.
+   *
+   * @param partitionName - Partition name
+   * @param baseFileName  - Base file name for which the bloom filter needs to be persisted
+   * @param timestamp     - Instant timestamp responsible for this record
+   * @param bloomFilter   - Bloom filter for the File
+   * @param isDeleted     - Is the bloom filter no longer valid
+   * @return Metadata payload containing the fileID and its bloom filter record
+   */
+  public static HoodieRecord<HoodieMetadataPayload> 
createBloomFilterMetadataRecord(final String partitionName,
+                                                                               
     final String baseFileName,
+                                                                               
     final String timestamp,
+                                                                               
     final ByteBuffer bloomFilter,
+                                                                               
     final boolean isDeleted) {
+    ValidationUtils.checkArgument(!baseFileName.contains(Path.SEPARATOR)
+            && FSUtils.isBaseFile(new Path(baseFileName)),
+        "Invalid base file '" + baseFileName + "' for MetaIndexBloomFilter!");
+    final String bloomFilterKey = new 
PartitionIndexID(partitionName).asBase64EncodedString()
+        .concat(new FileIndexID(baseFileName).asBase64EncodedString());
+    HoodieKey key = new HoodieKey(bloomFilterKey, 
MetadataPartitionType.BLOOM_FILTERS.getPartitionPath());
+
+    // TODO: Get the bloom filter type from the file
+    HoodieMetadataBloomFilter metadataBloomFilter =
+        new HoodieMetadataBloomFilter(BloomFilterTypeCode.DYNAMIC_V0.name(),
+            timestamp, bloomFilter, isDeleted);
+    HoodieMetadataPayload metadataPayload = new 
HoodieMetadataPayload(key.getRecordKey(),
+        HoodieMetadataPayload.METADATA_TYPE_BLOOM_FILTER, metadataBloomFilter);
+    return new HoodieRecord<>(key, metadataPayload);
+  }
+
   @Override
   public HoodieMetadataPayload preCombine(HoodieMetadataPayload 
previousRecord) {
     ValidationUtils.checkArgument(previousRecord.type == type,
-        "Cannot combine " + previousRecord.type  + " with " + type);
-
-    Map<String, HoodieMetadataFileInfo> combinedFileInfo = null;
+        "Cannot combine " + previousRecord.type + " with " + type);
 
     switch (type) {
-      case PARTITION_LIST:
-      case FILE_LIST:
-        combinedFileInfo = combineFilesystemMetadata(previousRecord);
-        break;
+      case METADATA_TYPE_PARTITION_LIST:
+      case METADATA_TYPE_FILE_LIST:
+        Map<String, HoodieMetadataFileInfo> combinedFileInfo = 
combineFilesystemMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, combinedFileInfo);
+      case METADATA_TYPE_BLOOM_FILTER:
+        HoodieMetadataBloomFilter combineBloomFilterMetadata = 
combineBloomFilterMetadata(previousRecord);
+        return new HoodieMetadataPayload(key, type, 
combineBloomFilterMetadata);
+      case METADATA_TYPE_COLUMN_STATS:
+        return new HoodieMetadataPayload(key, type, 
combineColumnStatsMetadatat(previousRecord));
       default:
         throw new HoodieMetadataException("Unknown type of 
HoodieMetadataPayload: " + type);
     }
+  }
+
+  private HoodieMetadataBloomFilter 
combineBloomFilterMetadata(HoodieMetadataPayload previousRecord) {
+    return this.bloomFilterMetadata;
+  }
 
-    return new HoodieMetadataPayload(key, type, combinedFileInfo);
+  private HoodieColumnStats combineColumnStatsMetadatat(HoodieMetadataPayload 
previousRecord) {
+    return this.columnStatMetadata;

Review comment:
       Actually, my bad. Shouldn't we be combining at the file level?
   For example:
   metadata record1:
   file1: stats1_1
   file2: stats2_1
   metadata record2:
   file1: stats1_2
   file3: stats3_1
   
   What will the final value be after combining both of these records?
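   For illustration, a hedged sketch (not what the PR does) of a file-level combine, 
   assuming the payload kept a per-file map of stats (the field name columnStatsByFile 
   is hypothetical):
   ```java
   private Map<String, HoodieColumnStats> combineColumnStatsMetadata(HoodieMetadataPayload previousRecord) {
     // Start from the older record's per-file entries, then let the newer record win per file.
     Map<String, HoodieColumnStats> combined = new HashMap<>(previousRecord.columnStatsByFile);
     combined.putAll(this.columnStatsByFile);
     // With the example above: file1 -> stats1_2 (newer wins), file2 -> stats2_1, file3 -> stats3_1.
     return combined;
   }
   ```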
   

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
##########
@@ -146,12 +156,123 @@ protected BaseTableMetadata(HoodieEngineContext 
engineContext, HoodieMetadataCon
         .getAllFilesInPartitions(partitions);
   }
 
+  @Override
+  public Option<ByteBuffer> getBloomFilter(final String partitionName, final 
String fileName)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Option.empty();
+    }
+
+    final Pair<String, String> partitionFileName = Pair.of(partitionName, 
fileName);
+    Map<Pair<String, String>, ByteBuffer> bloomFilters = 
getBloomFilters(Collections.singletonList(partitionFileName));
+    if (bloomFilters.isEmpty()) {
+      LOG.error("Meta index: missing bloom filter for partition: " + 
partitionName + ", file: " + fileName);
+      return Option.empty();
+    }
+
+    ValidationUtils.checkState(bloomFilters.containsKey(partitionFileName));
+    return Option.of(bloomFilters.get(partitionFileName));
+  }
+
+  @Override
+  public Map<Pair<String, String>, ByteBuffer> getBloomFilters(final 
List<Pair<String, String>> partitionNameFileNameList)
+      throws HoodieMetadataException {
+    if (!isBloomFilterIndexEnabled) {
+      LOG.error("Metadata bloom filter index is disabled!");
+      return Collections.emptyMap();
+    }
+    if (partitionNameFileNameList.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Set<String> partitionIDFileIDSortedStrings = new TreeSet<>();
+    Map<String, Pair<String, String>> fileToKeyMap = new HashMap<>();
+    partitionNameFileNameList.forEach(partitionNameFileNamePair -> {
+          final String bloomKey = new 
PartitionIndexID(partitionNameFileNamePair.getLeft()).asBase64EncodedString()

Review comment:
       Or bloomFilterIndexKey.



