This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0abc00df841 [HUDI-6787] Implement the HoodieFileGroupReader API for Hive (#10422)
0abc00df841 is described below

commit 0abc00df8412c5ea3d15ab50d5074d8e8bccebcb
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Sat Jun 8 22:22:46 2024 -0400

    [HUDI-6787] Implement the HoodieFileGroupReader API for Hive (#10422)
---
 .../hudi/client/TestPartitionTTLManagement.java    |   2 +-
 .../hudi/table/TestHoodieMergeOnReadTable.java     |   2 +-
 .../TestHoodieSparkMergeOnReadTableCompaction.java |  80 +++---
 .../hudi/common/engine/HoodieReaderContext.java    |  29 ++-
 .../org/apache/hudi/common/model/HoodieRecord.java |   2 +-
 .../org/apache/hudi/hadoop/fs/HadoopFSUtils.java   |  15 +-
 .../hudi/hadoop/HiveHoodieReaderContext.java       | 273 ++++++++++++++++++++
 .../HoodieFileGroupReaderBasedRecordReader.java    | 281 +++++++++++++++++++++
 .../org/apache/hudi/hadoop/HoodieHiveRecord.java   | 221 ++++++++++++++++
 .../apache/hudi/hadoop/HoodieHiveRecordMerger.java |  71 ++++++
 .../hudi/hadoop/HoodieParquetInputFormat.java      |  48 +++-
 .../hudi/hadoop/RecordReaderValueIterator.java     |  13 +-
 .../HoodieCombineRealtimeRecordReader.java         |  51 +++-
 .../realtime/HoodieParquetRealtimeInputFormat.java |  15 +-
 .../hadoop/utils/HoodieArrayWritableAvroUtils.java | 110 ++++++++
 .../hudi/hadoop/utils/HoodieInputFormatUtils.java  |  36 +++
 .../hudi/hadoop/utils/ObjectInspectorCache.java    | 103 ++++++++
 .../hudi/hadoop/TestHoodieParquetInputFormat.java  | 122 ++++-----
 .../hive/TestHoodieCombineHiveInputFormat.java     |  14 +-
 .../TestHoodieMergeOnReadSnapshotReader.java       |   2 +
 .../realtime/TestHoodieRealtimeRecordReader.java   |   2 +
 .../utils/TestHoodieArrayWritableAvroUtils.java    |  88 +++++++
 .../org/apache/hudi/functional/TestBootstrap.java  |   1 +
 .../functional/TestHiveTableSchemaEvolution.java   |   2 +
 .../TestSparkConsistentBucketClustering.java       |   2 +-
 .../streamer/TestHoodieStreamerUtils.java          |  13 +-
 26 files changed, 1470 insertions(+), 128 deletions(-)
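
For context, the new Hive read path in this commit is gated by HoodieReaderConfig.FILE_GROUP_READER_ENABLED, which the compaction test below toggles through the Hadoop configuration. A minimal, hypothetical sketch of flipping that flag on a JobConf (the class name and values here are illustrative, not part of the commit):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hudi.common.config.HoodieReaderConfig;

    // Illustrative only: toggle the file group reader the same way the updated test does,
    // by setting the config key on the Hadoop/Hive job configuration.
    public class FileGroupReaderToggleSketch {
      public static void main(String[] args) {
        JobConf jobConf = new JobConf();
        jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false"); // disable the new path
        jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");  // re-enable it
      }
    }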

diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
index cda76154ca6..f4e9d206f06 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestPartitionTTLManagement.java
@@ -182,7 +182,7 @@ public class TestPartitionTTLManagement extends HoodieClientTestBase {
   private List<GenericRecord> readRecords(String[] partitions) {
     return HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf,
        Arrays.stream(partitions).map(p -> Paths.get(basePath, p).toString()).collect(Collectors.toList()),
-        basePath, new JobConf(storageConf.unwrap()), true, false);
+        basePath, new JobConf(storageConf.unwrap()), true, true);
   }
 
 }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index b0876d06103..ae81a310190 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -213,7 +213,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
           .map(baseFile -> new Path(baseFile.getPath()).getParent().toString())
           .collect(Collectors.toList());
      List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(storageConf(), inputPaths,
-          basePath(), new JobConf(storageConf().unwrap()), true, false);
+          basePath(), new JobConf(storageConf().unwrap()), true, populateMetaFields);
       // Wrote 20 records in 2 batches
       assertEquals(40, recordsRead.size(), "Must contain 40 records");
     }
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
index e2ba56f94a3..ef28980d9cf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkMergeOnReadTableCompaction.java
@@ -22,6 +22,7 @@ package org.apache.hudi.table.functional;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
@@ -146,43 +147,50 @@ public class TestHoodieSparkMergeOnReadTableCompaction extends SparkClientFuncti
   @ParameterizedTest
   @MethodSource("writeLogTest")
   public void testWriteLogDuringCompaction(boolean enableMetadataTable, boolean enableTimelineServer) throws IOException {
-    Properties props = getPropertiesForKeyGen(true);
-    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
-        .forTable("test-trip-table")
-        .withPath(basePath())
-        .withSchema(TRIP_EXAMPLE_SCHEMA)
-        .withParallelism(2, 2)
-        .withAutoCommit(true)
-        .withEmbeddedTimelineServerEnabled(enableTimelineServer)
-        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
-        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-            .withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withLayoutConfig(HoodieLayoutConfig.newBuilder()
-            .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
-            .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
-        .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
-        .build();
-    props.putAll(config.getProps());
-
-    metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
-    client = getHoodieWriteClient(config);
-
-    final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
-    JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+    try {
+      // Disable the file group reader for this test because MOR records appear to be processed in a different order.
+      jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
+      Properties props = getPropertiesForKeyGen(true);
+      HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+          .forTable("test-trip-table")
+          .withPath(basePath())
+          .withSchema(TRIP_EXAMPLE_SCHEMA)
+          .withParallelism(2, 2)
+          .withAutoCommit(true)
+          .withEmbeddedTimelineServerEnabled(enableTimelineServer)
+          .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(enableMetadataTable).build())
+          .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+              .withMaxNumDeltaCommitsBeforeCompaction(1).build())
+          .withLayoutConfig(HoodieLayoutConfig.newBuilder()
+              .withLayoutType(HoodieStorageLayout.LayoutType.BUCKET.name())
+              .withLayoutPartitioner(SparkBucketIndexPartitioner.class.getName()).build())
+          .withIndexConfig(HoodieIndexConfig.newBuilder().fromProperties(props).withIndexType(HoodieIndex.IndexType.BUCKET).withBucketNum("1").build())
+          .build();
+      props.putAll(config.getProps());
+
+      metaClient = getHoodieMetaClient(HoodieTableType.MERGE_ON_READ, props);
+      client = getHoodieWriteClient(config);
+
+      final List<HoodieRecord> records = dataGen.generateInserts("001", 100);
+      JavaRDD<HoodieRecord> writeRecords = jsc().parallelize(records, 2);
+
+      // initialize 100 records
+      client.upsert(writeRecords, client.startCommit());
+      // update 100 records
+      client.upsert(writeRecords, client.startCommit());
+      // schedule compaction
+      client.scheduleCompaction(Option.empty());
+      // delete 50 records
+      List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
+      JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
+      client.delete(deleteRecords, client.startCommit());
+      // insert the same 100 records again
+      client.upsert(writeRecords, client.startCommit());
+      Assertions.assertEquals(100, readTableTotalRecordsNum());
+    } finally {
+      jsc().hadoopConfiguration().set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
+    }
 
-    // initialize 100 records
-    client.upsert(writeRecords, client.startCommit());
-    // update 100 records
-    client.upsert(writeRecords, client.startCommit());
-    // schedule compaction
-    client.scheduleCompaction(Option.empty());
-    // delete 50 records
-    List<HoodieKey> toBeDeleted = records.stream().map(HoodieRecord::getKey).limit(50).collect(Collectors.toList());
-    JavaRDD<HoodieKey> deleteRecords = jsc().parallelize(toBeDeleted, 2);
-    client.delete(deleteRecords, client.startCommit());
-    // insert the same 100 records again
-    client.upsert(writeRecords, client.startCommit());
-    Assertions.assertEquals(100, readTableTotalRecordsNum());
   }
 
   private long readTableTotalRecordsNum() {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index 218e0eb4b03..b79562f8b43 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.read.HoodieFileGroupReaderSchemaHandler;
+import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.storage.HoodieStorage;
@@ -38,6 +39,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.function.UnaryOperator;
 
+import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+
 /**
 * An abstract reader context class for {@code HoodieFileGroupReader} to use, containing APIs for
 * engine-specific implementation on reading data files, getting field values from a record,
@@ -197,7 +200,10 @@ public abstract class HoodieReaderContext<T> {
    * @param schema The Avro schema of the record.
    * @return The record key in String.
    */
-  public abstract String getRecordKey(T record, Schema schema);
+  public String getRecordKey(T record, Schema schema) {
+    Object val = getValue(record, schema, RECORD_KEY_METADATA_FIELD);
+    return val.toString();
+  }
 
   /**
    * Gets the ordering value in particular type.
@@ -208,10 +214,23 @@ public abstract class HoodieReaderContext<T> {
    * @param props        Properties.
    * @return The ordering value.
    */
-  public abstract Comparable getOrderingValue(Option<T> recordOption,
-                                              Map<String, Object> metadataMap,
-                                              Schema schema,
-                                              TypedProperties props);
+  public Comparable getOrderingValue(Option<T> recordOption,
+                                     Map<String, Object> metadataMap,
+                                     Schema schema,
+                                     TypedProperties props) {
+    if (metadataMap.containsKey(INTERNAL_META_ORDERING_FIELD)) {
+      return (Comparable) metadataMap.get(INTERNAL_META_ORDERING_FIELD);
+    }
+
+    if (!recordOption.isPresent()) {
+      return 0;
+    }
+
+    String orderingFieldName = ConfigUtils.getOrderingField(props);
+    Object value = getValue(recordOption.get(), schema, orderingFieldName);
+    return value != null ? (Comparable) value : 0;
+
+  }
 
   /**
   * Constructs a new {@link HoodieRecord} based on the record of engine-specific type and metadata for merging.
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
index 76e640d927a..442419d0713 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieRecord.java
@@ -476,6 +476,6 @@ public abstract class HoodieRecord<T> implements HoodieRecordCompatibilityInterf
   }
 
   public enum HoodieRecordType {
-    AVRO, SPARK
+    AVRO, SPARK, HIVE
   }
 }
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
index 9b087482c72..e8e92f6b420 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/hadoop/fs/HadoopFSUtils.java
@@ -61,6 +61,8 @@ import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN;
 /**
  * Utility functions related to accessing the file storage on Hadoop.
  */
@@ -377,13 +379,24 @@ public class HadoopFSUtils {
    * the file name.
    */
   public static String getFileIdFromLogPath(Path path) {
-    Matcher matcher = FSUtils.LOG_FILE_PATTERN.matcher(path.getName());
+    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
     if (!matcher.find()) {
       throw new InvalidHoodiePathException(path.toString(), "LogFile");
     }
     return matcher.group(1);
   }
 
+  /**
+   * Get the second part of the log file name, which is the delta commit time.
+   */
+  public static String getDeltaCommitTimeFromLogPath(Path path) {
+    Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName());
+    if (!matcher.find()) {
+      throw new InvalidHoodiePathException(path.toString(), "LogFile");
+    }
+    return matcher.group(2);
+  }
+
   /**
    * Check if the file is a base file or a log file, then get the fileId appropriately.
    */
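
A hedged usage sketch for the two log-path helpers added above; the file name below assumes Hudi's usual ".<fileId>_<deltaCommitTime>.log.<version>_<writeToken>" naming and is illustrative only, not taken from this commit:

    import org.apache.hadoop.fs.Path;
    import org.apache.hudi.hadoop.fs.HadoopFSUtils;

    public class LogPathParsingSketch {
      public static void main(String[] args) {
        // Hypothetical log file name; the fileId and delta commit time values are made up.
        Path logPath = new Path("/tmp/hudi_table/p1/.file-group-1_20240608222246.log.1_1-0-1");
        String fileId = HadoopFSUtils.getFileIdFromLogPath(logPath);               // first part of the name
        String deltaCommit = HadoopFSUtils.getDeltaCommitTimeFromLogPath(logPath); // second part of the name
        System.out.println(fileId + " written by delta commit " + deltaCommit);
      }
    }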
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
new file mode 100644
index 00000000000..904d4882cc9
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.HoodieStorageUtils;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getPartitionFieldNames;
+
+/**
+ * {@link HoodieReaderContext} for Hive-specific {@link HoodieFileGroupReaderBasedRecordReader}.
+ */
+public class HiveHoodieReaderContext extends HoodieReaderContext<ArrayWritable> {
+  protected final HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator;
+  protected final InputSplit split;
+  protected final JobConf jobConf;
+  protected final Reporter reporter;
+  protected final Schema writerSchema;
+  protected Map<String, String[]> hosts;
+  protected final Map<String, TypeInfo> columnTypeMap;
+  private final ObjectInspectorCache objectInspectorCache;
+  private RecordReader<NullWritable, ArrayWritable> firstRecordReader = null;
+
+  private final List<String> partitionCols;
+  private final Set<String> partitionColSet;
+
+  private final String recordKeyField;
+
+  protected HiveHoodieReaderContext(HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator,
+                                    InputSplit split,
+                                    JobConf jobConf,
+                                    Reporter reporter,
+                                    Schema writerSchema,
+                                    Map<String, String[]> hosts,
+                                    HoodieTableMetaClient metaClient) {
+    this.readerCreator = readerCreator;
+    this.split = split;
+    this.jobConf = jobConf;
+    this.reporter = reporter;
+    this.writerSchema = writerSchema;
+    this.hosts = hosts;
+    this.partitionCols = getPartitionFieldNames(jobConf).stream().filter(n -> writerSchema.getField(n) != null).collect(Collectors.toList());
+    this.partitionColSet = new HashSet<>(this.partitionCols);
+    String tableName = metaClient.getTableConfig().getTableName();
+    recordKeyField = getRecordKeyField(metaClient);
+    this.objectInspectorCache = HoodieArrayWritableAvroUtils.getCacheForTable(tableName, writerSchema, jobConf);
+    this.columnTypeMap = objectInspectorCache.getColumnTypeMap();
+  }
+
+  /**
+   * If populateMetaFields is false, then getRecordKeyFields()
+   * should return exactly one record key field.
+   */
+  private static String getRecordKeyField(HoodieTableMetaClient metaClient) {
+    if (metaClient.getTableConfig().populateMetaFields()) {
+      return HoodieRecord.RECORD_KEY_METADATA_FIELD;
+    }
+
+    Option<String[]> recordKeyFieldsOpt = metaClient.getTableConfig().getRecordKeyFields();
+    ValidationUtils.checkArgument(recordKeyFieldsOpt.isPresent(), "No record key field set in table config, but populateMetaFields is disabled");
+    ValidationUtils.checkArgument(recordKeyFieldsOpt.get().length == 1, "More than 1 record key set in table config, but populateMetaFields is disabled");
+    return recordKeyFieldsOpt.get()[0];
+  }
+
+  private void setSchemas(JobConf jobConf, Schema dataSchema, Schema requiredSchema) {
+    List<String> dataColumnNameList = dataSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toList());
+    List<TypeInfo> dataColumnTypeList = dataColumnNameList.stream().map(fieldName -> {
+      TypeInfo type = columnTypeMap.get(fieldName);
+      if (type == null) {
+        throw new IllegalArgumentException("Field: " + fieldName + " does not have a defined type");
+      }
+      return type;
+    }).collect(Collectors.toList());
+    jobConf.set(serdeConstants.LIST_COLUMNS, String.join(",", dataColumnNameList));
+    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, dataColumnTypeList.stream().map(TypeInfo::getQualifiedName).collect(Collectors.joining(",")));
+    // don't replace `f -> f.name()` with a method reference
+    String readColNames = requiredSchema.getFields().stream().map(f -> f.name()).collect(Collectors.joining(","));
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, readColNames);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, requiredSchema.getFields()
+        .stream().map(f -> String.valueOf(dataSchema.getField(f.name()).pos())).collect(Collectors.joining(",")));
+  }
+
+  @Override
+  public HoodieStorage getStorage(String path, StorageConfiguration<?> conf) {
+    return HoodieStorageUtils.getStorage(path, conf);
+  }
+
+  @Override
+  public ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath filePath, long start, long length, Schema dataSchema, Schema requiredSchema, HoodieStorage storage) throws IOException {
+    JobConf jobConfCopy = new JobConf(jobConf);
+    // Move the partition columns to the end, because some readers have issues if we don't.
+    Schema modifiedDataSchema = HoodieAvroUtils.generateProjectionSchema(dataSchema, Stream.concat(dataSchema.getFields().stream()
+            .map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColSet.contains(n)),
+        partitionCols.stream().filter(c -> dataSchema.getField(c) != null)).collect(Collectors.toList()));
+    setSchemas(jobConfCopy, modifiedDataSchema, requiredSchema);
+    InputSplit inputSplit = new FileSplit(new Path(filePath.toString()), start, length, hosts.get(filePath.toString()));
+    RecordReader<NullWritable, ArrayWritable> recordReader = readerCreator.getRecordReader(inputSplit, jobConfCopy, reporter);
+    if (firstRecordReader == null) {
+      firstRecordReader = recordReader;
+    }
+    ClosableIterator<ArrayWritable> recordIterator = new RecordReaderValueIterator<>(recordReader);
+    if (modifiedDataSchema.equals(requiredSchema)) {
+      return recordIterator;
+    }
+    // The record reader puts the required columns in the positions of the data schema and nulls the rest of the columns.
+    return new CloseableMappingIterator<>(recordIterator, projectRecord(modifiedDataSchema, requiredSchema));
+  }
+
+  @Override
+  public ArrayWritable convertAvroRecord(IndexedRecord avroRecord) {
+    return (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroRecord.getSchema(), true);
+  }
+
+  @Override
+  public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
+    if (mergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)) {
+      return new HoodieHiveRecordMerger();
+    }
+    throw new HoodieException(String.format("The merger strategy UUID is not supported, Default: %s, Passed: %s", DEFAULT_MERGER_STRATEGY_UUID, mergerStrategy));
+  }
+
+  @Override
+  public String getRecordKey(ArrayWritable record, Schema schema) {
+    return getValue(record, schema, recordKeyField).toString();
+  }
+
+  @Override
+  public Object getValue(ArrayWritable record, Schema schema, String fieldName) {
+    return StringUtils.isNullOrEmpty(fieldName) ? null : objectInspectorCache.getValue(record, schema, fieldName);
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> constructHoodieRecord(Option<ArrayWritable> recordOption, Map<String, Object> metadataMap) {
+    if (!recordOption.isPresent()) {
+      return new HoodieEmptyRecord<>(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), HoodieRecord.HoodieRecordType.HIVE);
+    }
+    Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
+    ArrayWritable writable = recordOption.get();
+    return new HoodieHiveRecord(new HoodieKey((String) metadataMap.get(INTERNAL_META_RECORD_KEY), (String) metadataMap.get(INTERNAL_META_PARTITION_PATH)), writable, schema, objectInspectorCache);
+  }
+
+  @Override
+  public ArrayWritable seal(ArrayWritable record) {
+    return new ArrayWritable(Writable.class, Arrays.copyOf(record.get(), record.get().length));
+  }
+
+  @Override
+  public ClosableIterator<ArrayWritable> mergeBootstrapReaders(ClosableIterator<ArrayWritable> skeletonFileIterator,
+                                                               Schema skeletonRequiredSchema,
+                                                               ClosableIterator<ArrayWritable> dataFileIterator,
+                                                               Schema dataRequiredSchema) {
+    int skeletonLen = skeletonRequiredSchema.getFields().size();
+    int dataLen = dataRequiredSchema.getFields().size();
+    return new ClosableIterator<ArrayWritable>() {
+
+      private final ArrayWritable returnWritable = new ArrayWritable(Writable.class);
+
+      @Override
+      public boolean hasNext() {
+        if (dataFileIterator.hasNext() != skeletonFileIterator.hasNext()) {
+          throw new IllegalStateException("bootstrap data file iterator and skeleton file iterator are out of sync");
+        }
+        return dataFileIterator.hasNext();
+      }
+
+      @Override
+      public ArrayWritable next() {
+        Writable[] skeletonWritable = skeletonFileIterator.next().get();
+        Writable[] dataWritable = dataFileIterator.next().get();
+        Writable[] mergedWritable = new Writable[skeletonLen + dataLen];
+        System.arraycopy(skeletonWritable, 0, mergedWritable, 0, skeletonLen);
+        System.arraycopy(dataWritable, 0, mergedWritable, skeletonLen, dataLen);
+        returnWritable.set(mergedWritable);
+        return returnWritable;
+      }
+
+      @Override
+      public void close() {
+        skeletonFileIterator.close();
+        dataFileIterator.close();
+      }
+    };
+  }
+
+  @Override
+  public UnaryOperator<ArrayWritable> projectRecord(Schema from, Schema to, Map<String, String> renamedColumns) {
+    if (!renamedColumns.isEmpty()) {
+      throw new IllegalStateException("Schema evolution is not currently supported in the file group reader for Hive");
+    }
+    return HoodieArrayWritableAvroUtils.projectRecord(from, to);
+  }
+
+  public UnaryOperator<ArrayWritable> reverseProjectRecord(Schema from, Schema to) {
+    return HoodieArrayWritableAvroUtils.reverseProject(from, to);
+  }
+
+  public long getPos() throws IOException {
+    if (firstRecordReader != null) {
+      return firstRecordReader.getPos();
+    }
+    throw new IllegalStateException("getPos() should not be called before a 
record reader has been initialized");
+  }
+
+  public float getProgress() throws IOException {
+    if (firstRecordReader != null) {
+      return firstRecordReader.getProgress();
+    }
+    throw new IllegalStateException("getProgress() should not be called before 
a record reader has been initialized");
+  }
+
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
new file mode 100644
index 00000000000..efbf68c8e0f
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieFileGroupReaderBasedRecordReader.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.BaseFile;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
+import static org.apache.hudi.common.config.HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE;
+import static org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
+import static org.apache.hudi.common.fs.FSUtils.getCommitTime;
+import static org.apache.hudi.common.fs.FSUtils.getFileId;
+import static org.apache.hudi.common.util.StringUtils.EMPTY_STRING;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePathInfo;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getDeltaCommitTimeFromLogPath;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getFileIdFromLogPath;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getFs;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getRelativePartitionPath;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.isLogFile;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getPartitionFieldNames;
+import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.getTableBasePath;
+
+/**
+ * {@link HoodieFileGroupReader} based implementation of Hive's {@link RecordReader} for {@link ArrayWritable}.
+ */
+public class HoodieFileGroupReaderBasedRecordReader implements RecordReader<NullWritable, ArrayWritable> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HoodieFileGroupReaderBasedRecordReader.class);
+
+  public interface HiveReaderCreator {
+    org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordReader(
+        final org.apache.hadoop.mapred.InputSplit split,
+        final org.apache.hadoop.mapred.JobConf job,
+        final org.apache.hadoop.mapred.Reporter reporter
+    ) throws IOException;
+  }
+
+  private final HiveHoodieReaderContext readerContext;
+  private final HoodieFileGroupReader<ArrayWritable> fileGroupReader;
+  private final ArrayWritable arrayWritable;
+  private final NullWritable nullWritable = NullWritable.get();
+  private final InputSplit inputSplit;
+  private final JobConf jobConfCopy;
+  private final UnaryOperator<ArrayWritable> reverseProjection;
+
+  public HoodieFileGroupReaderBasedRecordReader(HiveReaderCreator readerCreator,
+                                                final InputSplit split,
+                                                final JobConf jobConf,
+                                                final Reporter reporter) throws IOException {
+    this.jobConfCopy = new JobConf(jobConf);
+    HoodieRealtimeInputFormatUtils.cleanProjectionColumnIds(jobConfCopy);
+    Set<String> partitionColumns = new HashSet<>(getPartitionFieldNames(jobConfCopy));
+    this.inputSplit = split;
+
+    FileSplit fileSplit = (FileSplit) split;
+    String tableBasePath = getTableBasePath(split, jobConfCopy);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(getStorageConf(jobConfCopy))
+        .setBasePath(tableBasePath)
+        .build();
+    String latestCommitTime = getLatestCommitTime(split, metaClient);
+    Schema tableSchema = getLatestTableSchema(metaClient, jobConfCopy, latestCommitTime);
+    Schema requestedSchema = createRequestedSchema(tableSchema, jobConfCopy);
+    Map<String, String[]> hosts = new HashMap<>();
+    this.readerContext = new HiveHoodieReaderContext(readerCreator, split, jobConfCopy, reporter, tableSchema, hosts, metaClient);
+    this.arrayWritable = new ArrayWritable(Writable.class, new Writable[requestedSchema.getFields().size()]);
+    // get some config values
+    long maxMemoryForMerge = jobConf.getLong(MAX_MEMORY_FOR_MERGE.key(), MAX_MEMORY_FOR_MERGE.defaultValue());
+    String spillableMapPath = jobConf.get(SPILLABLE_MAP_BASE_PATH.key(), FileIOUtils.getDefaultSpillableMapBasePath());
+    ExternalSpillableMap.DiskMapType spillMapType = ExternalSpillableMap.DiskMapType.valueOf(jobConf.get(SPILLABLE_DISK_MAP_TYPE.key(),
+        SPILLABLE_DISK_MAP_TYPE.defaultValue().name()).toUpperCase(Locale.ROOT));
+    boolean bitmaskCompressEnabled = jobConf.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+        DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
+    LOG.debug("Creating HoodieFileGroupReaderRecordReader with tableBasePath={}, latestCommitTime={}, fileSplit={}", tableBasePath, latestCommitTime, fileSplit.getPath());
+    this.fileGroupReader = new HoodieFileGroupReader<>(readerContext, metaClient.getStorage(), tableBasePath,
+        latestCommitTime, getFileSliceFromSplit(fileSplit, hosts, getFs(tableBasePath, jobConfCopy), tableBasePath),
+        tableSchema, requestedSchema, Option.empty(), metaClient, metaClient.getTableConfig().getProps(), metaClient.getTableConfig(), fileSplit.getStart(),
+        fileSplit.getLength(), false, maxMemoryForMerge, spillableMapPath, spillMapType, bitmaskCompressEnabled);
+    this.fileGroupReader.initRecordIterators();
+    // it expects the partition columns to be at the end
+    Schema outputSchema = HoodieAvroUtils.generateProjectionSchema(tableSchema,
+        Stream.concat(tableSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).filter(n -> !partitionColumns.contains(n)),
+            partitionColumns.stream()).collect(Collectors.toList()));
+    this.reverseProjection = readerContext.reverseProjectRecord(requestedSchema, outputSchema);
+  }
+
+  @Override
+  public boolean next(NullWritable key, ArrayWritable value) throws IOException {
+    if (!fileGroupReader.hasNext()) {
+      return false;
+    }
+    value.set(fileGroupReader.next().get());
+    reverseProjection.apply(value);
+    return true;
+  }
+
+  @Override
+  public NullWritable createKey() {
+    return nullWritable;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return arrayWritable;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return readerContext.getPos();
+  }
+
+  @Override
+  public void close() throws IOException {
+    fileGroupReader.close();
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return readerContext.getProgress();
+  }
+
+  public RealtimeSplit getSplit() {
+    return (RealtimeSplit) inputSplit;
+  }
+
+  public JobConf getJobConf() {
+    return jobConfCopy;
+  }
+
+  private static Schema getLatestTableSchema(HoodieTableMetaClient metaClient, JobConf jobConf, String latestCommitTime) {
+    TableSchemaResolver tableSchemaResolver = new TableSchemaResolver(metaClient);
+    try {
+      Schema schema = tableSchemaResolver.getTableAvroSchema(latestCommitTime);
+      // Add partitioning fields to writer schema for resulting row to contain null values for these fields
+      return HoodieRealtimeRecordReaderUtils.addPartitionFields(schema, getPartitionFieldNames(jobConf));
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to get table schema", e);
+    }
+  }
+
+  private static String getLatestCommitTime(InputSplit split, HoodieTableMetaClient metaClient) {
+    if (split instanceof RealtimeSplit) {
+      return ((RealtimeSplit) split).getMaxCommitTime();
+    }
+    Option<HoodieInstant> lastInstant = metaClient.getCommitsTimeline().lastInstant();
+    if (lastInstant.isPresent()) {
+      return lastInstant.get().getTimestamp();
+    } else {
+      return EMPTY_STRING;
+    }
+  }
+
+  /**
+   * Convert FileSplit to FileSlice, but save the locations in 'hosts' because that data is otherwise lost.
+   */
+  private static FileSlice getFileSliceFromSplit(FileSplit split, Map<String, String[]> hosts, FileSystem fs, String tableBasePath) throws IOException {
+    BaseFile bootstrapBaseFile = createBootstrapBaseFile(split, hosts, fs);
+    if (split instanceof RealtimeSplit) {
+      // MOR
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      boolean isLogFile = isLogFile(realtimeSplit.getPath());
+      String fileID;
+      String commitTime;
+      if (isLogFile) {
+        fileID = getFileIdFromLogPath(realtimeSplit.getPath());
+        commitTime = getDeltaCommitTimeFromLogPath(realtimeSplit.getPath());
+      } else {
+        fileID = getFileId(realtimeSplit.getPath().getName());
+        commitTime = getCommitTime(realtimeSplit.getPath().toString());
+      }
+      HoodieFileGroupId fileGroupId = new HoodieFileGroupId(getRelativePartitionPath(new Path(realtimeSplit.getBasePath()), realtimeSplit.getPath()), fileID);
+      if (isLogFile) {
+        return new FileSlice(fileGroupId, commitTime, null, realtimeSplit.getDeltaLogFiles());
+      }
+      hosts.put(realtimeSplit.getPath().toString(), realtimeSplit.getLocations());
+      HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(realtimeSplit.getPath())), bootstrapBaseFile);
+      return new FileSlice(fileGroupId, commitTime, hoodieBaseFile, realtimeSplit.getDeltaLogFiles());
+    }
+    // COW
+    HoodieFileGroupId fileGroupId = new HoodieFileGroupId(getFileId(split.getPath().getName()), getRelativePartitionPath(new Path(tableBasePath), split.getPath()));
+    hosts.put(split.getPath().toString(), split.getLocations());
+    return new FileSlice(
+        fileGroupId,
+        getCommitTime(split.getPath().toString()),
+        new HoodieBaseFile(convertToStoragePathInfo(fs.getFileStatus(split.getPath())), bootstrapBaseFile),
+        Collections.emptyList());
+  }
+
+  private static BaseFile createBootstrapBaseFile(FileSplit split, Map<String, String[]> hosts, FileSystem fs) throws IOException {
+    if (split instanceof BootstrapBaseFileSplit) {
+      BootstrapBaseFileSplit bootstrapBaseFileSplit = (BootstrapBaseFileSplit) split;
+      FileSplit bootstrapFileSplit = bootstrapBaseFileSplit.getBootstrapFileSplit();
+      hosts.put(bootstrapFileSplit.getPath().toString(), bootstrapFileSplit.getLocations());
+      return new BaseFile(convertToStoragePathInfo(fs.getFileStatus(bootstrapFileSplit.getPath())));
+    }
+    return null;
+  }
+
+  private static Schema createRequestedSchema(Schema tableSchema, JobConf jobConf) {
+    String readCols = jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR);
+    if (StringUtils.isNullOrEmpty(readCols)) {
+      Schema emptySchema = Schema.createRecord(tableSchema.getName(), tableSchema.getDoc(),
+          tableSchema.getNamespace(), tableSchema.isError());
+      emptySchema.setFields(Collections.emptyList());
+      return emptySchema;
+    }
+    // hive will handle the partition cols
+    String partitionColString = jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
+    Set<String> partitionColumns;
+    if (partitionColString == null) {
+      partitionColumns = Collections.emptySet();
+    } else {
+      partitionColumns = Arrays.stream(partitionColString.split(",")).collect(Collectors.toSet());
+    }
+    // if they are actually written to the file, then it is ok to read them from the file
+    tableSchema.getFields().forEach(f -> partitionColumns.remove(f.name().toLowerCase(Locale.ROOT)));
+    return HoodieAvroUtils.generateProjectionSchema(tableSchema,
+        Arrays.stream(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR).split(",")).filter(c -> !partitionColumns.contains(c)).collect(Collectors.toList()));
+  }
+}
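
A rough sketch of how a caller drives the reader above through the standard mapred RecordReader contract; the readerCreator, split, jobConf, and reporter arguments are placeholders assumed to be supplied by the Hive input format, and the class name is hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapred.InputSplit;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;
    import org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader;

    public class FileGroupRecordReaderUsageSketch {
      // Sketch only: counts the merged rows of one file slice.
      static long countRows(HoodieFileGroupReaderBasedRecordReader.HiveReaderCreator readerCreator,
                            InputSplit split, JobConf jobConf, Reporter reporter) throws IOException {
        HoodieFileGroupReaderBasedRecordReader reader =
            new HoodieFileGroupReaderBasedRecordReader(readerCreator, split, jobConf, reporter);
        NullWritable key = reader.createKey();
        ArrayWritable value = reader.createValue();
        long rows = 0;
        while (reader.next(key, value)) {
          // 'value' holds one merged row, reverse-projected to the Hive output column order.
          rows++;
        }
        reader.close();
        return rows;
      }
    }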
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
new file mode 100644
index 00000000000..a2fb08fd614
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecord.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieOperation;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.MetadataValues;
+import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.hadoop.utils.HoodieArrayWritableAvroUtils;
+import org.apache.hudi.hadoop.utils.ObjectInspectorCache;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * {@link HoodieRecord} implementation for Hive records of {@link ArrayWritable}.
+ */
+public class HoodieHiveRecord extends HoodieRecord<ArrayWritable> {
+
+  private boolean copy;
+  private final boolean isDeleted;
+
+  public boolean isDeleted() {
+    return isDeleted;
+  }
+
+  private final ArrayWritableObjectInspector objectInspector;
+
+  private final ObjectInspectorCache objectInspectorCache;
+
+  protected Schema schema;
+
+  public HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, ObjectInspectorCache objectInspectorCache) {
+    super(key, data);
+    this.objectInspector = objectInspectorCache.getObjectInspector(schema);
+    this.objectInspectorCache = objectInspectorCache;
+    this.schema = schema;
+    this.copy = false;
+    isDeleted = data == null;
+  }
+
+  private HoodieHiveRecord(HoodieKey key, ArrayWritable data, Schema schema, HoodieOperation operation, boolean isCopy,
+                           ArrayWritableObjectInspector objectInspector, ObjectInspectorCache objectInspectorCache) {
+    super(key, data, operation, Option.empty());
+    this.schema = schema;
+    this.copy = isCopy;
+    isDeleted = data == null;
+    this.objectInspector = objectInspector;
+    this.objectInspectorCache = objectInspectorCache;
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> newInstance() {
+    return new HoodieHiveRecord(this.key, this.data, this.schema, this.operation, this.copy, this.objectInspector, this.objectInspectorCache);
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> newInstance(HoodieKey key, HoodieOperation op) {
+    throw new UnsupportedOperationException("ObjectInspector is needed for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> newInstance(HoodieKey key) {
+    throw new UnsupportedOperationException("ObjectInspector is needed for 
HoodieHiveRecord");
+  }
+
+  @Override
+  public Comparable<?> getOrderingValue(Schema recordSchema, Properties props) {
+    String orderingField = ConfigUtils.getOrderingField(props);
+    if (orderingField == null) {
+      return 0;
+      //throw new IllegalArgumentException("Ordering Field is not set. Precombine must be set. (If you are using a custom record merger it might be something else)");
+    }
+    return (Comparable<?>) getValue(ConfigUtils.getOrderingField(props));
+  }
+
+  @Override
+  public HoodieRecordType getRecordType() {
+    return HoodieRecordType.HIVE;
+  }
+
+  @Override
+  public String getRecordKey(Schema recordSchema, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public String getRecordKey(Schema recordSchema, String keyFieldName) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  protected void writeRecordPayload(ArrayWritable payload, Kryo kryo, Output output) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  protected ArrayWritable readRecordPayload(Kryo kryo, Input input) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public Object[] getColumnValues(Schema recordSchema, String[] columns, boolean consistentLogicalTimestampEnabled) {
+    Object[] objects = new Object[columns.length];
+    for (int i = 0; i < objects.length; i++) {
+      objects[i] = getValue(columns[i]);
+    }
+    return objects;
+  }
+
+  @Override
+  public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord prependMetaFields(Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, Properties props, Schema newSchema, Map<String, String> renameCols) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public boolean isDelete(Schema recordSchema, Properties props) throws IOException {
+    if (null == data) {
+      return true;
+    }
+    if (recordSchema.getField(HoodieRecord.HOODIE_IS_DELETED_FIELD) == null) {
+      return false;
+    }
+    Object deleteMarker = getValue(HoodieRecord.HOODIE_IS_DELETED_FIELD);
+    return deleteMarker instanceof BooleanWritable && ((BooleanWritable) deleteMarker).get();
+  }
+
+  @Override
+  public boolean shouldIgnore(Schema recordSchema, Properties props) throws IOException {
+    return false;
+  }
+
+  @Override
+  public HoodieRecord<ArrayWritable> copy() {
+    if (!copy) {
+      this.data = new ArrayWritable(Writable.class, Arrays.copyOf(this.data.get(), this.data.get().length));
+      this.copy = true;
+    }
+    return this;
+  }
+
+  @Override
+  public Option<Map<String, String>> getMetadata() {
+    // TODO HUDI-5282 support metaData
+    return Option.empty();
+  }
+
+  @Override
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithParams(Schema recordSchema, Properties props, Option<Pair<String, String>> simpleKeyGenFieldsOpt, Boolean withOperation,
+                                                            Option<String> partitionNameOp, Boolean populateMetaFieldsOp, Option<Schema> schemaWithoutMetaFields) throws IOException {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema recordSchema, Properties props, Option<BaseKeyGenerator> keyGen) {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  @Override
+  public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, String keyFieldName) throws IOException {
+    data.get()[recordSchema.getIndexNamed(keyFieldName)] = new Text();
+    return this;
+  }
+
+  @Override
+  public Option<HoodieAvroIndexedRecord> toIndexedRecord(Schema recordSchema, Properties props) throws IOException {
+    throw new UnsupportedOperationException("Not supported for HoodieHiveRecord");
+  }
+
+  private Object getValue(String name) {
+    return HoodieArrayWritableAvroUtils.getWritableValue(data, objectInspector, name);
+  }
+
+  protected Schema getSchema() {
+    return schema;
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java
new file mode 100644
index 00000000000..17a4738569e
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHiveRecordMerger.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.collection.Pair;
+
+import org.apache.avro.Schema;
+
+import java.io.IOException;
+
+public class HoodieHiveRecordMerger implements HoodieRecordMerger {
+  @Override
+  public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
+    ValidationUtils.checkArgument(older.getRecordType() == HoodieRecord.HoodieRecordType.HIVE);
+    ValidationUtils.checkArgument(newer.getRecordType() == HoodieRecord.HoodieRecordType.HIVE);
+    if (newer instanceof HoodieHiveRecord) {
+      HoodieHiveRecord newHiveRecord = (HoodieHiveRecord) newer;
+      if (newHiveRecord.isDeleted()) {
+        return Option.empty();
+      }
+    } else if (newer.getData() == null) {
+      return Option.empty();
+    }
+
+    if (older instanceof HoodieHiveRecord) {
+      HoodieHiveRecord oldHiveRecord = (HoodieHiveRecord) older;
+      if (oldHiveRecord.isDeleted()) {
+        return Option.of(Pair.of(newer, newSchema));
+      }
+    } else if (older.getData() == null) {
+      return Option.empty();
+    }
+    if (older.getOrderingValue(oldSchema, props).compareTo(newer.getOrderingValue(newSchema, props)) > 0) {
+      return Option.of(Pair.of(older, oldSchema));
+    } else {
+      return Option.of(Pair.of(newer, newSchema));
+    }
+  }
+
+  @Override
+  public HoodieRecord.HoodieRecordType getRecordType() {
+    return HoodieRecord.HoodieRecordType.HIVE;
+  }
+
+  @Override
+  public String getMergingStrategy() {
+    return HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
+  }
+}
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
index 9e656529904..18b9e221978 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieParquetInputFormat.java
@@ -20,10 +20,15 @@ package org.apache.hudi.hadoop;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.avro.HoodieTimestampAwareParquetInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
 import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
@@ -35,9 +40,7 @@ import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 import org.apache.parquet.hadoop.ParquetInputFormat;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,6 +51,11 @@ import java.util.List;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.hudi.common.util.TablePathUtils.getTablePath;
+import static org.apache.hudi.common.util.TablePathUtils.isHoodieTablePath;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.convertToStoragePath;
+import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.shouldUseFilegroupReader;
+
 /**
  * HoodieInputFormat which understands the Hoodie File Structure and filters 
files based on the Hoodie Mode. If paths
  * that does not correspond to a hoodie table then they are passed in as is 
(as what FileInputFormat.listStatus()
@@ -91,9 +99,42 @@ public class HoodieParquetInputFormat extends 
HoodieParquetInputFormatBase {
     }
   }
 
+  private static boolean checkIfHudiTable(final InputSplit split, final 
JobConf job) {
+    try {
+      Path inputPath = ((FileSplit) split).getPath();
+      FileSystem fs = inputPath.getFileSystem(job);
+      HoodieStorage storage = new HoodieHadoopStorage(fs);
+      return getTablePath(storage, convertToStoragePath(inputPath))
+          .map(path -> isHoodieTablePath(storage, path)).orElse(false);
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
   @Override
   public RecordReader<NullWritable, ArrayWritable> getRecordReader(final 
InputSplit split, final JobConf job,
                                                                    final 
Reporter reporter) throws IOException {
+    HoodieRealtimeInputFormatUtils.addProjectionField(job, 
job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/"));
+    if (shouldUseFilegroupReader(job)) {
+      try {
+        if (!(split instanceof FileSplit) || !checkIfHudiTable(split, job)) {
+          return super.getRecordReader(split, job, reporter);
+        }
+        if (supportAvroRead && 
HoodieColumnProjectionUtils.supportTimestamp(job)) {
+          return new HoodieFileGroupReaderBasedRecordReader((s, j, r) -> {
+            try {
+              return new ParquetRecordReaderWrapper(new 
HoodieTimestampAwareParquetInputFormat(), s, j, r);
+            } catch (InterruptedException e) {
+              throw new RuntimeException(e);
+            }
+          }, split, job, reporter);
+        } else {
+          return new 
HoodieFileGroupReaderBasedRecordReader(super::getRecordReader, split, job, 
reporter);
+        }
+      } catch (final IOException e) {
+        throw new RuntimeException("Cannot create a RecordReaderWrapper", e);
+      }
+    }
     // TODO enable automatic predicate pushdown after fixing issues
     // FileSplit fileSplit = (FileSplit) split;
     // HoodieTableMetadata metadata = 
getTableMetadata(fileSplit.getPath().getParent());
@@ -117,7 +158,6 @@ public class HoodieParquetInputFormat extends 
HoodieParquetInputFormatBase {
       LOG.debug("EMPLOYING DEFAULT RECORD READER - " + split);
     }
 
-    HoodieRealtimeInputFormatUtils.addProjectionField(job, 
job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "").split("/"));
     return getRecordReaderInternal(split, job, reporter);
   }
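
checkIfHudiTable() above decides whether an input split belongs to a Hudi table by resolving the table path from the split's file path and falling back to the plain record reader otherwise. A rough, JDK-only sketch of that idea, walking up parent directories until a ".hoodie" folder is found (this is not Hudi's TablePathUtils, and the example path is hypothetical):

    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.Optional;

    // Rough sketch: locate the table root by walking up from an input file until ".hoodie" is found.
    public class TablePathSketch {
      static Optional<Path> findTablePath(Path start) {
        for (Path dir = start; dir != null; dir = dir.getParent()) {
          if (Files.isDirectory(dir.resolve(".hoodie"))) {
            return Optional.of(dir);
          }
        }
        return Optional.empty();
      }

      public static void main(String[] args) {
        Path input = Paths.get("/tmp/warehouse/trips/2024/06/08/part-0001.parquet"); // hypothetical path
        // An empty result means the split should fall back to the plain parquet record reader.
        System.out.println(findTablePath(input).map(Path::toString).orElse("not a Hudi table"));
      }
    }
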
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
index 7ffa3bf555c..c08c358c0c8 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/RecordReaderValueIterator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.hadoop;
 
+import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.exception.HoodieException;
 
 import org.apache.hadoop.mapred.RecordReader;
@@ -25,7 +26,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Iterator;
 import java.util.NoSuchElementException;
 
 /**
@@ -34,7 +34,7 @@ import java.util.NoSuchElementException;
  * @param <K> Key Type
  * @param <V> Value Type
  */
-public class RecordReaderValueIterator<K, V> implements Iterator<V> {
+public class RecordReaderValueIterator<K, V> implements ClosableIterator<V> {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(RecordReaderValueIterator.class);
 
@@ -79,7 +79,12 @@ public class RecordReaderValueIterator<K, V> implements 
Iterator<V> {
     return retVal;
   }
 
-  public void close() throws IOException {
-    this.reader.close();
+  @Override
+  public void close() {
+    try {
+      this.reader.close();
+    } catch (IOException e) {
+      throw new RuntimeException("Could not close reader", e);
+    }
   }
 }
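
The change above turns RecordReaderValueIterator into a ClosableIterator whose close() converts the reader's checked IOException into an unchecked one. A small JDK-only sketch of that shape (a BufferedReader stands in for the Hadoop RecordReader; nothing here is a Hudi API):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.StringReader;
    import java.io.UncheckedIOException;
    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Sketch: an Iterator that owns a closeable resource and rethrows close() failures unchecked.
    public class ClosableLineIterator implements Iterator<String>, AutoCloseable {
      private final BufferedReader reader;
      private String nextLine;

      public ClosableLineIterator(BufferedReader reader) throws IOException {
        this.reader = reader;
        this.nextLine = reader.readLine();
      }

      @Override
      public boolean hasNext() {
        return nextLine != null;
      }

      @Override
      public String next() {
        if (nextLine == null) {
          throw new NoSuchElementException();
        }
        String current = nextLine;
        try {
          nextLine = reader.readLine();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        return current;
      }

      @Override
      public void close() {
        try {
          reader.close();
        } catch (IOException e) {
          throw new UncheckedIOException("Could not close reader", e);
        }
      }

      public static void main(String[] args) throws IOException {
        try (ClosableLineIterator it = new ClosableLineIterator(new BufferedReader(new StringReader("a\nb")))) {
          while (it.hasNext()) {
            System.out.println(it.next());
          }
        }
      }
    }
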
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
index 1edf29d45d5..b89e69f4be8 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieCombineRealtimeRecordReader.java
@@ -19,8 +19,10 @@
 package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieFileGroupReaderBasedRecordReader;
 import org.apache.hudi.hadoop.hive.HoodieCombineRealtimeFileSplit;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -35,6 +37,8 @@ import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
 
+import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.shouldUseFilegroupReader;
+
 /**
  * Allows to read multiple realtime file splits grouped together by 
CombineInputFormat.
  */
@@ -42,19 +46,29 @@ public class HoodieCombineRealtimeRecordReader implements 
RecordReader<NullWrita
 
   private static final transient Logger LOG = 
LoggerFactory.getLogger(HoodieCombineRealtimeRecordReader.class);
   // RecordReaders for each split
-  List<HoodieRealtimeRecordReader> recordReaders = new LinkedList<>();
+  private List<RecordReader> recordReaders = new LinkedList<>();
   // Points to the currently iterating record reader
-  HoodieRealtimeRecordReader currentRecordReader;
+  private RecordReader currentRecordReader;
+
+  private final boolean useFileGroupReader;
 
   public HoodieCombineRealtimeRecordReader(JobConf jobConf, CombineFileSplit 
split,
       List<RecordReader> readers) {
+    useFileGroupReader = shouldUseFilegroupReader(jobConf);
     try {
       ValidationUtils.checkArgument(((HoodieCombineRealtimeFileSplit) 
split).getRealtimeFileSplits().size() == readers
           .size(), "Num Splits does not match number of unique 
RecordReaders!");
       for (InputSplit rtSplit : ((HoodieCombineRealtimeFileSplit) 
split).getRealtimeFileSplits()) {
-        LOG.info("Creating new RealtimeRecordReader for split");
-        recordReaders.add(
-            new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) rtSplit, 
jobConf, readers.remove(0)));
+        if (useFileGroupReader) {
+          LOG.info("Creating new HoodieFileGroupReaderRecordReader for split");
+          RecordReader reader = readers.remove(0);
+          ValidationUtils.checkArgument(reader instanceof 
HoodieFileGroupReaderBasedRecordReader, reader.toString() + "not instance of 
HoodieFileGroupReaderRecordReader ");
+          recordReaders.add(reader);
+        } else {
+          LOG.info("Creating new RealtimeRecordReader for split");
+          recordReaders.add(
+              new HoodieRealtimeRecordReader((HoodieRealtimeFileSplit) 
rtSplit, jobConf, readers.remove(0)));
+        }
       }
       currentRecordReader = recordReaders.remove(0);
     } catch (Exception e) {
@@ -69,9 +83,20 @@ public class HoodieCombineRealtimeRecordReader implements 
RecordReader<NullWrita
     } else if (recordReaders.size() > 0) {
       this.currentRecordReader.close();
       this.currentRecordReader = recordReaders.remove(0);
-      AbstractRealtimeRecordReader reader = 
(AbstractRealtimeRecordReader)currentRecordReader.getReader();
+      RecordReader reader;
+      JobConf jobConf;
+      Path path;
+      if (useFileGroupReader) {
+        reader = currentRecordReader;
+        jobConf = ((HoodieFileGroupReaderBasedRecordReader) 
reader).getJobConf();
+        path = ((HoodieFileGroupReaderBasedRecordReader) 
reader).getSplit().getPath();
+      } else {
+        reader = ((HoodieRealtimeRecordReader)currentRecordReader).getReader();
+        jobConf = ((AbstractRealtimeRecordReader) reader).getJobConf();
+        path = ((AbstractRealtimeRecordReader) reader).getSplit().getPath();
+      }
       // when switch reader, ioctx should be updated
-      
IOContextMap.get(reader.getJobConf()).setInputPath(reader.getSplit().getPath());
+      IOContextMap.get(jobConf).setInputPath(path);
       return next(key, value);
     } else {
       return false;
@@ -80,12 +105,20 @@ public class HoodieCombineRealtimeRecordReader implements 
RecordReader<NullWrita
 
   @Override
   public NullWritable createKey() {
-    return this.currentRecordReader.createKey();
+    if (useFileGroupReader) {
+      return ((HoodieFileGroupReaderBasedRecordReader) 
this.currentRecordReader).createKey();
+    } else {
+      return ((HoodieRealtimeRecordReader) 
this.currentRecordReader).createKey();
+    }
   }
 
   @Override
   public ArrayWritable createValue() {
-    return this.currentRecordReader.createValue();
+    if (useFileGroupReader) {
+      return ((HoodieFileGroupReaderBasedRecordReader) 
this.currentRecordReader).createValue();
+    } else {
+      return ((HoodieRealtimeRecordReader) 
this.currentRecordReader).createValue();
+    }
   }
 
   @Override
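
The combine reader above keeps a list of per-split record readers and, when the current one is exhausted, closes it, promotes the next one, and refreshes the IO context. A plain-Java sketch of that advance-to-the-next-reader loop (generic iterators stand in for RecordReaders; the class name is made up):

    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    // Sketch: read from a chain of iterators, switching to the next one when the current is exhausted.
    public class ChainedReaderSketch<T> implements Iterator<T> {
      private final Deque<Iterator<T>> readers;
      private Iterator<T> current;

      public ChainedReaderSketch(List<Iterator<T>> readers) {
        this.readers = new ArrayDeque<>(readers);
        this.current = this.readers.isEmpty() ? null : this.readers.removeFirst();
      }

      @Override
      public boolean hasNext() {
        while (current != null && !current.hasNext()) {
          // In the real reader this is where the exhausted reader is closed and the IO context updated.
          current = readers.isEmpty() ? null : readers.removeFirst();
        }
        return current != null;
      }

      @Override
      public T next() {
        if (!hasNext()) {
          throw new NoSuchElementException();
        }
        return current.next();
      }

      public static void main(String[] args) {
        ChainedReaderSketch<String> chained = new ChainedReaderSketch<>(Arrays.asList(
            Arrays.asList("a", "b").iterator(),
            Arrays.<String>asList().iterator(),
            Arrays.asList("c").iterator()));
        chained.forEachRemaining(System.out::println); // prints a, b, c
      }
    }
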
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index 7e74171c3f9..8d56e77dda9 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
 import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
-import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
 
@@ -45,6 +44,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.getStorageConf;
+import static org.apache.hudi.hadoop.fs.HadoopFSUtils.isLogFile;
+import static 
org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.shouldUseFilegroupReader;
+
 /**
  * Input Format, that provides a real-time view of data in a Hoodie table.
  */
@@ -69,16 +72,20 @@ public class HoodieParquetRealtimeInputFormat extends 
HoodieParquetInputFormat {
     ValidationUtils.checkArgument(split instanceof RealtimeSplit,
         "HoodieRealtimeRecordReader can only work on RealtimeSplit and not 
with " + split);
     RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+
+    if (shouldUseFilegroupReader(jobConf)) {
+      return super.getRecordReader(realtimeSplit, jobConf, reporter);
+    }
+
     // add preCombineKey
-    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
-        
.setConf(HadoopFSUtils.getStorageConfWithCopy(jobConf)).setBasePath(realtimeSplit.getBasePath()).build();
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(getStorageConf(jobConf)).setBasePath(realtimeSplit.getBasePath()).build();
     HoodieTableConfig tableConfig = metaClient.getTableConfig();
     addProjectionToJobConf(realtimeSplit, jobConf, tableConfig);
     LOG.info("Creating record reader with readCols :" + 
jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + 
jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
 
     // for log only split, set the parquet reader as empty.
-    if (HadoopFSUtils.isLogFile(realtimeSplit.getPath())) {
+    if (isLogFile(realtimeSplit.getPath())) {
       return new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new 
HoodieEmptyRecordReader(realtimeSplit, jobConf));
     }
 
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java
new file mode 100644
index 00000000000..a2da796c6f7
--- /dev/null
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableAvroUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.common.util.collection.Pair;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.List;
+import java.util.function.UnaryOperator;
+
+public class HoodieArrayWritableAvroUtils {
+
+  private static final Cache<String, ObjectInspectorCache>
+      OBJECT_INSPECTOR_TABLE_CACHE = 
Caffeine.newBuilder().maximumSize(1000).build();
+
+  public static ObjectInspectorCache getCacheForTable(String table, Schema tableSchema, JobConf jobConf) {
+    // Build and store the per-table entry on a miss so later lookups are actually served from the cache.
+    return OBJECT_INSPECTOR_TABLE_CACHE.get(table, t -> new ObjectInspectorCache(tableSchema, jobConf));
+  }
+
+  private static final Cache<Pair<Schema, Schema>, int[]>
+      PROJECTION_CACHE = Caffeine.newBuilder().maximumSize(1000).build();
+
+  public static int[] getProjection(Schema from, Schema to) {
+    return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> {
+      List<Schema.Field> toFields = to.getFields();
+      int[] newProjection = new int[toFields.size()];
+      for (int i = 0; i < newProjection.length; i++) {
+        newProjection[i] = from.getField(toFields.get(i).name()).pos();
+      }
+      return newProjection;
+    });
+  }
+
+  /**
+   * The projection keeps the array size of the "from" schema because the ArrayWritable is recycled;
+   * if the size changed, the reader would fail.
+   */
+  public static UnaryOperator<ArrayWritable> projectRecord(Schema from, Schema 
to) {
+    int[] projection = getProjection(from, to);
+    return arrayWritable -> {
+      Writable[] values = new Writable[arrayWritable.get().length];
+      for (int i = 0; i < projection.length; i++) {
+        values[i] = arrayWritable.get()[projection[i]];
+      }
+      arrayWritable.set(values);
+      return arrayWritable;
+    };
+  }
+
+  public static int[] getReverseProjection(Schema from, Schema to) {
+    return PROJECTION_CACHE.get(Pair.of(from, to), schemas -> {
+      List<Schema.Field> fromFields = from.getFields();
+      int[] newProjection = new int[fromFields.size()];
+      for (int i = 0; i < newProjection.length; i++) {
+        newProjection[i] = to.getField(fromFields.get(i).name()).pos();
+      }
+      return newProjection;
+    });
+  }
+
+  /**
+   * After reading and merging are done, the field values need to be put back
+   * into the positions of the original schema.
+   */
+  public static UnaryOperator<ArrayWritable> reverseProject(Schema from, 
Schema to) {
+    int[] projection = getReverseProjection(from, to);
+    return arrayWritable -> {
+      Writable[] values = new Writable[to.getFields().size()];
+      for (int i = 0; i < projection.length; i++) {
+        values[projection[i]] = arrayWritable.get()[i];
+      }
+      arrayWritable.set(values);
+      return arrayWritable;
+    };
+  }
+
+  public static Object getWritableValue(ArrayWritable arrayWritable, 
ArrayWritableObjectInspector objectInspector, String name) {
+    return objectInspector.getStructFieldData(arrayWritable, 
objectInspector.getStructFieldRef(name));
+  }
+}
+
+
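
The utilities above project an ArrayWritable by precomputing, for each target field, its position in the source schema, and later reverse the projection to restore original positions. A JDK-only sketch of that positional trick (Object[] stands in for ArrayWritable and string lists for Avro schemas; all names and values are illustrative):

    import java.util.Arrays;
    import java.util.List;

    // Sketch: index-based projection and reverse projection over a flat record array.
    public class ProjectionSketch {
      // For each field of "to", find its position in "from".
      static int[] projection(List<String> from, List<String> to) {
        int[] positions = new int[to.size()];
        for (int i = 0; i < positions.length; i++) {
          positions[i] = from.indexOf(to.get(i));
        }
        return positions;
      }

      public static void main(String[] args) {
        List<String> from = Arrays.asList("key", "ts", "rider", "fare");
        List<String> to = Arrays.asList("fare", "key");
        int[] p = projection(from, to);

        Object[] row = {"key-1", 100L, "rider-A", 42.0};
        // Project: keep the original array length, but move the projected fields to the front.
        Object[] projected = row.clone();
        for (int i = 0; i < p.length; i++) {
          projected[i] = row[p[i]];
        }
        System.out.println(Arrays.toString(projected)); // [42.0, key-1, rider-A, 42.0]

        // Reverse-project: put the values back into their original positions.
        Object[] restored = new Object[from.size()];
        for (int i = 0; i < p.length; i++) {
          restored[p[i]] = projected[i];
        }
        System.out.println(Arrays.toString(restored)); // [key-1, null, null, 42.0]
      }
    }
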
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
index fe88855d458..64dc1f63af8 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieInputFormatUtils.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.hadoop.utils;
 
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -33,6 +35,7 @@ import 
org.apache.hudi.common.table.view.HoodieTableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.TablePathUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hadoop.FileStatusWithBootstrapBaseFile;
@@ -44,6 +47,8 @@ import 
org.apache.hudi.hadoop.realtime.HoodieHFileRealtimeInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit;
 import org.apache.hudi.hadoop.realtime.HoodieRealtimePath;
+import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
+import org.apache.hudi.hadoop.realtime.RealtimeSplit;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StoragePath;
@@ -52,8 +57,10 @@ import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
@@ -61,6 +68,8 @@ import 
org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat;
 import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.slf4j.Logger;
@@ -540,4 +549,31 @@ public class HoodieInputFormatUtils {
       throw new HoodieIOException(String.format("Failed to create instance of 
%s", HoodieRealtimeFileSplit.class.getName()), e);
     }
   }
+
+  public static List<String> getPartitionFieldNames(JobConf jobConf) {
+    String partitionFields = 
jobConf.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
+    return partitionFields.isEmpty() ? new ArrayList<>() : 
Arrays.stream(partitionFields.split("/")).collect(Collectors.toList());
+  }
+
+  public static String getTableBasePath(InputSplit split, JobConf jobConf) 
throws IOException {
+    if (split instanceof RealtimeSplit) {
+      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
+      return realtimeSplit.getBasePath();
+    } else {
+      Path inputPath = ((FileSplit) split).getPath();
+      FileSystem fs = inputPath.getFileSystem(jobConf);
+      HoodieStorage storage = new HoodieHadoopStorage(fs);
+      Option<StoragePath> tablePath = TablePathUtils.getTablePath(storage, 
convertToStoragePath(inputPath));
+      return tablePath.get().toString();
+    }
+  }
+
+  /**
+   * The file group reader does not yet support `schema.on.read` or skip-merge reads,
+   * so it is only used when both of those features are disabled.
+   */
+  public static boolean shouldUseFilegroupReader(final JobConf jobConf) {
+    return 
jobConf.getBoolean(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
HoodieReaderConfig.FILE_GROUP_READER_ENABLED.defaultValue())
+        && 
!jobConf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), 
HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue())
+        && 
!jobConf.getBoolean(HoodieRealtimeRecordReader.REALTIME_SKIP_MERGE_PROP, false);
+  }
 }
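
The two helpers added above parse the "/"-separated partition column list and gate the file group reader on three boolean configs. A JDK-only sketch over a plain Properties object (the property names here are illustrative assumptions, not the actual Hudi/Hive config keys):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;

    // Sketch: the partition-column parsing and the three-flag gate, re-expressed over Properties.
    public class InputFormatUtilsSketch {

      // Mirrors getPartitionFieldNames(): partition columns arrive as a "/"-separated string.
      static List<String> partitionFieldNames(Properties props) {
        String partitionFields = props.getProperty("partition_columns", ""); // assumed property name
        return partitionFields.isEmpty()
            ? new ArrayList<>()
            : new ArrayList<>(Arrays.asList(partitionFields.split("/")));
      }

      // Mirrors shouldUseFilegroupReader(): enabled, and neither schema evolution nor skip-merge requested.
      static boolean shouldUseFileGroupReader(Properties props) {
        return Boolean.parseBoolean(props.getProperty("filegroup.reader.enabled", "true"))   // assumed key
            && !Boolean.parseBoolean(props.getProperty("schema.on.read.enable", "false"))    // assumed key
            && !Boolean.parseBoolean(props.getProperty("skip.merge", "false"));              // assumed key
      }

      public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("partition_columns", "year/month/day");
        System.out.println(partitionFieldNames(props));      // [year, month, day]
        System.out.println(shouldUseFileGroupReader(props)); // true
      }
    }
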
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
new file mode 100644
index 00000000000..ddcc28851df
--- /dev/null
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/ObjectInspectorCache.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.apache.avro.Schema;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.JobConf;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * To read a value from an ArrayWritable, an ObjectInspector is needed.
+ * Object inspectors are cached here, or created on demand from the column type map.
+ */
+public class ObjectInspectorCache {
+  private final Map<String, TypeInfo> columnTypeMap = new HashMap<>();
+  private final Cache<Schema, ArrayWritableObjectInspector>
+      objectInspectorCache = Caffeine.newBuilder().maximumSize(1000).build();
+
+  public Map<String, TypeInfo> getColumnTypeMap() {
+    return columnTypeMap;
+  }
+
+  public ObjectInspectorCache(Schema tableSchema, JobConf jobConf) {
+    // From AbstractRealtimeRecordReader#prepareHiveAvroSerializer:
+    // Hive appends virtual columns at the end of the column list, so those columns are removed here.
+    // e.g. the table has col1, col2, col3, but jobConf.get(serdeConstants.LIST_COLUMNS) returns
+    // col1, col2, col3, BLOCK__OFFSET__INSIDE__FILE, ...
+    Set<String> writerSchemaColNames = tableSchema.getFields().stream().map(f 
-> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
+    List<String> columnNameList = 
Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+    List<TypeInfo> columnTypeList =  
TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+
+    int columnNameListLen = columnNameList.size() - 1;
+    for (int i = columnNameListLen; i >= 0; i--) {
+      String lastColName = columnNameList.get(columnNameList.size() - 1);
+      // virtual columns will only append at the end of column list. it will 
be ok to break the loop.
+      if (writerSchemaColNames.contains(lastColName)) {
+        break;
+      }
+      columnNameList.remove(columnNameList.size() - 1);
+      columnTypeList.remove(columnTypeList.size() - 1);
+    }
+
+    //Use columnNameList.size() instead of columnTypeList because the type 
list is longer for some reason
+    IntStream.range(0, columnNameList.size()).boxed().forEach(i -> 
columnTypeMap.put(columnNameList.get(i),
+        
TypeInfoUtils.getTypeInfosFromTypeString(columnTypeList.get(i).getQualifiedName()).get(0)));
+
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+    ArrayWritableObjectInspector objectInspector = new 
ArrayWritableObjectInspector(rowTypeInfo);
+    objectInspectorCache.put(tableSchema, objectInspector);
+  }
+
+  public ArrayWritableObjectInspector getObjectInspector(Schema schema) {
+    return objectInspectorCache.get(schema, s -> {
+      List<String> columnNameList = 
s.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
+      List<TypeInfo> columnTypeList = 
columnNameList.stream().map(columnTypeMap::get).collect(Collectors.toList());
+      StructTypeInfo rowTypeInfo = (StructTypeInfo) 
TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+      return new ArrayWritableObjectInspector(rowTypeInfo);
+    });
+
+  }
+
+  public Object getValue(ArrayWritable record, Schema schema, String fieldName) {
+    ArrayWritableObjectInspector objectInspector = getObjectInspector(schema);
+    return objectInspector.getStructFieldData(record, objectInspector.getStructFieldRef(fieldName));
+  }
+}
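
ObjectInspectorCache above keys Caffeine caches by table name and by Avro schema so inspectors are built once and reused. A minimal sketch of the compute-if-absent pattern it relies on (String keys and values stand in for schemas and inspectors; the 1000-entry bound mirrors the cache size used above):

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;

    // Sketch: build-on-miss caching with Caffeine, as used for object inspectors above.
    public class CaffeineComputeIfAbsentSketch {
      private static final Cache<String, String> CACHE = Caffeine.newBuilder().maximumSize(1000).build();

      static String inspectorFor(String schemaName) {
        // get(key, mappingFunction) builds and stores the value only on a cache miss.
        return CACHE.get(schemaName, name -> "inspector-for-" + name);
      }

      public static void main(String[] args) {
        System.out.println(inspectorFor("trip_schema")); // computed and cached
        System.out.println(inspectorFor("trip_schema")); // served from the cache
      }
    }
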
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
index 08cd33c2d56..7d7a2eec626 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieParquetInputFormat.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileFormat;
@@ -368,7 +369,7 @@ public class TestHoodieParquetInputFormat {
     files = inputFormat.listStatus(jobConf);
     assertEquals(10, files.length,
         "When hoodie.incremental.use.database is true and hoodie.database.name 
is not null or empty"
-                + " and the incremental database name is not set, then the 
incremental query will not take effect");
+            + " and the incremental database name is not set, then the 
incremental query will not take effect");
   }
 
   @Test
@@ -403,7 +404,7 @@ public class TestHoodieParquetInputFormat {
     metaClient = HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(), 
basePath.toString(), HoodieTableType.COPY_ON_WRITE,
         baseFileFormat, HoodieTestUtils.HOODIE_DATABASE);
     assertEquals(HoodieTestUtils.HOODIE_DATABASE, 
metaClient.getTableConfig().getDatabaseName(),
-            String.format("The hoodie.database.name should be %s ", 
HoodieTestUtils.HOODIE_DATABASE));
+        String.format("The hoodie.database.name should be %s ", 
HoodieTestUtils.HOODIE_DATABASE));
 
     files = inputFormat.listStatus(jobConf);
     assertEquals(0, files.length,
@@ -414,7 +415,7 @@ public class TestHoodieParquetInputFormat {
     files = inputFormat.listStatus(jobConf);
     assertEquals(10, files.length,
         "When hoodie.incremental.use.database is false and the incremental 
database name is set, "
-                + "then the incremental query will not take effect");
+            + "then the incremental query will not take effect");
 
     // The configuration with and without database name exists together
     InputFormatTestUtil.setupIncremental(jobConf, "1", 1, true);
@@ -422,13 +423,13 @@ public class TestHoodieParquetInputFormat {
     files = inputFormat.listStatus(jobConf);
     assertEquals(0, files.length,
         "When hoodie.incremental.use.database is true, "
-                + "We should exclude commit 100 because the returning 
incremental pull with start commit time is 100");
+            + "We should exclude commit 100 because the returning incremental 
pull with start commit time is 100");
 
     InputFormatTestUtil.setupIncremental(jobConf, "1", 1, false);
     files = inputFormat.listStatus(jobConf);
     assertEquals(10, files.length,
         "When hoodie.incremental.use.database is false, "
-                + "We should include commit 100 because the returning 
incremental pull with start commit time is 1");
+            + "We should include commit 100 because the returning incremental 
pull with start commit time is 1");
   }
 
   @Test
@@ -679,13 +680,13 @@ public class TestHoodieParquetInputFormat {
 
     try {
       // Verify that Validate mode throws error with invalid commit time
-      InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "300"); 
+      InputFormatTestUtil.setupSnapshotIncludePendingCommits(jobConf, "300");
       inputFormat.listStatus(jobConf);
       fail("Expected list status to fail when validate is called with unknown 
timestamp");
     } catch (HoodieIOException e) {
       // expected because validate is called with invalid instantTime
     }
-    
+
     //Creating a new jobCOnf Object because old one has 
hoodie.%.consume.commit set
     jobConf = new JobConf();
     inputFormat.setConf(jobConf);
@@ -751,7 +752,7 @@ public class TestHoodieParquetInputFormat {
   }
 
   private void ensureRecordsInCommit(String msg, String commit, int 
expectedNumberOfRecordsInCommit,
-      int totalExpected) throws IOException {
+                                     int totalExpected) throws IOException {
     int actualCount = 0;
     int totalCount = 0;
     InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
@@ -777,59 +778,64 @@ public class TestHoodieParquetInputFormat {
 
   @Test
   public void testHoodieParquetInputFormatReadTimeType() throws IOException {
-    long testTimestampLong = System.currentTimeMillis();
-    int testDate = 19116;// 2022-05-04
-
-    Schema schema = SchemaTestUtil.getSchemaFromResource(getClass(), 
"/test_timetype.avsc");
-    String commit = "20160628071126";
-    HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(), 
basePath.toString(),
-        HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET);
-    java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", 
"06", "28"));
-    String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1",
-        HoodieFileFormat.PARQUET.getFileExtension());
-    try (AvroParquetWriter parquetWriter = new AvroParquetWriter(
-        new Path(partitionPath.resolve(fileId).toString()), schema)) {
-      GenericData.Record record = new GenericData.Record(schema);
-      record.put("test_timestamp", testTimestampLong * 1000);
-      record.put("test_long", testTimestampLong * 1000);
-      record.put("test_date", testDate);
-      record.put("_hoodie_commit_time", commit);
-      record.put("_hoodie_commit_seqno", commit + 1);
-      parquetWriter.write(record);
-    }
-
-    jobConf.set(IOConstants.COLUMNS, 
"test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno");
-    jobConf.set(IOConstants.COLUMNS_TYPES, 
"timestamp,bigint,date,string,string");
-    jobConf.set(READ_COLUMN_NAMES_CONF_STR, 
"test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno");
-    InputFormatTestUtil.setupPartition(basePath, partitionPath);
-    InputFormatTestUtil.commit(basePath, commit);
-    FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath());
+    try {
+      long testTimestampLong = System.currentTimeMillis();
+      int testDate = 19116;// 2022-05-04
+
+      Schema schema = SchemaTestUtil.getSchemaFromResource(getClass(), 
"/test_timetype.avsc");
+      String commit = "20160628071126";
+      HoodieTestUtils.init(HoodieTestUtils.getDefaultStorageConf(), 
basePath.toString(),
+          HoodieTableType.COPY_ON_WRITE, HoodieFileFormat.PARQUET);
+      java.nio.file.Path partitionPath = basePath.resolve(Paths.get("2016", 
"06", "28"));
+      String fileId = FSUtils.makeBaseFileName(commit, "1-0-1", "fileid1",
+          HoodieFileFormat.PARQUET.getFileExtension());
+      try (AvroParquetWriter parquetWriter = new AvroParquetWriter(
+          new Path(partitionPath.resolve(fileId).toString()), schema)) {
+        GenericData.Record record = new GenericData.Record(schema);
+        record.put("test_timestamp", testTimestampLong * 1000);
+        record.put("test_long", testTimestampLong * 1000);
+        record.put("test_date", testDate);
+        record.put("_hoodie_commit_time", commit);
+        record.put("_hoodie_commit_seqno", commit + 1);
+        parquetWriter.write(record);
+      }
 
-    InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
-    for (InputSplit split : splits) {
-      RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat
-          .getRecordReader(split, jobConf, null);
-      NullWritable key = recordReader.createKey();
-      ArrayWritable writable = recordReader.createValue();
-      while (recordReader.next(key, writable)) {
-        // test timestamp
-        if (HiveVersionInfo.getShortVersion().startsWith("3")) {
-          LocalDateTime localDateTime = LocalDateTime.ofInstant(
-              Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC);
-          assertEquals(Timestamp.valueOf(localDateTime).toString(), 
String.valueOf(writable.get()[0]));
-        } else {
-          Date date = new Date();
-          date.setTime(testTimestampLong);
-          Timestamp actualTime = ((TimestampWritable) 
writable.get()[0]).getTimestamp();
-          SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
-          assertEquals(dateFormat.format(date), dateFormat.format(actualTime));
+      // This is not a Hudi table, so disable the file group reader
+      jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
+      jobConf.set(IOConstants.COLUMNS, 
"test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno");
+      jobConf.set(IOConstants.COLUMNS_TYPES, 
"timestamp,bigint,date,string,string");
+      jobConf.set(READ_COLUMN_NAMES_CONF_STR, 
"test_timestamp,test_long,test_date,_hoodie_commit_time,_hoodie_commit_seqno");
+      InputFormatTestUtil.setupPartition(basePath, partitionPath);
+      InputFormatTestUtil.commit(basePath, commit);
+      FileInputFormat.setInputPaths(jobConf, partitionPath.toFile().getPath());
+
+      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+      for (InputSplit split : splits) {
+        RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat
+            .getRecordReader(split, jobConf, null);
+        NullWritable key = recordReader.createKey();
+        ArrayWritable writable = recordReader.createValue();
+        while (recordReader.next(key, writable)) {
+          // test timestamp
+          if (HiveVersionInfo.getShortVersion().startsWith("3")) {
+            LocalDateTime localDateTime = LocalDateTime.ofInstant(
+                Instant.ofEpochMilli(testTimestampLong), ZoneOffset.UTC);
+            assertEquals(Timestamp.valueOf(localDateTime).toString(), 
String.valueOf(writable.get()[0]));
+          } else {
+            Date date = new Date();
+            date.setTime(testTimestampLong);
+            Timestamp actualTime = ((TimestampWritable) 
writable.get()[0]).getTimestamp();
+            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS");
+            assertEquals(dateFormat.format(date), 
dateFormat.format(actualTime));
+          }
+          // test long
+          assertEquals(testTimestampLong * 1000, ((LongWritable) 
writable.get()[1]).get());
+          // test date
+          assertEquals(LocalDate.ofEpochDay(testDate).toString(), 
String.valueOf(writable.get()[2]));
         }
-        // test long
-        assertEquals(testTimestampLong * 1000, ((LongWritable) 
writable.get()[1]).get());
-        // test date
-        assertEquals(LocalDate.ofEpochDay(testDate).toString(), 
String.valueOf(writable.get()[2]));
       }
-      recordReader.close();
+    } finally {
+      jobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "true");
     }
   }
 }
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
index 3371b5efb27..ab907390f88 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/hive/TestHoodieCombineHiveInputFormat.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hive.ql.io.IOContextMap;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
@@ -243,7 +244,18 @@ public class TestHoodieCombineHiveInputFormat extends 
HoodieCommonTestHarness {
 
     HoodieCombineHiveInputFormat combineHiveInputFormat = new 
HoodieCombineHiveInputFormat();
     String tripsHiveColumnTypes = 
"double,string,string,string,double,double,double,double,double";
-    InputFormatTestUtil.setPropsForInputFormat(jobConf, schema, 
tripsHiveColumnTypes);
+    List<Schema.Field> fields = schema.getFields();
+    String names = fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
+    String positions = fields.stream().map(f -> 
String.valueOf(f.pos())).collect(Collectors.joining(","));
+
+    String hiveColumnNames = 
fields.stream().map(Schema.Field::name).collect(Collectors.joining(","));
+    hiveColumnNames = hiveColumnNames + ",year,month,day";
+    String modifiedHiveColumnTypes = 
HoodieAvroUtils.addMetadataColumnTypes(tripsHiveColumnTypes);
+    modifiedHiveColumnTypes = modifiedHiveColumnTypes + 
",string,string,string";
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMNS, hiveColumnNames);
+    jobConf.set(hive_metastoreConstants.META_TABLE_COLUMN_TYPES, 
modifiedHiveColumnTypes);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, names);
+    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, positions);
     // unset META_TABLE_PARTITION_COLUMNS to trigger HUDI-1718
     jobConf.set(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "");
     InputSplit[] splits = combineHiveInputFormat.getSplits(jobConf, 1);
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
index f982a971062..463ad5a2ebc 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieMergeOnReadSnapshotReader.java
@@ -20,6 +20,7 @@ package org.apache.hudi.hadoop.realtime;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -89,6 +90,7 @@ public class TestHoodieMergeOnReadSnapshotReader {
     baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 
String.valueOf(1024 * 1024));
     baseJobConf.set(serdeConstants.LIST_COLUMNS, COLUMNS);
     baseJobConf.set(serdeConstants.LIST_COLUMN_TYPES, COLUMN_TYPES);
+    baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false");
     storage = new HoodieHadoopStorage(HadoopFSUtils.getFs(new 
StoragePath(basePath.toUri()), baseJobConf));
   }
 
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index 3ee83a09a3b..b992987c690 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieLogFile;
@@ -120,6 +121,7 @@ public class TestHoodieRealtimeRecordReader {
     storageConf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
     baseJobConf = new JobConf(storageConf.unwrap());
     baseJobConf.set(HoodieMemoryConfig.MAX_DFS_STREAM_BUFFER_SIZE.key(), 
String.valueOf(1024 * 1024));
+    baseJobConf.set(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), 
"false");
     fs = HadoopFSUtils.getFs(basePath.toUri().toString(), baseJobConf);
     storage = new HoodieHadoopStorage(fs);
   }
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java
new file mode 100644
index 00000000000..d7b4a93009b
--- /dev/null
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableAvroUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.UnaryOperator;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieArrayWritableAvroUtils {
+
+  HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+  Schema tableSchema = HoodieTestDataGenerator.AVRO_SCHEMA;
+  ObjectInspectorCache objectInspectorCache;
+
+  @BeforeEach
+  public void setup() {
+    List<Schema.Field> fields = tableSchema.getFields();
+    Configuration conf = HoodieTestUtils.getDefaultStorageConf().unwrap();
+    JobConf jobConf = new JobConf(conf);
+    jobConf.set(serdeConstants.LIST_COLUMNS, 
fields.stream().map(Schema.Field::name).collect(Collectors.joining(",")));
+    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, 
HoodieTestDataGenerator.TRIP_HIVE_COLUMN_TYPES);
+    objectInspectorCache = new 
ObjectInspectorCache(HoodieTestDataGenerator.AVRO_SCHEMA, jobConf);
+  }
+
+  @Test
+  public void testProjection() {
+    Schema from =  tableSchema;
+    Schema to = HoodieAvroUtils.generateProjectionSchema(from, 
Arrays.asList("trip_type", "current_ts", "weight"));
+    UnaryOperator<ArrayWritable> projection = 
HoodieArrayWritableAvroUtils.projectRecord(from, to);
+    UnaryOperator<ArrayWritable> reverseProjection = 
HoodieArrayWritableAvroUtils.reverseProject(to, from);
+
+    //We reuse the ArrayWritable, so we need to get the values before 
projecting
+    ArrayWritable record = 
convertArrayWritable(dataGen.generateGenericRecord());
+    Object tripType = objectInspectorCache.getValue(record, from, "trip_type");
+    Object currentTs = objectInspectorCache.getValue(record, from, 
"current_ts");
+    Object weight = objectInspectorCache.getValue(record, from, "weight");
+
+    //Make sure the projected fields can be read
+    ArrayWritable projectedRecord = projection.apply(record);
+    assertEquals(tripType, objectInspectorCache.getValue(projectedRecord, to, 
"trip_type"));
+    assertEquals(currentTs, objectInspectorCache.getValue(projectedRecord, to, 
"current_ts"));
+    assertEquals(weight, objectInspectorCache.getValue(projectedRecord, to, 
"weight"));
+
+    //Reverse projection, the fields are in the original spots, but only the 
fields we set can be read.
+    //Therefore, we can only check the 3 fields that were in the projection
+    ArrayWritable reverseProjected = reverseProjection.apply(projectedRecord);
+    assertEquals(tripType, objectInspectorCache.getValue(reverseProjected, 
from, "trip_type"));
+    assertEquals(currentTs, objectInspectorCache.getValue(reverseProjected, 
from, "current_ts"));
+    assertEquals(weight, objectInspectorCache.getValue(reverseProjected, from, 
"weight"));
+  }
+
+  private static ArrayWritable convertArrayWritable(GenericRecord record) {
+    return  (ArrayWritable) 
HoodieRealtimeRecordReaderUtils.avroToArrayWritable(record, record.getSchema(), 
false);
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
index b6795bc2a2a..99fcdcbf8a3 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBootstrap.java
@@ -253,6 +253,7 @@ public class TestBootstrap extends 
HoodieSparkClientTestBase {
     long timestamp = Instant.now().toEpochMilli();
     Schema schema = generateNewDataSetAndReturnSchema(timestamp, totalRecords, 
partitions, bootstrapBasePath);
     HoodieWriteConfig config = getConfigBuilder(schema.toString())
+        .withPreCombineField("timestamp")
         .withAutoCommit(true)
         .withSchema(schema.toString())
         .withKeyGenerator(keyGeneratorClass)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index a5a45cabf81..806f7754423 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.functional;
 
 import org.apache.hudi.HoodieSparkUtils;
+import org.apache.hudi.common.config.HoodieCommonConfig;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 
@@ -106,6 +107,7 @@ public class TestHiveTableSchemaEvolution {
       spark.sql(String.format("alter table %s rename column col2 to col2_new", 
tableName));
 
       JobConf jobConf = new JobConf();
+      jobConf.set(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), "true");
       jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
       jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, 
"col1,col2_new");
       jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
index 723d2389d22..2f2d2ba0efa 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestSparkConsistentBucketClustering.java
@@ -219,7 +219,7 @@ public class TestSparkConsistentBucketClustering extends 
HoodieSparkClientTestHa
    */
   @ParameterizedTest
   @MethodSource("configParamsForSorting")
-  public void testClusteringColumnSort(String sortColumn, boolean 
rowWriterEnable) throws IOException {
+  public void testClusteringColumnSort(String sortColumn, boolean 
rowWriterEnable) throws Exception {
     Map<String, String> options = new HashMap<>();
     // Record key is handled specially
     if (sortColumn.equals("_row_key")) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
index e6c388b3e3b..e00d7290094 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/TestHoodieStreamerUtils.java
@@ -32,12 +32,14 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.mockito.Mockito.doNothing;
@@ -56,8 +58,15 @@ public class TestHoodieStreamerUtils extends 
UtilitiesTestBase {
     initTestServices();
   }
 
+  private static Stream<Arguments> validRecordTypes() {
+    return Stream.of(Arguments.of(HoodieRecordType.SPARK), Arguments.of(HoodieRecordType.AVRO));
+  }
+
   @ParameterizedTest
-  @EnumSource(HoodieRecordType.class)
+  @MethodSource("validRecordTypes")
   public void testCreateHoodieRecordsWithError(HoodieRecordType recordType) {
     Schema schema = new Schema.Parser().parse(SCHEMA_STRING);
     JavaRDD<GenericRecord> recordRdd = 
jsc.parallelize(Collections.singletonList(1)).map(i -> {
