This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bf44c01b56 [HUDI-6821] Support multiple base file formats in Hudi table (#9761)
8bf44c01b56 is described below

commit 8bf44c01b56dd3afe5323dc7566971cee2e46d50
Author: Sagar Sumit <sagarsumi...@gmail.com>
AuthorDate: Thu Oct 26 09:27:02 2023 +0530

    [HUDI-6821] Support multiple base file formats in Hudi table (#9761)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  11 +-
 .../java/org/apache/hudi/io/HoodieWriteHandle.java |   3 +-
 .../java/org/apache/hudi/table/HoodieTable.java    |  10 +-
 .../table/action/bootstrap/BootstrapUtils.java     |   9 +-
 ...sistentHashingBucketClusteringPlanStrategy.java |   4 +-
 .../rollback/ListingBasedRollbackStrategy.java     |   6 +-
 .../table/upgrade/ZeroToOneUpgradeHandler.java     |   7 +-
 .../io/storage/row/HoodieRowDataCreateHandle.java  |   4 +-
 .../client/TestHoodieJavaWriteClientInsert.java    |   4 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |   5 -
 .../TestHoodieJavaClientOnCopyOnWriteStorage.java  |   3 +-
 .../commit/TestJavaCopyOnWriteActionExecutor.java  |   4 +-
 .../testutils/HoodieJavaClientTestHarness.java     |   4 +
 .../SparkBootstrapCommitActionExecutor.java        |   2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  14 +-
 .../table/action/bootstrap/TestBootstrapUtils.java |  12 +-
 .../commit/TestCopyOnWriteActionExecutor.java      |   5 +-
 .../TestHoodieSparkMergeOnReadTableRollback.java   |   2 +-
 .../hudi/testutils/HoodieClientTestBase.java       |   5 +
 .../testutils/HoodieSparkClientTestHarness.java    |   5 -
 .../apache/hudi/common/model/HoodieFileFormat.java |   9 +
 .../hudi/common/table/HoodieTableConfig.java       |  10 +
 .../hudi/common/table/HoodieTableMetaClient.java   |  19 +-
 .../org/apache/hudi/common/util/BaseFileUtils.java |   5 -
 .../org/apache/hudi/common/fs/TestFSUtils.java     |  27 ++
 .../hudi/common/testutils/HoodieTestTable.java     |   3 +-
 .../org/apache/hudi/BaseFileOnlyRelation.scala     |   4 +-
 .../main/scala/org/apache/hudi/DefaultSource.scala |  52 ++--
 .../scala/org/apache/hudi/HoodieBaseRelation.scala | 107 +++++++-
 ...tils.scala => HoodieSparkFileFormatUtils.scala} |  35 +--
 .../scala/org/apache/hudi/HoodieWriterUtils.scala  |   9 +-
 .../hudi/MergeOnReadIncrementalRelation.scala      |   4 +-
 .../apache/hudi/MergeOnReadSnapshotRelation.scala  |  92 -------
 .../sql/catalyst/catalog/HoodieCatalogTable.scala  |   4 +-
 .../datasources/HoodieMultipleBaseFileFormat.scala | 278 +++++++++++++++++++++
 .../spark/sql/hudi/ProvidesHoodieConfig.scala      |   2 +-
 .../RepairMigratePartitionMetaProcedure.scala      |   2 +-
 .../org/apache/hudi/functional/TestBootstrap.java  |   8 +-
 .../apache/hudi/functional/TestOrcBootstrap.java   |   8 +-
 .../apache/hudi/testutils/DataSourceTestUtils.java |  20 +-
 .../TestHoodieMultipleBaseFileFormat.scala         | 123 +++++++++
 .../datasources/Spark32NestedSchemaPruning.scala   |   3 +-
 .../hudi/utilities/streamer/HoodieStreamer.java    |  10 +-
 43 files changed, 712 insertions(+), 241 deletions(-)
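
For reference, a minimal usage sketch of the feature added by this commit (not part of the diff below; the table name and field names are placeholders, and the option keys correspond to the configs introduced or renamed in this change):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class MultiFormatWriteExample {
      // Writes one batch into a Hudi table that has multiple base file formats enabled.
      public static void writeBatch(Dataset<Row> df, String basePath) {
        df.write().format("hudi")
            .option("hoodie.table.name", "trips")                               // placeholder
            .option("hoodie.datasource.write.recordkey.field", "uuid")          // placeholder
            .option("hoodie.datasource.write.partitionpath.field", "partition") // placeholder
            .option("hoodie.table.multiple.base.file.formats.enable", "true")   // new table config
            .option("hoodie.base.file.format", "ORC")                           // renamed write config key
            .mode(SaveMode.Append)
            .save(basePath);
      }
    }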

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index cc3876338cc..5ae7ab25fbd 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -219,7 +219,7 @@ public class HoodieWriteConfig extends HoodieConfig {
           + "the timeline as an immutable log relying only on atomic writes 
for object storage.");
 
   public static final ConfigProperty<HoodieFileFormat> BASE_FILE_FORMAT = 
ConfigProperty
-      .key("hoodie.table.base.file.format")
+      .key("hoodie.base.file.format")
       .defaultValue(HoodieFileFormat.PARQUET)
       .withValidValues(HoodieFileFormat.PARQUET.name(), 
HoodieFileFormat.ORC.name(), HoodieFileFormat.HFILE.name())
       .withAlternatives("hoodie.table.ro.file.format")
@@ -1198,6 +1198,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(BASE_PATH);
   }
 
+  public HoodieFileFormat getBaseFileFormat() {
+    return HoodieFileFormat.valueOf(getStringOrDefault(BASE_FILE_FORMAT));
+  }
+
   public HoodieRecordMerger getRecordMerger() {
     List<String> mergers = 
StringUtils.split(getStringOrDefault(RECORD_MERGER_IMPLS), ",").stream()
         .map(String::trim)
@@ -2705,6 +2709,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withBaseFileFormat(String baseFileFormat) {
+      writeConfig.setValue(BASE_FILE_FORMAT, 
HoodieFileFormat.valueOf(baseFileFormat).name());
+      return this;
+    }
+
     public Builder withSchema(String schemaStr) {
       writeConfig.setValue(AVRO_SCHEMA_STRING, schemaStr);
       return this;
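
The new builder method and getter can be exercised as below; a minimal sketch that sets only a base path for illustration (a real writer would also set schema, table name, and so on):

    import org.apache.hudi.common.model.HoodieFileFormat;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class WriteConfigFormatExample {
      // Per-writer base file format selection via the new builder method.
      public static HoodieFileFormat resolveFormat() {
        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi/trips")   // placeholder base path
            .withBaseFileFormat("ORC")     // validated against HoodieFileFormat values
            .build();
        return writeConfig.getBaseFileFormat(); // HoodieFileFormat.ORC
      }
    }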
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
index 8c76e322b09..9d1bb6d511e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java
@@ -122,8 +122,7 @@ public abstract class HoodieWriteHandle<T, I, K, O> extends 
HoodieIOHandle<T, I,
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
 
-    return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime, 
writeToken, fileId,
-        
hoodieTable.getMetaClient().getTableConfig().getBaseFileFormat().getFileExtension()));
+    return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime, 
writeToken, fileId, hoodieTable.getBaseFileExtension()));
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 8cc1dcf924c..36a5e6de21a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -217,7 +217,7 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
   public abstract HoodieWriteMetadata<O> delete(HoodieEngineContext context, 
String instantTime, K keys);
 
   /**
-   * Delete records from Hoodie table based on {@link HoodieKey} and {@link 
HoodieRecordLocation} specified in
+   * Delete records from Hoodie table based on {@link HoodieKey} and {@link 
org.apache.hudi.common.model.HoodieRecordLocation} specified in
    * preppedRecords.
    *
    * @param context {@link HoodieEngineContext}.
@@ -874,13 +874,13 @@ public abstract class HoodieTable<T, I, K, O> implements 
Serializable {
   }
 
   public HoodieFileFormat getBaseFileFormat() {
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    if (tableConfig.isMultipleBaseFileFormatsEnabled() && 
config.contains(HoodieWriteConfig.BASE_FILE_FORMAT)) {
+      return config.getBaseFileFormat();
+    }
     return metaClient.getTableConfig().getBaseFileFormat();
   }
 
-  public HoodieFileFormat getLogFileFormat() {
-    return metaClient.getTableConfig().getLogFileFormat();
-  }
-
   public Option<HoodieFileFormat> getPartitionMetafileFormat() {
     return metaClient.getTableConfig().getPartitionMetafileFormat();
   }
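
Downstream callers see the resolved format through HoodieTable; a small sketch assuming a HoodieTable instance obtained elsewhere (with the multi-format flag on, the write-config format takes precedence, otherwise the table-level format applies):

    import org.apache.hudi.common.model.HoodieFileFormat;
    import org.apache.hudi.table.HoodieTable;

    final class BaseFormatResolution {
      // Resolution order: write-config format (if the multi-format table flag is on
      // and the write config sets one), otherwise the table-level base file format.
      static String resolvedExtension(HoodieTable<?, ?, ?, ?> table) {
        HoodieFileFormat format = table.getBaseFileFormat();
        return format.getFileExtension(); // e.g. ".parquet" or ".orc"
      }
    }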
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
index 3e9e6b42a61..05f71454ed0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapUtils.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.bootstrap.FileStatusUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -45,16 +46,16 @@ public class BootstrapUtils {
 
   /**
    * Returns leaf folders with files under a path.
-   * @param metaClient Hoodie table metadata client
+   * @param baseFileFormat Hoodie base file format
    * @param fs  File System
   * @param context HoodieEngineContext
    * @return list of partition paths with files under them.
    * @throws IOException
    */
-  public static List<Pair<String, List<HoodieFileStatus>>> 
getAllLeafFoldersWithFiles(HoodieTableMetaClient metaClient,
-      FileSystem fs, String basePathStr, HoodieEngineContext context) throws 
IOException {
+  public static List<Pair<String, List<HoodieFileStatus>>> 
getAllLeafFoldersWithFiles(HoodieFileFormat baseFileFormat,
+                                                                               
       FileSystem fs, String basePathStr, HoodieEngineContext context) throws 
IOException {
     final Path basePath = new Path(basePathStr);
-    final String baseFileExtension = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
+    final String baseFileExtension = baseFileFormat.getFileExtension();
     final Map<Integer, List<String>> levelToPartitions = new HashMap<>();
     final Map<String, List<HoodieFileStatus>> partitionToFiles = new 
HashMap<>();
     PathFilter filePathFilter = getFilePathFilter(baseFileExtension);
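
Callers of the changed signature now pass the base file format directly instead of a meta client; a minimal sketch:

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hudi.avro.model.HoodieFileStatus;
    import org.apache.hudi.common.engine.HoodieEngineContext;
    import org.apache.hudi.common.model.HoodieFileFormat;
    import org.apache.hudi.common.util.collection.Pair;
    import org.apache.hudi.table.action.bootstrap.BootstrapUtils;

    final class BootstrapListingExample {
      // Lists leaf folders of the bootstrap source using the new format-based signature.
      static List<Pair<String, List<HoodieFileStatus>>> listSourceFolders(
          FileSystem fs, String sourceBasePath, HoodieEngineContext context) throws IOException {
        return BootstrapUtils.getAllLeafFoldersWithFiles(
            HoodieFileFormat.PARQUET, fs, sourceBasePath, context);
      }
    }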
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
index af3c00d3d8e..27fea59fa9f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/cluster/strategy/BaseConsistentHashingBucketClusteringPlanStrategy.java
@@ -329,12 +329,12 @@ public abstract class 
BaseConsistentHashingBucketClusteringPlanStrategy<T extend
   }
 
   private long getSplitSize() {
-    HoodieFileFormat format = 
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
+    HoodieFileFormat format = getHoodieTable().getBaseFileFormat();
     return (long) (getWriteConfig().getMaxFileSize(format) * 
getWriteConfig().getBucketSplitThreshold());
   }
 
   private long getMergeSize() {
-    HoodieFileFormat format = 
getHoodieTable().getMetaClient().getTableConfig().getBaseFileFormat();
+    HoodieFileFormat format = getHoodieTable().getBaseFileFormat();
     return (long) (getWriteConfig().getMaxFileSize(format) * 
getWriteConfig().getBucketMergeThreshold());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
index 74e60b35bd0..2b383c1d246 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
@@ -95,7 +95,7 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
       context.setJobStatus(this.getClass().getSimpleName(), "Creating Listing 
Rollback Plan: " + config.getTableName());
 
       HoodieTableType tableType = table.getMetaClient().getTableType();
-      String baseFileExtension = getBaseFileExtension(metaClient);
+      String baseFileExtension = table.getBaseFileExtension();
       Option<HoodieCommitMetadata> commitMetadataOptional = 
getHoodieCommitMetadata(metaClient, instantToRollback);
       Boolean isCommitMetadataCompleted = 
checkCommitMetadataCompleted(instantToRollback, commitMetadataOptional);
       AtomicBoolean isCompaction = new AtomicBoolean(false);
@@ -191,10 +191,6 @@ public class ListingBasedRollbackStrategy implements 
BaseRollbackPlanActionExecu
     return 
metaClient.getFs().listStatus(FSUtils.getPartitionPath(config.getBasePath(), 
partitionPath), filter);
   }
 
-  private String getBaseFileExtension(HoodieTableMetaClient metaClient) {
-    return metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
-  }
-
   @NotNull
   private List<HoodieRollbackRequest> getHoodieRollbackRequests(String 
partitionPath, FileStatus[] filesToDeletedStatus) {
     return Arrays.stream(filesToDeletedStatus)
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
index 831e11efae7..772afe71b02 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ZeroToOneUpgradeHandler.java
@@ -18,8 +18,6 @@
 
 package org.apache.hudi.table.upgrade;
 
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.model.HoodieRollbackRequest;
 import org.apache.hudi.common.HoodieRollbackStat;
 import org.apache.hudi.common.config.ConfigProperty;
@@ -39,6 +37,9 @@ import 
org.apache.hudi.table.action.rollback.ListingBasedRollbackStrategy;
 import org.apache.hudi.table.marker.WriteMarkers;
 import org.apache.hudi.table.marker.WriteMarkersFactory;
 
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -137,6 +138,6 @@ public class ZeroToOneUpgradeHandler implements 
UpgradeHandler {
     String deltaInstant = FSUtils.getDeltaCommitTimeFromLogPath(logPath);
     String writeToken = FSUtils.getWriteTokenFromLogPath(logPath);
 
-    return FSUtils.makeBaseFileName(deltaInstant, writeToken, fileId, 
table.getBaseFileFormat().getFileExtension());
+    return FSUtils.makeBaseFileName(deltaInstant, writeToken, fileId, 
table.getBaseFileExtension());
   }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 6cff94068d6..475d0efc582 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.HoodieRecordDelegate;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -195,9 +194,8 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
     } catch (IOException e) {
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
-    HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
     return new Path(path.toString(), FSUtils.makeBaseFileName(instantTime, 
getWriteToken(), fileId,
-        tableConfig.getBaseFileFormat().getFileExtension()));
+        table.getBaseFileExtension()));
   }
 
   /**
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
index 02c407ba02d..ea13939ad2e 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java
@@ -148,7 +148,7 @@ public class TestHoodieJavaWriteClientInsert extends 
HoodieJavaClientTestHarness
 
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Get some records belong to the same partition (2021/09/11)
     String insertRecordStr1 = "{\"_row_key\":\"1\","
@@ -222,7 +222,7 @@ public class TestHoodieJavaWriteClientInsert extends 
HoodieJavaClientTestHarness
 
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     String partitionPath = "2021/09/11";
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new 
String[]{partitionPath});
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 1e8f5149d37..06446ae9138 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -39,7 +39,6 @@ import org.apache.hudi.common.model.HoodieAvroRecord;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
@@ -2763,10 +2762,6 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
       // Metadata table is MOR
       assertEquals(metadataMetaClient.getTableType(), 
HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
 
-      // Metadata table is HFile format
-      assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(), 
HoodieFileFormat.HFILE,
-          "Metadata Table base file format should be HFile");
-
       // Metadata table has a fixed number of partitions
       // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as 
that function filters all directory
       // in the .hoodie folder.
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index 3330c5c7eed..a591134517f 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -62,7 +62,6 @@ import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestTable;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.RawTripTestPayload;
-import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.MarkerUtils;
@@ -1021,7 +1020,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage 
extends HoodieJavaClientTe
   private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords, 
List<WriteStatus> allStatus, List<GenericRecord> records) {
     for (WriteStatus status : allStatus) {
       Path filePath = new Path(basePath, status.getStat().getPath());
-      
records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(hadoopConf,
 filePath));
+      
records.addAll(getFileUtilsInstance(metaClient).readAvroRecords(hadoopConf, 
filePath));
     }
     Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
     assertEquals(records.size(), expectedKeys.size());
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index a3a233cb743..bda362931c7 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -129,7 +129,7 @@ public class TestJavaCopyOnWriteActionExecutor extends 
HoodieJavaClientTestHarne
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     String partitionPath = "2016/01/31";
 
@@ -476,7 +476,7 @@ public class TestJavaCopyOnWriteActionExecutor extends 
HoodieJavaClientTestHarne
     HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
     writeClient.startCommitWithTime(firstCommitTime);
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     String partitionPath = "2022/04/09";
 
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index 7a373f093c0..09687e73a89 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -1046,4 +1046,8 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
     }
     return builder;
   }
+
+  public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient 
metaClient) {
+    return 
BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
index 92bee7ab141..884a7a6ab44 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
@@ -266,7 +266,7 @@ public class SparkBootstrapCommitActionExecutor<T>
    */
   private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> 
listAndProcessSourcePartitions() throws IOException {
     List<Pair<String, List<HoodieFileStatus>>> folders = 
BootstrapUtils.getAllLeafFoldersWithFiles(
-        table.getMetaClient(), bootstrapSourceFileSystem, 
config.getBootstrapSourceBasePath(), context);
+        table.getBaseFileFormat(), bootstrapSourceFileSystem, 
config.getBootstrapSourceBasePath(), context);
 
     LOG.info("Fetching Bootstrap Schema !!");
     HoodieBootstrapSchemaProvider sourceSchemaProvider = new 
HoodieSparkBootstrapSchemaProvider(config);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 8f13e0cea48..44105a41983 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -1200,7 +1200,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
 
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config);
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Inserts => will write file1
     String commitTime1 = "001";
@@ -1313,7 +1313,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
     HoodieWriteConfig config = getSmallInsertWriteConfig(insertSplitLimit, 
false, mergeAllowDuplicateInserts); // hold upto 200 records max
     dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath});
     SparkRDDWriteClient client = getHoodieWriteClient(config);
-    BaseFileUtils fileUtils = BaseFileUtils.getInstance(metaClient);
+    BaseFileUtils fileUtils = getFileUtilsInstance(metaClient);
 
     // Inserts => will write file1
     String commitTime1 = "001";
@@ -1410,9 +1410,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
 
     assertEquals(1, statuses.size(), "Just 1 file needs to be added.");
     String file1 = statuses.get(0).getFileId();
-    assertEquals(100,
-        BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, new 
Path(basePath, statuses.get(0).getStat().getPath()))
-            .size(), "file should contain 100 records");
+    assertEquals(100, getFileUtilsInstance(metaClient).readRowKeys(hadoopConf, 
new Path(basePath, statuses.get(0).getStat().getPath())).size(), "file should 
contain 100 records");
 
     // Delete 20 among 100 inserted
     testDeletes(client, inserts1, 20, file1, "002", 80, keysSoFar);
@@ -2091,7 +2089,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   private Set<String> verifyRecordKeys(List<HoodieRecord> expectedRecords, 
List<WriteStatus> allStatus, List<GenericRecord> records) {
     for (WriteStatus status : allStatus) {
       Path filePath = new Path(basePath, status.getStat().getPath());
-      
records.addAll(BaseFileUtils.getInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(),
 filePath));
+      
records.addAll(getFileUtilsInstance(metaClient).readAvroRecords(jsc.hadoopConfiguration(),
 filePath));
     }
     Set<String> expectedKeys = recordsToRecordKeySet(expectedRecords);
     assertEquals(records.size(), expectedKeys.size());
@@ -2180,10 +2178,10 @@ public class TestHoodieClientOnCopyOnWriteStorage 
extends HoodieClientTestBase {
 
     Path newFile = new Path(basePath, statuses.get(0).getStat().getPath());
     assertEquals(expectedRecords,
-        BaseFileUtils.getInstance(metaClient).readRowKeys(hadoopConf, 
newFile).size(),
+        getFileUtilsInstance(metaClient).readRowKeys(hadoopConf, 
newFile).size(),
         "file should contain 110 records");
 
-    List<GenericRecord> records = 
BaseFileUtils.getInstance(metaClient).readAvroRecords(hadoopConf, newFile);
+    List<GenericRecord> records = 
getFileUtilsInstance(metaClient).readAvroRecords(hadoopConf, newFile);
     for (GenericRecord record : records) {
       String recordKey = 
record.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
       assertTrue(keys.contains(recordKey), "key expected to be part of " + 
instantTime);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
index 83a6caecd19..cda4fa38d40 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/bootstrap/TestBootstrapUtils.java
@@ -67,18 +67,14 @@ public class TestBootstrapUtils extends 
HoodieClientTestBase {
       }
     });
 
-    List<Pair<String, List<HoodieFileStatus>>> collected = 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient,
+    List<Pair<String, List<HoodieFileStatus>>> collected = 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
             metaClient.getFs(), basePath, context);
     assertEquals(3, collected.size());
-    collected.stream().forEach(k -> {
-      assertEquals(2, k.getRight().size());
-    });
+    collected.forEach(k -> assertEquals(2, k.getRight().size()));
 
     // Simulate reading from un-partitioned dataset
-    collected = BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, 
metaClient.getFs(), basePath + "/" + folders.get(0), context);
+    collected = 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
 metaClient.getFs(), basePath + "/" + folders.get(0), context);
     assertEquals(1, collected.size());
-    collected.stream().forEach(k -> {
-      assertEquals(2, k.getRight().size());
-    });
+    collected.forEach(k -> assertEquals(2, k.getRight().size()));
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 24b66911613..4574b34393d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -244,8 +244,7 @@ public class TestCopyOnWriteActionExecutor extends 
HoodieClientTestBase implemen
 
     // Check whether the record has been updated
     Path updatedFilePath = allFiles[0].getPath();
-    BloomFilter updatedFilter =
-        
BaseFileUtils.getInstance(metaClient).readBloomFilterFromMetadata(hadoopConf, 
updatedFilePath);
+    BloomFilter updatedFilter = 
getFileUtilsInstance(metaClient).readBloomFilterFromMetadata(hadoopConf, 
updatedFilePath);
     for (HoodieRecord record : records) {
       // No change to the _row_key
       assertTrue(updatedFilter.mightContain(record.getRecordKey()));
@@ -542,7 +541,7 @@ public class TestCopyOnWriteActionExecutor extends 
HoodieClientTestBase implemen
     Option<Path> metafilePath = 
HoodiePartitionMetadata.getPartitionMetafilePath(fs, partitionPath);
     if (partitionMetafileUseBaseFormat) {
       // Extension should be the same as the data file format of the table
-      
assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileFormat().getFileExtension()));
+      
assertTrue(metafilePath.get().toString().endsWith(table.getBaseFileExtension()));
     } else {
       // No extension as it is in properties file format
       
assertTrue(metafilePath.get().toString().endsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
index 01cfcd047b4..92f2b2e1438 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableRollback.java
@@ -950,7 +950,7 @@ public class TestHoodieSparkMergeOnReadTableRollback 
extends SparkClientFunction
     return records;
   }
 
-  private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient 
metaClient, HoodieWriteConfig cfg, long numLogFiles) throws IOException {
+  private long doCompaction(SparkRDDWriteClient client, HoodieTableMetaClient 
metaClient, HoodieWriteConfig cfg, long numLogFiles) {
     // Do a compaction
     String instantTime = 
client.scheduleCompaction(Option.empty()).get().toString();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> compactionMetadata = 
client.compact(instantTime);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index c4a150e7f8f..39c77de3f26 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
+import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.HoodieIndex;
@@ -622,4 +623,8 @@ public class HoodieClientTestBase extends 
HoodieSparkClientTestHarness {
   public HoodieCleanStat getCleanStat(List<HoodieCleanStat> 
hoodieCleanStatsTwo, String partitionPath) {
     return hoodieCleanStatsTwo.stream().filter(e -> 
e.getPartitionPath().equals(partitionPath)).findFirst().orElse(null);
   }
+
+  public static BaseFileUtils getFileUtilsInstance(HoodieTableMetaClient 
metaClient) {
+    return 
BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
index 54b4972880f..2a83baa018c 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieSparkClientTestHarness.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
@@ -602,10 +601,6 @@ public abstract class HoodieSparkClientTestHarness extends 
HoodieWriterClientTes
     // Metadata table is MOR
     assertEquals(metadataMetaClient.getTableType(), 
HoodieTableType.MERGE_ON_READ, "Metadata Table should be MOR");
 
-    // Metadata table is HFile format
-    assertEquals(metadataMetaClient.getTableConfig().getBaseFileFormat(), 
HoodieFileFormat.HFILE,
-        "Metadata Table base file format should be HFile");
-
     // Metadata table has a fixed number of partitions
     // Cannot use FSUtils.getAllFoldersWithPartitionMetaFile for this as that 
function filters all directory
     // in the .hoodie folder.
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
index c8c94e5db3d..d7c25b82fad 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileFormat.java
@@ -64,4 +64,13 @@ public enum HoodieFileFormat {
   public String getFileExtension() {
     return extension;
   }
+
+  public static HoodieFileFormat fromFileExtension(String extension) {
+    for (HoodieFileFormat format : HoodieFileFormat.values()) {
+      if (format.getFileExtension().equals(extension)) {
+        return format;
+      }
+    }
+    throw new IllegalArgumentException("Unknown file extension :" + extension);
+  }
 }
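
The new helper maps a file extension back to its format; a small sketch combining it with FSUtils.getFileExtension (covered by the test added further down in this commit):

    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.model.HoodieFileFormat;

    final class FileFormatLookupExample {
      // Resolves the HoodieFileFormat of a data file from its path.
      // Unknown extensions (e.g. ".txt") raise IllegalArgumentException.
      static HoodieFileFormat formatOf(String filePath) {
        String extension = FSUtils.getFileExtension(filePath); // e.g. ".parquet", ".orc"
        return HoodieFileFormat.fromFileExtension(extension);
      }
    }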
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
index 29869730367..27aaf3324ed 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java
@@ -244,6 +244,12 @@ public class HoodieTableConfig extends HoodieConfig {
       .markAdvanced()
       .withDocumentation("When set to true, will not write the partition 
columns into hudi. By default, false.");
 
+  public static final ConfigProperty<Boolean> 
MULTIPLE_BASE_FILE_FORMATS_ENABLE = ConfigProperty
+      .key("hoodie.table.multiple.base.file.formats.enable")
+      .defaultValue(false)
+      .sinceVersion("1.0.0")
+      .withDocumentation("When set to true, the table can support reading and 
writing multiple base file formats.");
+
   public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = 
KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
   public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = 
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;
 
@@ -747,6 +753,10 @@ public class HoodieTableConfig extends HoodieConfig {
     return getBooleanOrDefault(DROP_PARTITION_COLUMNS);
   }
 
+  public boolean isMultipleBaseFileFormatsEnabled() {
+    return getBooleanOrDefault(MULTIPLE_BASE_FILE_FORMATS_ENABLE);
+  }
+
   /**
    * Read the table checksum.
    */
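
The new flag is persisted in hoodie.properties and can be read back through the table config; a minimal sketch for an existing table:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    final class TableFlagExample {
      // Reads hoodie.table.multiple.base.file.formats.enable from an existing table.
      static boolean supportsMultipleFormats(Configuration hadoopConf, String basePath) {
        HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
            .setConf(hadoopConf)
            .setBasePath(basePath)
            .build();
        return metaClient.getTableConfig().isMultipleBaseFileFormatsEnabled();
      }
    }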
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
index cee950592b4..2a989764120 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
@@ -158,8 +158,7 @@ public class HoodieTableMetaClient implements Serializable {
     }
     this.timelineLayoutVersion = layoutVersion.orElseGet(() -> 
tableConfig.getTimelineLayoutVersion().get());
     this.loadActiveTimelineOnLoad = loadActiveTimelineOnLoad;
-    LOG.info("Finished Loading Table of type " + tableType + "(version=" + 
timelineLayoutVersion + ", baseFileFormat="
-        + this.tableConfig.getBaseFileFormat() + ") from " + basePath);
+    LOG.info("Finished Loading Table of type " + tableType + "(version=" + 
timelineLayoutVersion + ") from " + basePath);
     if (loadActiveTimelineOnLoad) {
       LOG.info("Loading Active commit timeline for " + basePath);
       getActiveTimeline();
@@ -867,6 +866,7 @@ public class HoodieTableMetaClient implements Serializable {
     private String metadataPartitions;
     private String inflightMetadataPartitions;
     private String secondaryIndexesMetadata;
+    private Boolean multipleBaseFileFormatsEnabled;
 
     /**
      * Persist the configs that is written at the first time, and should not 
be changed.
@@ -1031,6 +1031,15 @@ public class HoodieTableMetaClient implements 
Serializable {
       return this;
     }
 
+    public PropertyBuilder setMultipleBaseFileFormatsEnabled(Boolean 
multipleBaseFileFormatsEnabled) {
+      this.multipleBaseFileFormatsEnabled = multipleBaseFileFormatsEnabled;
+      return this;
+    }
+
+    public PropertyBuilder setBaseFileFormats(String baseFileFormats) {
+      return this;
+    }
+
     public PropertyBuilder set(Map<String, Object> props) {
       for (ConfigProperty<String> configProperty : 
HoodieTableConfig.PERSISTED_CONFIG_LIST) {
         if (containsConfigProperty(props, configProperty)) {
@@ -1155,6 +1164,9 @@ public class HoodieTableMetaClient implements 
Serializable {
       if (hoodieConfig.contains(HoodieTableConfig.SECONDARY_INDEXES_METADATA)) 
{
         
setSecondaryIndexesMetadata(hoodieConfig.getString(HoodieTableConfig.SECONDARY_INDEXES_METADATA));
       }
+      if 
(hoodieConfig.contains(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE)) {
+        
setMultipleBaseFileFormatsEnabled(hoodieConfig.getBoolean(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE));
+      }
       return this;
     }
 
@@ -1263,6 +1275,9 @@ public class HoodieTableMetaClient implements 
Serializable {
       if (null != secondaryIndexesMetadata) {
         tableConfig.setValue(HoodieTableConfig.SECONDARY_INDEXES_METADATA, 
secondaryIndexesMetadata);
       }
+      if (null != multipleBaseFileFormatsEnabled) {
+        
tableConfig.setValue(HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE, 
Boolean.toString(multipleBaseFileFormatsEnabled));
+      }
       return tableConfig.getProps();
     }
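
The new PropertyBuilder setter lets the flag be written at table initialization time; a minimal sketch (table name and type are placeholders):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.model.HoodieTableType;
    import org.apache.hudi.common.table.HoodieTableMetaClient;

    final class TableInitExample {
      // Initializes a table with the multi-format flag persisted into hoodie.properties.
      static HoodieTableMetaClient initTable(Configuration hadoopConf, String basePath) throws IOException {
        return HoodieTableMetaClient.withPropertyBuilder()
            .setTableType(HoodieTableType.MERGE_ON_READ)   // placeholder table type
            .setTableName("trips")                         // placeholder table name
            .setMultipleBaseFileFormatsEnabled(true)       // new setter from this commit
            .initTable(hadoopConf, basePath);
      }
    }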
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index be41857a38e..278729f3d78 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -25,7 +25,6 @@ import org.apache.hudi.common.bloom.BloomFilterFactory;
 import org.apache.hudi.common.bloom.BloomFilterTypeCode;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
@@ -65,10 +64,6 @@ public abstract class BaseFileUtils {
     throw new UnsupportedOperationException(fileFormat.name() + " format not 
supported yet.");
   }
 
-  public static BaseFileUtils getInstance(HoodieTableMetaClient metaClient) {
-    return getInstance(metaClient.getTableConfig().getBaseFileFormat());
-  }
-
   /**
    * Read the rowKey list from the given data file.
    *
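
With the metaClient overload removed, callers resolve the format first; a one-method sketch of the replacement pattern used by the test harnesses above:

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.util.BaseFileUtils;

    final class FileUtilsLookupExample {
      // Replacement for the removed BaseFileUtils.getInstance(metaClient) overload.
      static BaseFileUtils forTable(HoodieTableMetaClient metaClient) {
        return BaseFileUtils.getInstance(metaClient.getTableConfig().getBaseFileFormat());
      }
    }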
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
index b5f4ea5726f..612929bc8a6 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/fs/TestFSUtils.java
@@ -545,6 +545,33 @@ public class TestFSUtils extends HoodieCommonTestHarness {
             .collect(Collectors.toSet()));
   }
 
+  @Test
+  public void testGetFileExtension() {
+    String pathWithExtension = "/path/to/some/file/sample.parquet";
+    String pathWithoutExtension = "/path/to/some/file/sample";
+    String justFileNameWithExtension = "sample.orc";
+    String justFileNameWithoutExtension = "sample";
+
+    // file with extension
+    String result1 = FSUtils.getFileExtension(pathWithExtension);
+    assertEquals(".parquet", result1);
+
+    // file without extension
+    String result2 = FSUtils.getFileExtension(pathWithoutExtension);
+    assertEquals("", result2);
+
+    // just a file name with extension
+    String result3 = FSUtils.getFileExtension(justFileNameWithExtension);
+    assertEquals(".orc", result3);
+
+    // just a file name without extension
+    String result4 = FSUtils.getFileExtension(justFileNameWithoutExtension);
+    assertEquals("", result4);
+
+    // null input
+    assertThrows(NullPointerException.class, () -> 
FSUtils.getFileExtension(null));
+  }
+
   private Path getHoodieTempDir() {
     return new Path(baseUri.toString(), ".hoodie/.temp");
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
index 81e7d993d55..202827ce0c7 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestTable.java
@@ -50,7 +50,6 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.IOType;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
@@ -789,7 +788,7 @@ public class HoodieTestTable {
   }
 
   public FileStatus[] listAllBaseFiles() throws IOException {
-    return 
listAllBaseFiles(HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension());
+    return listAllBaseFiles(HoodieFileFormat.PARQUET.getFileExtension());
   }
 
   public FileStatus[] listAllBaseFiles(String fileExtension) throws 
IOException {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
index f3b32b84017..65bb8881455 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/BaseFileOnlyRelation.scala
@@ -90,9 +90,7 @@ case class BaseFileOnlyRelation(override val sqlContext: 
SQLContext,
                                     requiredSchema: HoodieTableSchema,
                                     requestedColumns: Array[String],
                                     filters: Array[Filter]): RDD[InternalRow] 
= {
-    val (partitionSchema, dataSchema, requiredDataSchema) =
-      tryPrunePartitionColumns(tableSchema, requiredSchema)
-
+    val (partitionSchema, dataSchema, requiredDataSchema) = 
tryPrunePartitionColumns(tableSchema, requiredSchema)
     val baseFileReader = createBaseFileReader(
       spark = sparkSession,
       partitionSchema = partitionSchema,
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
index f982fb1e1c3..965340c637a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala
@@ -23,8 +23,7 @@ import 
org.apache.hudi.DataSourceWriteOptions.{BOOTSTRAP_OPERATION_OPT_VAL, OPER
 import org.apache.hudi.cdc.CDCRelation
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType.{COPY_ON_WRITE, 
MERGE_ON_READ}
-import org.apache.hudi.common.model.WriteConcurrencyMode
-import org.apache.hudi.common.table.timeline.HoodieInstant
+import org.apache.hudi.common.model.{HoodieTableType, WriteConcurrencyMode}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.ConfigUtils
 import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -118,11 +117,6 @@ class DefaultSource extends RelationProvider
     DefaultSource.createRelation(sqlContext, metaClient, schema, globPaths, 
parameters)
   }
 
-  def getValidCommits(metaClient: HoodieTableMetaClient): String = {
-    metaClient
-      
.getCommitsAndCompactionTimeline.filterCompletedInstants.getInstantsAsStream.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",")
-  }
-
   /**
    * This DataSource API is used for writing the DataFrame at the destination. 
For now, we are returning a dummy
    * relation here because Spark does not really make use of the relation 
returned, and just returns an empty
@@ -227,6 +221,7 @@ object DefaultSource {
     val queryType = parameters(QUERY_TYPE.key)
     val isCdcQuery = queryType == QUERY_TYPE_INCREMENTAL_OPT_VAL &&
       
parameters.get(INCREMENTAL_FORMAT.key).contains(INCREMENTAL_FORMAT_CDC_VAL)
+    val isMultipleBaseFileFormatsEnabled = 
metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled
 
     log.info(s"Is bootstrapped table => $isBootstrappedTable, tableType is: 
$tableType, queryType is: $queryType")
 
@@ -245,16 +240,24 @@ object DefaultSource {
     } else if (isCdcQuery) {
       CDCRelation.getCDCRelation(sqlContext, metaClient, parameters)
     } else {
-      lazy val newHudiFileFormatUtils = if 
(parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key,
-        USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean && (globPaths 
== null || globPaths.isEmpty)
+      lazy val fileFormatUtils = if ((isMultipleBaseFileFormatsEnabled && 
!isBootstrappedTable)
+        || (parameters.getOrElse(USE_NEW_HUDI_PARQUET_FILE_FORMAT.key, 
USE_NEW_HUDI_PARQUET_FILE_FORMAT.defaultValue).toBoolean
+        && (globPaths == null || globPaths.isEmpty)
         && parameters.getOrElse(REALTIME_MERGE.key(), 
REALTIME_MERGE.defaultValue())
-        .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL)) {
-        val formatUtils = new NewHoodieParquetFileFormatUtils(sqlContext, 
metaClient, parameters, userSchema)
+        .equalsIgnoreCase(REALTIME_PAYLOAD_COMBINE_OPT_VAL))) {
+        val formatUtils = new HoodieSparkFileFormatUtils(sqlContext, 
metaClient, parameters, userSchema)
         if (formatUtils.hasSchemaOnRead) Option.empty else Some(formatUtils)
       } else {
         Option.empty
       }
 
+      if (isMultipleBaseFileFormatsEnabled) {
+        if (isBootstrappedTable) {
+          throw new HoodieException(s"Multiple base file formats are not 
supported for bootstrapped table")
+        }
+        resolveMultiFileFormatRelation(tableType, queryType, 
fileFormatUtils.get)
+      }
+
       (tableType, queryType, isBootstrappedTable) match {
         case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) |
              (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, false) |
@@ -265,27 +268,27 @@ object DefaultSource {
           new IncrementalRelation(sqlContext, parameters, userSchema, 
metaClient)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, false) =>
-          if (newHudiFileFormatUtils.isEmpty) {
+          if (fileFormatUtils.isEmpty) {
             new MergeOnReadSnapshotRelation(sqlContext, parameters, 
metaClient, globPaths, userSchema)
           } else {
-            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, 
isBootstrap = false)
+            fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap 
= false)
           }
 
         case (MERGE_ON_READ, QUERY_TYPE_INCREMENTAL_OPT_VAL, _) =>
           new MergeOnReadIncrementalRelation(sqlContext, parameters, 
metaClient, userSchema)
 
         case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL, true) =>
-          if (newHudiFileFormatUtils.isEmpty) {
+          if (fileFormatUtils.isEmpty) {
             new HoodieBootstrapMORRelation(sqlContext, userSchema, globPaths, 
metaClient, parameters)
           } else {
-            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = true, 
isBootstrap = true)
+            fileFormatUtils.get.getHadoopFsRelation(isMOR = true, isBootstrap 
= true)
           }
 
         case (_, _, true) =>
-          if (newHudiFileFormatUtils.isEmpty) {
+          if (fileFormatUtils.isEmpty) {
             resolveHoodieBootstrapRelation(sqlContext, globPaths, userSchema, 
metaClient, parameters)
           } else {
-            newHudiFileFormatUtils.get.getHadoopFsRelation(isMOR = false, 
isBootstrap = true)
+            fileFormatUtils.get.getHadoopFsRelation(isMOR = false, isBootstrap 
= true)
           }
 
         case (_, _, _) =>
@@ -332,6 +335,21 @@ object DefaultSource {
     }
   }
 
+  private def resolveMultiFileFormatRelation(tableType: HoodieTableType,
+                                             queryType: String,
+                                             fileFormatUtils: 
HoodieSparkFileFormatUtils): BaseRelation = {
+    (tableType, queryType) match {
+      case (COPY_ON_WRITE, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
+           (COPY_ON_WRITE, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
+        fileFormatUtils.getHadoopFsRelation(isMOR = false, isBootstrap = false)
+      case (MERGE_ON_READ, QUERY_TYPE_SNAPSHOT_OPT_VAL) |
+           (MERGE_ON_READ, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL) =>
+        fileFormatUtils.getHadoopFsRelation(isMOR = true, isBootstrap = false)
+      case (_, _) =>
+        throw new HoodieException(s"Multiple base file formats not supported 
for query type : $queryType for tableType: $tableType")
+    }
+  }
+
   private def resolveSchema(metaClient: HoodieTableMetaClient,
                             parameters: Map[String, String],
                             schema: Option[StructType]): StructType = {
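
On the read path, snapshot and read-optimized queries on a multi-format table are routed by DefaultSource through resolveMultiFileFormatRelation above; a minimal read sketch using the Java Spark API (the query-type option key is the standard datasource option, not something added by this commit):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    final class MultiFormatReadExample {
      // Snapshot read of a table with multiple base file formats enabled.
      static Dataset<Row> readSnapshot(SparkSession spark, String basePath) {
        return spark.read().format("hudi")
            .option("hoodie.datasource.query.type", "snapshot")
            .load(basePath);
      }
    }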
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
index 9ace93ed495..c791e8417ca 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieBaseRelation.scala
@@ -55,7 +55,6 @@ import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.{convertToCatalystExpr
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import org.apache.spark.sql.catalyst.expressions.{Expression, 
SubqueryExpression}
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
@@ -516,6 +515,99 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
    */
   def updatePrunedDataSchema(prunedSchema: StructType): Relation
 
+  protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
+                                      requiredSchema: HoodieTableSchema,
+                                      requestedColumns: Array[String],
+                                      requiredFilters: Seq[Filter],
+                                      optionalFilters: Seq[Filter] = Seq.empty,
+                                      baseFileFormat: HoodieFileFormat = 
tableConfig.getBaseFileFormat): HoodieMergeOnReadBaseFileReaders = {
+    val (partitionSchema, dataSchema, requiredDataSchema) =
+      tryPrunePartitionColumns(tableSchema, requiredSchema)
+
+    val fullSchemaReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredDataSchema = dataSchema,
+      // This file-reader is used to read base file records, subsequently 
merging them with the records
+      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
+      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
+      // we combine them correctly);
+      // As such only required filters could be pushed-down to such reader
+      filters = requiredFilters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(new Configuration(conf), 
internalSchemaOpt),
+      baseFileFormat = baseFileFormat
+    )
+
+    val requiredSchemaReader = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredDataSchema = requiredDataSchema,
+      // This file-reader is used to read base file records, subsequently 
merging them with the records
+      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
+      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
+      // we combine them correctly);
+      // As such only required filters could be pushed-down to such reader
+      filters = requiredFilters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema),
+      baseFileFormat = baseFileFormat
+    )
+
+    // Check whether fields required for merging were also requested to be 
fetched
+    // by the query:
+    //    - In case they were, there's no optimization we could apply here (we 
will have
+    //    to fetch such fields)
+    //    - In case they were not, we will provide 2 separate file-readers
+    //        a) One which would be applied to file-groups w/ delta-logs 
(merging)
+    //        b) One which would be applied to file-groups w/ no delta-logs or
+    //           in case query-mode is skipping merging
+    val mandatoryColumns = 
mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName)
+    if (mandatoryColumns.forall(requestedColumns.contains)) {
+      HoodieMergeOnReadBaseFileReaders(
+        fullSchemaReader = fullSchemaReader,
+        requiredSchemaReader = requiredSchemaReader,
+        requiredSchemaReaderSkipMerging = requiredSchemaReader
+      )
+    } else {
+      val prunedRequiredSchema = {
+        val unusedMandatoryColumnNames = 
mandatoryColumns.filterNot(requestedColumns.contains)
+        val prunedStructSchema =
+          StructType(requiredDataSchema.structTypeSchema.fields
+            .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
+
+        HoodieTableSchema(prunedStructSchema, 
convertToAvroSchema(prunedStructSchema, tableName).toString)
+      }
+
+      val requiredSchemaReaderSkipMerging = createBaseFileReader(
+        spark = sqlContext.sparkSession,
+        partitionSchema = partitionSchema,
+        dataSchema = dataSchema,
+        requiredDataSchema = prunedRequiredSchema,
+        // This file-reader is only used in cases when no merging is 
performed, therefore it's safe to push
+        // down these filters to the base file readers
+        filters = requiredFilters ++ optionalFilters,
+        options = optParams,
+        // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
+        //       to configure Parquet reader appropriately
+        hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema),
+        baseFileFormat = baseFileFormat
+      )
+
+      HoodieMergeOnReadBaseFileReaders(
+        fullSchemaReader = fullSchemaReader,
+        requiredSchemaReader = requiredSchemaReader,
+        requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
+      )
+    }
+  }
+
   /**
    * Returns file-reader routine accepting [[PartitionedFile]] and returning 
an [[Iterator]]
    * over [[InternalRow]]
@@ -527,16 +619,15 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
                                      filters: Seq[Filter],
                                      options: Map[String, String],
                                      hadoopConf: Configuration,
-                                     shouldAppendPartitionValuesOverride: 
Option[Boolean] = None): BaseFileReader = {
-    val tableBaseFileFormat = tableConfig.getBaseFileFormat
-
+                                     shouldAppendPartitionValuesOverride: 
Option[Boolean] = None,
+                                     baseFileFormat: HoodieFileFormat = 
tableConfig.getBaseFileFormat): BaseFileReader = {
     // NOTE: PLEASE READ CAREFULLY
     //       Lambda returned from this method is going to be invoked on the 
executor, and therefore
     //       we have to eagerly initialize all of the readers even though only 
one specific to the type
     //       of the file being read will be used. This is required to avoid 
serialization of the whole
     //       relation (containing file-index for ex) and passing it to the 
executor
     val (read: (PartitionedFile => Iterator[InternalRow]), schema: StructType) 
=
-      tableBaseFileFormat match {
+      baseFileFormat match {
         case HoodieFileFormat.PARQUET =>
           val parquetReader = HoodieDataSourceHelper.buildHoodieParquetReader(
             sparkSession = spark,
@@ -571,17 +662,17 @@ abstract class HoodieBaseRelation(val sqlContext: 
SQLContext,
 
         (hfileReader, requiredDataSchema.structTypeSchema)
 
-      case _ => throw new UnsupportedOperationException(s"Base file format is 
not currently supported ($tableBaseFileFormat)")
+      case _ => throw new UnsupportedOperationException(s"Base file format is 
not currently supported ($baseFileFormat)")
     }
 
     BaseFileReader(
       read = partitionedFile => {
         val filePathString = 
sparkAdapter.getSparkPartitionedFileUtils.getStringPathFromPartitionedFile(partitionedFile)
         val extension = FSUtils.getFileExtension(filePathString)
-        if (tableBaseFileFormat.getFileExtension.equals(extension)) {
+        if (baseFileFormat.getFileExtension.equals(extension)) {
           read(partitionedFile)
         } else {
-          throw new UnsupportedOperationException(s"Invalid base-file format 
($extension), expected ($tableBaseFileFormat)")
+          throw new UnsupportedOperationException(s"Invalid base-file format 
($extension), expected ($baseFileFormat)")
         }
       },
       schema = schema
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
similarity index 87%
rename from 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
rename to 
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
index a76d4bfc77f..e66b248e0ab 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/NewHoodieParquetFileFormatUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkFileFormatUtils.scala
@@ -24,9 +24,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation._
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.common.config.{ConfigProperty, HoodieReaderConfig}
-import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord
-import 
org.apache.hudi.common.model.HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX
 import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkState
@@ -37,7 +35,7 @@ import 
org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
 import org.apache.hudi.internal.schema.{HoodieSchemaException, InternalSchema}
 import org.apache.spark.sql.catalyst.analysis.Resolver
 import 
org.apache.spark.sql.execution.datasources.parquet.{HoodieFileGroupReaderBasedParquetFileFormat,
 NewHoodieParquetFileFormat}
-import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{FileStatusCache, 
HadoopFsRelation, HoodieMultipleBaseFileFormat}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
@@ -46,10 +44,10 @@ import org.apache.spark.sql.{SQLContext, SparkSession}
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
-class NewHoodieParquetFileFormatUtils(val sqlContext: SQLContext,
-                                      val metaClient: HoodieTableMetaClient,
-                                      val optParamsInput: Map[String, String],
-                                      private val schemaSpec: 
Option[StructType]) extends SparkAdapterSupport {
+class HoodieSparkFileFormatUtils(val sqlContext: SQLContext,
+                                 val metaClient: HoodieTableMetaClient,
+                                 val optParamsInput: Map[String, String],
+                                 private val schemaSpec: Option[StructType]) 
extends SparkAdapterSupport {
   protected val sparkSession: SparkSession = sqlContext.sparkSession
 
   protected val optParams: Map[String, String] = optParamsInput.filter(kv => 
!kv._1.equals(DATA_QUERIES_ONLY.key()))
@@ -208,18 +206,27 @@ class NewHoodieParquetFileFormatUtils(val sqlContext: 
SQLContext,
       Seq.empty
     }
     fileIndex.shouldEmbedFileSlices = true
-    val fileGroupReaderBasedFileFormat = new 
HoodieFileGroupReaderBasedParquetFileFormat(
-      tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
-      metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap, shouldUseRecordPosition)
-    val newHoodieParquetFileFormat = new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
-      sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
-      metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap)
+
+    val fileFormat = if (fileGroupReaderEnabled) {
+      new HoodieFileGroupReaderBasedParquetFileFormat(
+        tableState, HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap, shouldUseRecordPosition)
+    } else if (metaClient.getTableConfig.isMultipleBaseFileFormatsEnabled && 
!isBootstrap) {
+      new 
HoodieMultipleBaseFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR)
+    } else {
+      new 
NewHoodieParquetFileFormat(sparkSession.sparkContext.broadcast(tableState),
+        
sparkSession.sparkContext.broadcast(HoodieTableSchema(tableStructSchema, 
tableAvroSchema.toString, internalSchemaOpt)),
+        metaClient.getTableConfig.getTableName, mergeType, mandatoryFields, 
isMOR, isBootstrap)
+    }
+
     HadoopFsRelation(
       location = fileIndex,
       partitionSchema = fileIndex.partitionSchema,
       dataSchema = fileIndex.dataSchema,
       bucketSpec = None,
-      fileFormat = if (fileGroupReaderEnabled) fileGroupReaderBasedFileFormat 
else newHoodieParquetFileFormat,
+      fileFormat = fileFormat,
       optParams)(sparkSession)
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
index b2c44cc3330..e2c5ad88d7f 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieWriterUtils.scala
@@ -169,9 +169,12 @@ object HoodieWriterUtils {
       val resolver = spark.sessionState.conf.resolver
       val diffConfigs = StringBuilder.newBuilder
       params.foreach { case (key, value) =>
-        val existingValue = 
getStringFromTableConfigWithAlternatives(tableConfig, key)
-        if (null != existingValue && !resolver(existingValue, value)) {
-          diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+        // Base file format can change between writes, so ignore it.
+        if (!HoodieTableConfig.BASE_FILE_FORMAT.key.equals(key)) {
+          val existingValue = 
getStringFromTableConfigWithAlternatives(tableConfig, key)
+          if (null != existingValue && !resolver(existingValue, value)) {
+            
diffConfigs.append(s"$key:\t$value\t${tableConfig.getString(key)}\n")
+          }
         }
       }
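
    [Editor's note] The change above simply excludes the base file format key from the table-config
    diff, so a write that switches formats (for example Parquet to ORC) no longer fails the
    "changed config" validation. A minimal, hypothetical sketch of that filtering in isolation
    (the params map below is illustrative, not taken from the patch):

    import org.apache.hudi.common.table.HoodieTableConfig

    // Hypothetical incoming writer params; only the base file format differs from the table config.
    val params = Map(
      HoodieTableConfig.BASE_FILE_FORMAT.key -> "ORC",
      "hoodie.table.name" -> "hoodie_test")

    // Keys that still have to match the existing table config after this patch.
    val comparable = params.filter { case (key, _) =>
      !HoodieTableConfig.BASE_FILE_FORMAT.key.equals(key)
    }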
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
index d80ce1a9cba..4dda08c2e28 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadIncrementalRelation.scala
@@ -17,7 +17,7 @@
 
 package org.apache.hudi
 
-import org.apache.hadoop.fs.{FileStatus, GlobPattern, Path}
+import org.apache.hadoop.fs.{FileStatus, GlobPattern}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieSparkConfUtils.getHollowCommitHandling
 import org.apache.hudi.common.model.{FileSlice, HoodieRecord}
@@ -181,7 +181,7 @@ trait HoodieIncrementalRelationTrait extends 
HoodieBaseRelation {
   protected lazy val commitsMetadata = 
includedCommits.map(getCommitMetadata(_, super.timeline)).asJava
 
   protected lazy val affectedFilesInCommits: Array[FileStatus] = {
-    listAffectedFilesForCommits(conf, new Path(metaClient.getBasePath), 
commitsMetadata)
+    listAffectedFilesForCommits(conf, metaClient.getBasePathV2, 
commitsMetadata)
   }
 
   protected lazy val (includeStartTime, startTs) = if (startInstantArchived) {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
index 31d64d50e45..8808d73ae1a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/MergeOnReadSnapshotRelation.scala
@@ -18,12 +18,9 @@
 
 package org.apache.hudi
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-import org.apache.hudi.HoodieBaseRelation.convertToAvroSchema
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.MergeOnReadSnapshotRelation.{createPartitionedFile, 
isProjectionCompatible}
-import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.model.{FileSlice, HoodieLogFile, 
OverwriteWithLatestAvroPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.spark.rdd.RDD
@@ -125,95 +122,6 @@ abstract class BaseMergeOnReadSnapshotRelation(sqlContext: 
SQLContext,
       fileSplits = fileSplits)
   }
 
-  protected def createBaseFileReaders(tableSchema: HoodieTableSchema,
-                                      requiredSchema: HoodieTableSchema,
-                                      requestedColumns: Array[String],
-                                      requiredFilters: Seq[Filter],
-                                      optionalFilters: Seq[Filter] = 
Seq.empty): HoodieMergeOnReadBaseFileReaders = {
-    val (partitionSchema, dataSchema, requiredDataSchema) =
-      tryPrunePartitionColumns(tableSchema, requiredSchema)
-
-    val fullSchemaReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      partitionSchema = partitionSchema,
-      dataSchema = dataSchema,
-      requiredDataSchema = dataSchema,
-      // This file-reader is used to read base file records, subsequently 
merging them with the records
-      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
-      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
-      // we combine them correctly);
-      // As such only required filters could be pushed-down to such reader
-      filters = requiredFilters,
-      options = optParams,
-      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
-      //       to configure Parquet reader appropriately
-      hadoopConf = embedInternalSchema(new Configuration(conf), 
internalSchemaOpt)
-    )
-
-    val requiredSchemaReader = createBaseFileReader(
-      spark = sqlContext.sparkSession,
-      partitionSchema = partitionSchema,
-      dataSchema = dataSchema,
-      requiredDataSchema = requiredDataSchema,
-      // This file-reader is used to read base file records, subsequently 
merging them with the records
-      // stored in delta-log files. As such, we have to read _all_ records 
from the base file, while avoiding
-      // applying any filtering _before_ we complete combining them w/ 
delta-log records (to make sure that
-      // we combine them correctly);
-      // As such only required filters could be pushed-down to such reader
-      filters = requiredFilters,
-      options = optParams,
-      // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
-      //       to configure Parquet reader appropriately
-      hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema)
-    )
-
-    // Check whether fields required for merging were also requested to be 
fetched
-    // by the query:
-    //    - In case they were, there's no optimization we could apply here (we 
will have
-    //    to fetch such fields)
-    //    - In case they were not, we will provide 2 separate file-readers
-    //        a) One which would be applied to file-groups w/ delta-logs 
(merging)
-    //        b) One which would be applied to file-groups w/ no delta-logs or
-    //           in case query-mode is skipping merging
-    val mandatoryColumns = 
mandatoryFieldsForMerging.map(HoodieAvroUtils.getRootLevelFieldName)
-    if (mandatoryColumns.forall(requestedColumns.contains)) {
-      HoodieMergeOnReadBaseFileReaders(
-        fullSchemaReader = fullSchemaReader,
-        requiredSchemaReader = requiredSchemaReader,
-        requiredSchemaReaderSkipMerging = requiredSchemaReader
-      )
-    } else {
-      val prunedRequiredSchema = {
-        val unusedMandatoryColumnNames = 
mandatoryColumns.filterNot(requestedColumns.contains)
-        val prunedStructSchema =
-          StructType(requiredDataSchema.structTypeSchema.fields
-            .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
-
-        HoodieTableSchema(prunedStructSchema, 
convertToAvroSchema(prunedStructSchema, tableName).toString)
-      }
-
-      val requiredSchemaReaderSkipMerging = createBaseFileReader(
-        spark = sqlContext.sparkSession,
-        partitionSchema = partitionSchema,
-        dataSchema = dataSchema,
-        requiredDataSchema = prunedRequiredSchema,
-        // This file-reader is only used in cases when no merging is 
performed, therefore it's safe to push
-        // down these filters to the base file readers
-        filters = requiredFilters ++ optionalFilters,
-        options = optParams,
-        // NOTE: We have to fork the Hadoop Config here as Spark will be 
modifying it
-        //       to configure Parquet reader appropriately
-        hadoopConf = embedInternalSchema(new Configuration(conf), 
requiredDataSchema.internalSchema)
-      )
-
-      HoodieMergeOnReadBaseFileReaders(
-        fullSchemaReader = fullSchemaReader,
-        requiredSchemaReader = requiredSchemaReader,
-        requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
-      )
-    }
-  }
-
   protected override def collectFileSplits(partitionFilters: Seq[Expression], 
dataFilters: Seq[Expression]): List[HoodieMergeOnReadFileSplit] = {
     val convertedPartitionFilters =
       HoodieFileIndex.convertFilterForTimestampKeyGenerator(metaClient, 
partitionFilters)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
index 772dd27e279..01fa4f7e39b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/catalyst/catalog/HoodieCatalogTable.scala
@@ -126,9 +126,9 @@ class HoodieCatalogTable(val spark: SparkSession, var 
table: CatalogTable) exten
   lazy val partitionFields: Array[String] = 
tableConfig.getPartitionFields.orElse(Array.empty)
 
   /**
-   * BaseFileFormat
+   * For multiple base file formats
    */
-  lazy val baseFileFormat: String = 
metaClient.getTableConfig.getBaseFileFormat.name()
+  lazy val isMultipleBaseFileFormatsEnabled: Boolean = 
tableConfig.isMultipleBaseFileFormatsEnabled
 
   /**
    * Firstly try to load table schema from meta directory on filesystem.
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
new file mode 100644
index 00000000000..c250a875f2b
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/HoodieMultipleBaseFileFormat.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.mapreduce.Job
+import 
org.apache.hudi.DataSourceReadOptions.{REALTIME_PAYLOAD_COMBINE_OPT_VAL, 
REALTIME_SKIP_MERGE_OPT_VAL}
+import org.apache.hudi.MergeOnReadSnapshotRelation.createPartitionedFile
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.{FileSlice, HoodieLogFile}
+import org.apache.hudi.{HoodieBaseRelation, HoodieTableSchema, 
HoodieTableState, LogFileIterator, MergeOnReadSnapshotRelation, 
PartitionFileSliceMapping, RecordMergingFileIterator, SparkAdapterSupport}
+import org.apache.spark.broadcast.Broadcast
+import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.JoinedRow
+import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.sources.Filter
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.util.SerializableConfiguration
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters.asScalaIteratorConverter
+
+/**
+ * File format that supports reading multiple base file formats in a table.
+ */
+class HoodieMultipleBaseFileFormat(tableState: Broadcast[HoodieTableState],
+                                   tableSchema: Broadcast[HoodieTableSchema],
+                                   tableName: String,
+                                   mergeType: String,
+                                   mandatoryFields: Seq[String],
+                                   isMOR: Boolean) extends FileFormat with 
SparkAdapterSupport {
+  private val parquetFormat = new ParquetFileFormat()
+  private val orcFormat = new OrcFileFormat()
+
+  override def inferSchema(sparkSession: SparkSession,
+                           options: Map[String, String],
+                           files: Seq[FileStatus]): Option[StructType] = {
+    // This is a simple heuristic assuming all files have the same extension.
+    val fileFormat = detectFileFormat(files.head.getPath.toString)
+
+    fileFormat match {
+      case "parquet" => parquetFormat.inferSchema(sparkSession, options, files)
+      case "orc" => orcFormat.inferSchema(sparkSession, options, files)
+      case _ => throw new UnsupportedOperationException(s"File format 
$fileFormat is not supported.")
+    }
+  }
+
+  override def isSplitable(sparkSession: SparkSession, options: Map[String, 
String], path: Path): Boolean = {
+    false
+  }
+
+  // Used so that the planner only projects once and does not stack overflow
+  var isProjected = false
+
+  /**
+   * supportBatch must return a consistent result, even if one base file format
+   * supports batch reads while the other does not
+   */
+  private var supportBatchCalled = false
+  private var supportBatchResult = false
+
+  override def supportBatch(sparkSession: SparkSession, schema: StructType): 
Boolean = {
+    if (!supportBatchCalled) {
+      supportBatchCalled = true
+      supportBatchResult =
+        !isMOR && parquetFormat.supportBatch(sparkSession, schema) && 
orcFormat.supportBatch(sparkSession, schema)
+    }
+    supportBatchResult
+  }
+
+  override def prepareWrite(sparkSession: SparkSession,
+                            job: Job,
+                            options: Map[String, String],
+                            dataSchema: StructType): OutputWriterFactory = {
+    throw new UnsupportedOperationException("Write operations are not supported by HoodieMultipleBaseFileFormat.")
+  }
+
+  override def buildReaderWithPartitionValues(sparkSession: SparkSession,
+                                              dataSchema: StructType,
+                                              partitionSchema: StructType,
+                                              requiredSchema: StructType,
+                                              filters: Seq[Filter],
+                                              options: Map[String, String],
+                                              hadoopConf: Configuration): 
PartitionedFile => Iterator[InternalRow] = {
+    val outputSchema = StructType(requiredSchema.fields ++ 
partitionSchema.fields)
+    val requiredSchemaWithMandatory = if (!isMOR || 
MergeOnReadSnapshotRelation.isProjectionCompatible(tableState.value)) {
+      // add mandatory fields to required schema
+      val added: mutable.Buffer[StructField] = mutable.Buffer[StructField]()
+      for (field <- mandatoryFields) {
+        if (requiredSchema.getFieldIndex(field).isEmpty) {
+          val fieldToAdd = 
dataSchema.fields(dataSchema.getFieldIndex(field).get)
+          added.append(fieldToAdd)
+        }
+      }
+      val addedFields = StructType(added.toArray)
+      StructType(requiredSchema.toArray ++ addedFields.fields)
+    } else {
+      dataSchema
+    }
+
+    val (parquetBaseFileReader, orcBaseFileReader, 
preMergeParquetBaseFileReader, preMergeOrcBaseFileReader) = buildFileReaders(
+      sparkSession, dataSchema, partitionSchema, requiredSchema, filters, 
options, hadoopConf, requiredSchemaWithMandatory)
+
+    val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new 
SerializableConfiguration(hadoopConf))
+    (file: PartitionedFile) => {
+      val filePath = 
sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(file)
+      val fileFormat = detectFileFormat(filePath.toString)
+      file.partitionValues match {
+        case fileSliceMapping: PartitionFileSliceMapping =>
+          if (FSUtils.isLogFile(filePath)) {
+            // no base file
+            val fileSlice = 
fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName).substring(1)).get
+            val logFiles = getLogFilesFromSlice(fileSlice)
+            val outputAvroSchema = 
HoodieBaseRelation.convertToAvroSchema(outputSchema, tableName)
+            new LogFileIterator(logFiles, filePath.getParent, 
tableSchema.value, outputSchema, outputAvroSchema,
+              tableState.value, broadcastedHadoopConf.value.value)
+          } else {
+            // We do not broadcast the slice if it has no log files
+            fileSliceMapping.getSlice(FSUtils.getFileId(filePath.getName)) 
match {
+              case Some(fileSlice) =>
+                val hoodieBaseFile = fileSlice.getBaseFile.get()
+                val baseFileFormat = 
detectFileFormat(hoodieBaseFile.getFileName)
+                val partitionValues = fileSliceMapping.getInternalRow
+                val logFiles = getLogFilesFromSlice(fileSlice)
+                if (requiredSchemaWithMandatory.isEmpty) {
+                  val baseFile = createPartitionedFile(partitionValues, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+                  baseFileFormat match {
+                    case "parquet" => parquetBaseFileReader(baseFile)
+                    case "orc" => orcBaseFileReader(baseFile)
+                    case _ => throw new UnsupportedOperationException(s"Base 
file format $baseFileFormat is not supported.")
+                  }
+                } else {
+                  if (logFiles.nonEmpty) {
+                    val baseFile = createPartitionedFile(InternalRow.empty, 
hoodieBaseFile.getHadoopPath, 0, hoodieBaseFile.getFileLen)
+                    buildMergeOnReadIterator(
+                      baseFileFormat match {
+                        case "parquet" => 
preMergeParquetBaseFileReader(baseFile)
+                        case "orc" => preMergeOrcBaseFileReader(baseFile)
+                        case _ => throw new 
UnsupportedOperationException(s"Base file format $baseFileFormat is not 
supported.")
+                      },
+                      logFiles,
+                      filePath.getParent,
+                      requiredSchemaWithMandatory,
+                      requiredSchemaWithMandatory,
+                      outputSchema,
+                      partitionSchema,
+                      partitionValues,
+                      broadcastedHadoopConf.value.value)
+                  } else {
+                    throw new IllegalStateException("Unexpected state: a file slice without log files should not have been broadcast")
+                  }
+                }
+              case _ => fileFormat match {
+                case "parquet" => parquetBaseFileReader(file)
+                case "orc" => orcBaseFileReader(file)
+                case _ => throw new UnsupportedOperationException(s"Base file 
format $fileFormat is not supported.")
+              }
+            }
+          }
+        case _ => fileFormat match {
+          case "parquet" => parquetBaseFileReader(file)
+          case "orc" => orcBaseFileReader(file)
+          case _ => throw new UnsupportedOperationException(s"Base file format 
$fileFormat is not supported.")
+        }
+      }
+    }
+  }
+
+  /**
+   * Build file readers to read individual physical files
+   */
+  protected def buildFileReaders(sparkSession: SparkSession, dataSchema: 
StructType, partitionSchema: StructType,
+                                 requiredSchema: StructType, filters: 
Seq[Filter], options: Map[String, String],
+                                 hadoopConf: Configuration, 
requiredSchemaWithMandatory: StructType):
+  (PartitionedFile => Iterator[InternalRow],
+    PartitionedFile => Iterator[InternalRow],
+    PartitionedFile => Iterator[InternalRow],
+    PartitionedFile => Iterator[InternalRow]) = {
+    val parquetBaseFileReader = 
parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchema,
+      filters, options, new Configuration(hadoopConf))
+    val orcBaseFileReader = 
orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, 
partitionSchema, requiredSchema,
+      filters, options, new Configuration(hadoopConf))
+
+    val preMergeParquetBaseFileReader = if (isMOR) {
+      parquetFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty),
+        requiredSchemaWithMandatory, Seq.empty, options, new 
Configuration(hadoopConf))
+    } else {
+      _: PartitionedFile => Iterator.empty
+    }
+
+    val preMergeOrcBaseFileReader = if (isMOR) {
+      orcFormat.buildReaderWithPartitionValues(sparkSession, dataSchema, 
StructType(Seq.empty),
+        requiredSchemaWithMandatory, Seq.empty, options, new 
Configuration(hadoopConf))
+    } else {
+      _: PartitionedFile => Iterator.empty
+    }
+
+    (parquetBaseFileReader, orcBaseFileReader, preMergeParquetBaseFileReader, 
preMergeOrcBaseFileReader)
+  }
+
+  /**
+   * Create iterator for a file slice that has log files
+   */
+  protected def buildMergeOnReadIterator(iter: Iterator[InternalRow], 
logFiles: List[HoodieLogFile],
+                                         partitionPath: Path, inputSchema: 
StructType, requiredSchemaWithMandatory: StructType,
+                                         outputSchema: StructType, 
partitionSchema: StructType, partitionValues: InternalRow,
+                                         hadoopConf: Configuration): 
Iterator[InternalRow] = {
+
+    val requiredAvroSchema = 
HoodieBaseRelation.convertToAvroSchema(requiredSchemaWithMandatory, tableName)
+    val morIterator = mergeType match {
+      case REALTIME_SKIP_MERGE_OPT_VAL => throw new 
UnsupportedOperationException("Skip merge is not currently " +
+        "implemented for the New Hudi Parquet File format")
+      //new SkipMergeIterator(logFiles, partitionPath, iter, inputSchema, 
tableSchema.value,
+      //  requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, 
hadoopConf)
+      case REALTIME_PAYLOAD_COMBINE_OPT_VAL =>
+        new RecordMergingFileIterator(logFiles, partitionPath, iter, 
inputSchema, tableSchema.value,
+          requiredSchemaWithMandatory, requiredAvroSchema, tableState.value, 
hadoopConf)
+    }
+    appendPartitionAndProject(morIterator, requiredSchemaWithMandatory, 
partitionSchema,
+      outputSchema, partitionValues)
+  }
+
+  /**
+   * Append partition values to rows and project to output schema
+   */
+  protected def appendPartitionAndProject(iter: Iterator[InternalRow],
+                                          inputSchema: StructType,
+                                          partitionSchema: StructType,
+                                          to: StructType,
+                                          partitionValues: InternalRow): 
Iterator[InternalRow] = {
+    if (partitionSchema.isEmpty) {
+      projectSchema(iter, inputSchema, to)
+    } else {
+      val unsafeProjection = 
generateUnsafeProjection(StructType(inputSchema.fields ++ 
partitionSchema.fields), to)
+      val joinedRow = new JoinedRow()
+      iter.map(d => unsafeProjection(joinedRow(d, partitionValues)))
+    }
+  }
+
+  protected def projectSchema(iter: Iterator[InternalRow],
+                              from: StructType,
+                              to: StructType): Iterator[InternalRow] = {
+    val unsafeProjection = generateUnsafeProjection(from, to)
+    iter.map(d => unsafeProjection(d))
+  }
+
+  protected def getLogFilesFromSlice(fileSlice: FileSlice): 
List[HoodieLogFile] = {
+    
fileSlice.getLogFiles.sorted(HoodieLogFile.getLogFileComparator).iterator().asScala.toList
+  }
+
+  private def detectFileFormat(filePath: String): String = {
+    // Detect the file format from the file extension.
+    if (filePath.endsWith(".parquet")) "parquet"
+    else if (filePath.endsWith(".orc")) "orc"
+    else ""
+  }
+}
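
    [Editor's note] The new format builds one Parquet reader and one ORC reader up front and then
    routes each PartitionedFile by its extension. A self-contained sketch of that dispatch pattern,
    where the reader functions and the pathOf accessor are hypothetical stand-ins rather than Hudi APIs:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.datasources.PartitionedFile

    // Route each file to the reader matching its extension; unknown extensions fail fast.
    def routeByExtension(parquetReader: PartitionedFile => Iterator[InternalRow],
                         orcReader: PartitionedFile => Iterator[InternalRow],
                         pathOf: PartitionedFile => String): PartitionedFile => Iterator[InternalRow] =
      file => pathOf(file) match {
        case p if p.endsWith(".parquet") => parquetReader(file)
        case p if p.endsWith(".orc") => orcReader(file)
        case p => throw new UnsupportedOperationException(s"Base file format of $p is not supported.")
      }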
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
index a34a6dfb052..5492d12d5fb 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/ProvidesHoodieConfig.scala
@@ -456,7 +456,7 @@ trait ProvidesHoodieConfig extends Logging {
     hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_ENABLED.key, 
enableHive.toString)
     hiveSyncConfig.setValue(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, 
props.getString(HiveSyncConfigHolder.HIVE_SYNC_MODE.key, 
HiveSyncMode.HMS.name()))
     hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_PATH, 
hoodieCatalogTable.tableLocation)
-    hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, 
hoodieCatalogTable.baseFileFormat)
+    hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT, 
props.getString(HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.key, 
HoodieSyncConfig.META_SYNC_BASE_FILE_FORMAT.defaultValue))
     hiveSyncConfig.setValue(HoodieSyncConfig.META_SYNC_DATABASE_NAME, 
hoodieCatalogTable.table.identifier.database.getOrElse("default"))
     hiveSyncConfig.setDefaultValue(HoodieSyncConfig.META_SYNC_TABLE_NAME, 
hoodieCatalogTable.table.identifier.table)
     if (props.get(HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key) != null) {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index 5804d36ba09..b12c694ce56 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -74,7 +74,7 @@ class RepairMigratePartitionMetaProcedure extends 
BaseProcedure with ProcedureBu
       if (!dryRun) {
         if (!baseFormatFile.isPresent) {
           val partitionMetadata: HoodiePartitionMetadata = new 
HoodiePartitionMetadata(metaClient.getFs, latestCommit,
-            basePath, partition, 
Option.of(metaClient.getTableConfig.getBaseFileFormat))
+            basePath, partition, 
Option.of(getWriteConfig(basePath.toString).getBaseFileFormat))
           partitionMetadata.trySave(0)
         }
         // delete it, in case we failed midway last time.
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index c2afc73ebac..8df34768909 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -169,7 +169,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     } else {
       df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
     }
-    String filePath = 
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, 
metaClient.getFs(),
+    String filePath = 
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(),
 metaClient.getFs(),
             srcPath, context).stream().findAny().map(p -> 
p.getValue().stream().findAny())
             .orElse(null).get().getPath()).toString();
     HoodieAvroParquetReader parquetReader = new 
HoodieAvroParquetReader(metaClient.getHadoopConf(), new Path(filePath));
@@ -273,7 +273,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     client.getTableServiceClient().rollbackFailedBootstrap();
     metaClient.reloadActiveTimeline();
     assertEquals(0, metaClient.getCommitsTimeline().countInstants());
-    assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, 
metaClient.getFs(), basePath, context)
+    assertEquals(0L, 
BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(), 
metaClient.getFs(), basePath, context)
         .stream().mapToLong(f -> f.getValue().size()).sum());
 
     BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -300,7 +300,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     String updateSPath = tmpFolder.toAbsolutePath() + "/data2";
     generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, 
partitions, updateSPath);
     JavaRDD<HoodieRecord> updateBatch =
-        generateInputBatch(jsc, 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), 
updateSPath, context),
+        generateInputBatch(jsc, 
BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(), 
metaClient.getFs(), updateSPath, context),
                 schema);
     String newInstantTs = client.startCommit();
     client.upsert(updateBatch, newInstantTs);
@@ -373,7 +373,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     bootstrapped.registerTempTable("bootstrapped");
     original.registerTempTable("original");
     if (checkNumRawFiles) {
-      List<HoodieFileStatus> files = 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(),
+      List<HoodieFileStatus> files = 
BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(), 
metaClient.getFs(),
           bootstrapBasePath, context).stream().flatMap(x -> 
x.getValue().stream()).collect(Collectors.toList());
       assertEquals(files.size() * numVersions,
           sqlContext.sql("select distinct _hoodie_file_name from 
bootstrapped").count());
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
index 54857e78eb7..abbbd78d064 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestOrcBootstrap.java
@@ -151,7 +151,7 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     } else {
       df.write().format("orc").mode(SaveMode.Overwrite).save(srcPath);
     }
-    String filePath = 
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, 
metaClient.getFs(),
+    String filePath = 
FileStatusUtils.toPath(BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
 metaClient.getFs(),
         srcPath, context).stream().findAny().map(p -> 
p.getValue().stream().findAny())
         .orElse(null).get().getPath()).toString();
     Reader orcReader = OrcFile.createReader(new Path(filePath), 
OrcFile.readerOptions(metaClient.getHadoopConf()));
@@ -262,7 +262,7 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     client.getTableServiceClient().rollbackFailedBootstrap();
     metaClient.reloadActiveTimeline();
     assertEquals(0, metaClient.getCommitsTimeline().countInstants());
-    assertEquals(0L, BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, 
metaClient.getFs(), basePath, context)
+    assertEquals(0L, 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
 metaClient.getFs(), basePath, context)
         .stream().flatMap(f -> f.getValue().stream()).count());
 
     BootstrapIndex index = BootstrapIndex.getBootstrapIndex(metaClient);
@@ -289,7 +289,7 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     String updateSPath = tmpFolder.toAbsolutePath().toString() + "/data2";
     generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, 
partitions, updateSPath);
     JavaRDD<HoodieRecord> updateBatch =
-        generateInputBatch(jsc, 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(), 
updateSPath, context),
+        generateInputBatch(jsc, 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
 metaClient.getFs(), updateSPath, context),
             schema);
     String newInstantTs = client.startCommit();
     client.upsert(updateBatch, newInstantTs);
@@ -361,7 +361,7 @@ public class TestOrcBootstrap extends 
HoodieSparkClientTestBase {
     bootstrapped.registerTempTable("bootstrapped");
     original.registerTempTable("original");
     if (checkNumRawFiles) {
-      List<HoodieFileStatus> files = 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient, metaClient.getFs(),
+      List<HoodieFileStatus> files = 
BootstrapUtils.getAllLeafFoldersWithFiles(metaClient.getTableConfig().getBaseFileFormat(),
 metaClient.getFs(),
           bootstrapBasePath, context).stream().flatMap(x -> 
x.getValue().stream()).collect(Collectors.toList());
       assertEquals(files.size() * numVersions,
           sqlContext.sql("select distinct _hoodie_file_name from 
bootstrapped").count());
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
index 4a93245dc8d..28c8df82e8e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/testutils/DataSourceTestUtils.java
@@ -18,16 +18,17 @@
 
 package org.apache.hudi.testutils;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocatedFileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.FileIOUtils;
 
 import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.RowFactory;
@@ -130,17 +131,14 @@ public class DataSourceTestUtils {
    */
   public static boolean isLogFileOnly(String basePath) throws IOException {
     Configuration conf = new Configuration();
-    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-            .setConf(conf).setBasePath(basePath)
-            .build();
-    String baseDataFormat = 
metaClient.getTableConfig().getBaseFileFormat().getFileExtension();
     Path path = new Path(basePath);
     FileSystem fs = path.getFileSystem(conf);
     RemoteIterator<LocatedFileStatus> files = fs.listFiles(path, true);
     while (files.hasNext()) {
       LocatedFileStatus file = files.next();
-      if (file.isFile()) {
-        if (file.getPath().toString().endsWith(baseDataFormat)) {
+      // skip meta folder
+      if (file.isFile() && 
!file.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME + 
Path.SEPARATOR)) {
+        if (FSUtils.isBaseFile(file.getPath())) {
           return false;
         }
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieMultipleBaseFileFormat.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieMultipleBaseFileFormat.scala
new file mode 100644
index 00000000000..8995f7ec883
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestHoodieMultipleBaseFileFormat.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.common.config.HoodieStorageConfig
+import org.apache.hudi.common.model.{HoodieFileFormat, HoodieTableType}
+import org.apache.hudi.common.table.HoodieTableConfig
+import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.{DEFAULT_FIRST_PARTITION_PATH,
 DEFAULT_SECOND_PARTITION_PATH}
+import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.{DataSourceWriteOptions, HoodieSparkRecordMerger, 
SparkDatasetMixin}
+import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession}
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Test cases on multiple base file format support for COW and MOR table types.
+ */
+class TestHoodieMultipleBaseFileFormat extends HoodieSparkClientTestBase with 
SparkDatasetMixin {
+
+  var spark: SparkSession = null
+  private val log = LoggerFactory.getLogger(classOf[TestHoodieMultipleBaseFileFormat])
+  val commonOpts = Map(
+    "hoodie.insert.shuffle.parallelism" -> "4",
+    "hoodie.upsert.shuffle.parallelism" -> "4",
+    HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE.key -> "true",
+    DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+    DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
+    DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
+    HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+  )
+  val sparkOpts = Map(
+    HoodieWriteConfig.RECORD_MERGER_IMPLS.key -> 
classOf[HoodieSparkRecordMerger].getName,
+    HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key -> "parquet"
+  )
+
+  val verificationCol: String = "driver"
+  val updatedVerificationVal: String = "driver_update"
+
+  @BeforeEach override def setUp() {
+    setTableName("hoodie_test")
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+    initTestDataGenerator()
+    initFileSystem()
+  }
+
+  @AfterEach override def tearDown() = {
+    cleanupSparkContexts()
+    cleanupTestDataGenerator()
+    cleanupFileSystem()
+  }
+
+  @Test
+  def testMultiFileFormatForCOWTableType(): Unit = {
+    insertAndValidateSnapshot(basePath, HoodieTableType.COPY_ON_WRITE.name())
+  }
+
+  @Test
+  def testMultiFileFormatForMORTableType(): Unit = {
+    insertAndValidateSnapshot(basePath, HoodieTableType.MERGE_ON_READ.name())
+  }
+
+  def insertAndValidateSnapshot(basePath: String, tableType: String): Unit = {
+    // Insert records in Parquet format to one of the partitions.
+    val records1 = recordsToStrings(dataGen.generateInsertsForPartition("001", 
10, DEFAULT_FIRST_PARTITION_PATH)).asScala
+    val inputDF1 = spark.read.json(spark.sparkContext.parallelize(records1, 2))
+    inputDF1.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+
+    // Insert records to a new partition in ORC format.
+    val records2 = recordsToStrings(dataGen.generateInsertsForPartition("002", 
10, DEFAULT_SECOND_PARTITION_PATH)).asScala
+    val inputDF2: Dataset[Row] = 
spark.read.json(spark.sparkContext.parallelize(records2, 2))
+    inputDF2.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
+      .option(HoodieTableConfig.BASE_FILE_FORMAT.key, 
HoodieFileFormat.ORC.name())
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // Snapshot Read the table
+    val hudiDf = spark.read.format("hudi").load(basePath + "/*")
+    assertEquals(20, hudiDf.count())
+
+    // Update and generate new slice across partitions.
+    val records3 = recordsToStrings(dataGen.generateUniqueUpdates("003", 
10)).asScala
+    val inputDF3: Dataset[Row] = 
spark.read.json(spark.sparkContext.parallelize(records3, 2))
+    inputDF3.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.TABLE_TYPE.key, tableType)
+      .mode(SaveMode.Append)
+      .save(basePath)
+
+    // Snapshot Read the table
+    val hudiDfAfterUpdate = spark.read.format("hudi").load(basePath + "/*")
+    assertEquals(20, hudiDfAfterUpdate.count())
+  }
+}
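
    [Editor's note] A condensed sketch of how a user would exercise the feature end to end,
    mirroring the test above. The SparkSession `spark`, DataFrames `df1`/`df2`, and `basePath`
    are assumed to exist; the option keys come straight from this patch:

    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.common.model.HoodieFileFormat
    import org.apache.hudi.common.table.HoodieTableConfig
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.spark.sql.SaveMode

    val opts = Map(
      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition",
      DataSourceWriteOptions.PRECOMBINE_FIELD.key -> "timestamp",
      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
      HoodieTableConfig.MULTIPLE_BASE_FILE_FORMATS_ENABLE.key -> "true")

    // First batch lands as Parquet, the default base file format.
    df1.write.format("hudi").options(opts).mode(SaveMode.Overwrite).save(basePath)

    // Second batch targets ORC; both formats now coexist in the same table.
    df2.write.format("hudi").options(opts)
      .option(HoodieTableConfig.BASE_FILE_FORMAT.key, HoodieFileFormat.ORC.name())
      .mode(SaveMode.Append).save(basePath)

    // A snapshot query reads across both formats transparently.
    spark.read.format("hudi").load(basePath).count()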
diff --git 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
index 7a6cb20c849..861fd43be85 100644
--- 
a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/execution/datasources/Spark32NestedSchemaPruning.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.hudi.{HoodieBaseRelation, SparkAdapterSupport}
 import org.apache.spark.sql.HoodieSpark3CatalystPlanUtils
+import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, 
AttributeSet, Expression, NamedExpression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
@@ -52,7 +53,7 @@ class Spark32NestedSchemaPruning extends Rule[LogicalPlan] {
       // NOTE: This is modified to accommodate for Hudi's custom relations, 
given that original
       //       [[NestedSchemaPruning]] rule is tightly coupled w/ 
[[HadoopFsRelation]]
       // TODO generalize to any file-based relation
-      l @ LogicalRelation(relation: HoodieBaseRelation, _, _, _))
+      l @ LogicalRelation(relation: HoodieBaseRelation, _, catalogTable: 
Option[HoodieCatalogTable], _))
         if relation.canPruneRelationSchema =>
 
         prunePhysicalColumns(l.output, projects, filters, relation.dataSchema,
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 0626ac3960f..95534c5533f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -671,13 +671,9 @@ public class HoodieStreamer implements Serializable {
           // This will guarantee there is no surprise with table type
           
checkArgument(tableType.equals(HoodieTableType.valueOf(cfg.tableType)), "Hoodie 
table is of type " + tableType + " but passed in CLI argument is " + 
cfg.tableType);
 
-          // Load base file format
-          // This will guarantee there is no surprise with base file type
-          String baseFileFormat = 
meta.getTableConfig().getBaseFileFormat().toString();
-          checkArgument(baseFileFormat.equals(cfg.baseFileFormat) || 
cfg.baseFileFormat == null,
-              format("Hoodie table's base file format is of type %s but passed 
in CLI argument is %s", baseFileFormat, cfg.baseFileFormat));
-          cfg.baseFileFormat = baseFileFormat;
-          this.cfg.baseFileFormat = baseFileFormat;
+          if (cfg.baseFileFormat == null) {
+            cfg.baseFileFormat = "PARQUET"; // default for backward 
compatibility
+          }
           Map<String, String> propsToValidate = new HashMap<>();
           properties.get().forEach((k, v) -> propsToValidate.put(k.toString(), 
v.toString()));
           HoodieWriterUtils.validateTableConfig(this.sparkSession, 
org.apache.hudi.HoodieConversionUtils.mapAsScalaImmutableMap(propsToValidate), 
meta.getTableConfig());
