This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 738e2cce8f437ed3a2f7fc474e4143bb42fbad22
Author: xiarixiaoyao <mengtao0...@qq.com>
AuthorDate: Thu Nov 3 23:11:39 2022 +0800

    [HUDI-4898] presto/hive respect payload during merge parquet file and logfile when reading mor table (#6741)
    
    * [HUDI-4898] presto/hive respect payload during merge parquet file and logfile when reading mor table
    
    * Update HiveAvroSerializer.java otherwise payload string type combine field will cause cast exception
    
    (cherry picked from commit cd314b8cfa58c32f731f7da2aa6377a09df4c6f9)
---
 .../realtime/AbstractRealtimeRecordReader.java     |  72 +++-
 .../realtime/HoodieHFileRealtimeInputFormat.java   |   2 +-
 .../realtime/HoodieParquetRealtimeInputFormat.java |  14 +-
 .../realtime/RealtimeCompactedRecordReader.java    |  25 +-
 .../hudi/hadoop/utils/HiveAvroSerializer.java      | 409 +++++++++++++++++++++
 .../utils/HoodieRealtimeInputFormatUtils.java      |  19 +-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |   5 +
 .../hudi/hadoop/utils/TestHiveAvroSerializer.java  | 148 ++++++++
 8 files changed, 678 insertions(+), 16 deletions(-)
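
At a high level, the change makes the Hive/Presto realtime reader let the table's payload class decide how a base-file row and a delta-log record combine, instead of always preferring the log record. A minimal, self-contained sketch of that decision follows; the Payload interface here is a simplified stand-in for illustration only, not the actual org.apache.hudi.common.model.HoodieRecordPayload API.

import java.util.Map;
import java.util.Optional;

public class PayloadMergeSketch {
  // Simplified stand-in for a Hudi payload: it sees the current base-file value
  // and decides what the merged value should be (empty result = delete).
  interface Payload {
    Optional<Map<String, Object>> combineAndGetUpdateValue(Map<String, Object> currentValue);
  }

  // Before this change: the delta-log record always wins.
  static Map<String, Object> mergeIgnoringPayload(Map<String, Object> baseRow, Map<String, Object> logRow) {
    return logRow;
  }

  // After this change: the payload is consulted with the base-file row, so partial
  // updates and custom combine logic (e.g. precombine fields) are respected.
  static Optional<Map<String, Object>> mergeRespectingPayload(Map<String, Object> baseRow, Payload payload) {
    return payload.combineAndGetUpdateValue(baseRow);
  }
}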

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index dfdda9dfc8..83b69812e1 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -18,26 +18,34 @@
 
 package org.apache.hudi.hadoop.realtime;
 
-import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.model.HoodiePayloadProps;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
-
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodiePayloadProps;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -55,6 +63,10 @@ public abstract class AbstractRealtimeRecordReader {
   private Schema writerSchema;
   private Schema hiveSchema;
   private HoodieTableMetaClient metaClient;
+  // whether the payload-based merge is supported
+  protected boolean supportPayload = true;
+  // converts Hive writable rows into Avro records
+  protected HiveAvroSerializer serializer;
 
   public AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) {
     this.split = split;
@@ -62,6 +74,7 @@ public abstract class AbstractRealtimeRecordReader {
     LOG.info("cfg ==> " + 
job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR));
     LOG.info("columnIds ==> " + 
job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
     LOG.info("partitioningColumns ==> " + 
job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, ""));
+    this.supportPayload = 
Boolean.parseBoolean(job.get("hoodie.support.payload", "true"));
     try {
       metaClient = 
HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build();
       if (metaClient.getTableConfig().getPreCombineField() != null) {
@@ -73,6 +86,7 @@ public abstract class AbstractRealtimeRecordReader {
     } catch (Exception e) {
       throw new HoodieException("Could not create HoodieRealtimeRecordReader 
on path " + this.split.getPath(), e);
     }
+    prepareHiveAvroSerializer();
   }
 
   private boolean usesCustomPayload(HoodieTableMetaClient metaClient) {
@@ -80,6 +94,34 @@ public abstract class AbstractRealtimeRecordReader {
         || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload"));
   }
 
+  private void prepareHiveAvroSerializer() {
+    try {
+      // Hive appends virtual columns at the end of the column list; we should remove those columns.
+      // eg: current table is col1, col2, col3; jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3, BLOCK__OFFSET__INSIDE__FILE ...
+      Set<String> writerSchemaColNames = writerSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
+      List<String> columnNameList = Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+      List<TypeInfo> columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES));
+      int columnNameListLen = columnNameList.size() - 1;
+      for (int i = columnNameListLen; i >= 0; i--) {
+        String lastColName = columnNameList.get(columnNameList.size() - 1);
+        // virtual columns are only appended at the end of the column list, so it is safe to break the loop here.
+        if (writerSchemaColNames.contains(lastColName)) {
+          break;
+        }
+        LOG.debug(String.format("remove virtual column: %s", lastColName));
+        columnNameList.remove(columnNameList.size() - 1);
+        columnTypeList.remove(columnTypeList.size() - 1);
+      }
+      StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+      this.serializer = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList);
+    } catch (Exception e) {
+      // fall back to the original logic
+      LOG.warn("failed to init HiveAvroSerializer to support payload merge", e);
+      this.supportPayload = false;
+    }
+
+  }
+
   /**
    * Gets schema from HoodieTableMetaClient. If not, falls
   * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the
@@ -135,6 +177,10 @@ public abstract class AbstractRealtimeRecordReader {
     return hiveSchema;
   }
 
+  protected Schema getLogScannerReaderSchema() {
+    return usesCustomPayload ? writerSchema : readerSchema;
+  }
+
   public Schema getReaderSchema() {
     return readerSchema;
   }
@@ -154,4 +200,16 @@ public abstract class AbstractRealtimeRecordReader {
   public JobConf getJobConf() {
     return jobConf;
   }
+
+  public void setReaderSchema(Schema readerSchema) {
+    this.readerSchema = readerSchema;
+  }
+
+  public void setWriterSchema(Schema writerSchema) {
+    this.writerSchema = writerSchema;
+  }
+
+  public void setHiveSchema(Schema hiveSchema) {
+    this.hiveSchema = hiveSchema;
+  }
 }
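
The virtual-column trimming inside prepareHiveAvroSerializer() above is easier to see outside the diff. A standalone sketch of the same tail-trimming idea, in plain Java with illustrative names (it assumes, like the patch, that Hive only ever appends virtual columns at the end of the column list):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class VirtualColumnTrimSketch {
  // Hive appends virtual columns (e.g. BLOCK__OFFSET__INSIDE__FILE) at the end of
  // LIST_COLUMNS; drop names from the tail until we hit a writer-schema column.
  static List<String> trimVirtualColumns(List<String> hiveColumns, Set<String> writerSchemaColumns) {
    List<String> result = new ArrayList<>(hiveColumns);
    while (!result.isEmpty() && !writerSchemaColumns.contains(result.get(result.size() - 1))) {
      result.remove(result.size() - 1);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> hiveColumns = Arrays.asList("col1", "col2", "col3", "BLOCK__OFFSET__INSIDE__FILE", "INPUT__FILE__NAME");
    Set<String> writerColumns = new HashSet<>(Arrays.asList("col1", "col2", "col3"));
    System.out.println(trimVirtualColumns(hiveColumns, writerColumns)); // [col1, col2, col3]
  }
}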
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
index 799d90bce5..cdc062475f 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
@@ -72,7 +72,7 @@ public class HoodieHFileRealtimeInputFormat extends HoodieMergeOnReadTableInputF
           // For e:g _hoodie_record_key would be missing and merge step would throw exceptions.
           // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
           // time.
-          HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, Option.empty());
+          HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, Option.empty(), Option.empty());
 
           this.conf = jobConf;
           this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
index e8c806ed2c..78768104d9 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java
@@ -27,6 +27,10 @@ import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
 import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
 import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
@@ -61,7 +65,10 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat {
     ValidationUtils.checkArgument(split instanceof RealtimeSplit,
         "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split);
     RealtimeSplit realtimeSplit = (RealtimeSplit) split;
-    addProjectionToJobConf(realtimeSplit, jobConf);
+    // add preCombineKey
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(realtimeSplit.getBasePath()).build();
+    HoodieTableConfig tableConfig = metaClient.getTableConfig();
+    addProjectionToJobConf(realtimeSplit, jobConf, metaClient.getTableConfig().getPreCombineField());
     LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)
         + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR));
 
@@ -74,7 +81,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat {
         super.getRecordReader(split, jobConf, reporter));
   }
 
-  void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) {
+  void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf, String preCombineKey) {
     // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the
     // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the
     // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible
@@ -94,7 +101,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat {
           // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction
           // time.
           if (!realtimeSplit.getDeltaLogPaths().isEmpty()) {
-            HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getVirtualKeyInfo());
+            HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getVirtualKeyInfo(),
+                StringUtils.isNullOrEmpty(preCombineKey) ? Option.empty() : Option.of(preCombineKey));
           }
           jobConf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true");
           setConf(jobConf);
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
index b917f004bc..0143672fa0 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
+import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
 import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
 import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.log4j.LogManager;
@@ -81,7 +82,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf))
         .withBasePath(split.getBasePath())
         .withLogFilePaths(split.getDeltaLogPaths())
-        .withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema())
+        .withReaderSchema(getLogScannerReaderSchema())
         .withLatestInstantTime(split.getMaxCommitTime())
         .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf))
         .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED)))
@@ -112,9 +113,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
         if (deltaRecordMap.containsKey(key)) {
           // mark the key as handled
           this.deltaRecordKeys.remove(key);
-          // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the
-          // deltaRecord may not be a full record and needs values of columns from the parquet
-          Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
+          Option<GenericRecord> rec = supportPayload ? mergeRecord(deltaRecordMap.get(key), arrayWritable) : buildGenericRecordwithCustomPayload(deltaRecordMap.get(key));
           // If the record is not present, this is a delete record using an empty payload so skip this base record
           // and move to the next record
           if (!rec.isPresent()) {
@@ -173,6 +172,24 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader
     }
   }
 
+  private Option<GenericRecord> mergeRecord(HoodieRecord<? extends HoodieRecordPayload> newRecord, ArrayWritable writableFromParquet) throws IOException {
+    GenericRecord oldRecord = convertArrayWritableToHoodieRecord(writableFromParquet);
+    // Presto will not append partition columns to jobConf.get(serdeConstants.LIST_COLUMNS), but Hive will. This leads to the following results:
+    // eg: the current table is col1: int, col2: int, par: string, where par is the partition column.
+    // for the hive engine, the hiveSchema will be col1,col2,par and the writerSchema will be col1,col2,par
+    // for the presto engine, the hiveSchema will be col1,col2, but the writerSchema will be col1,col2,par
+    // so to be compatible with both hive and presto, we should rewrite oldRecord before we call combineAndGetUpdateValue;
+    // once presto on hudi has its own mor reader, we can remove the rewrite logic.
+    Option<GenericRecord> combinedValue = newRecord.getData().combineAndGetUpdateValue(HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord,
+        getLogScannerReaderSchema()), getLogScannerReaderSchema(), payloadProps);
+    return combinedValue;
+  }
+
+  private GenericRecord convertArrayWritableToHoodieRecord(ArrayWritable arrayWritable) {
+    GenericRecord record = serializer.serialize(arrayWritable, getHiveSchema());
+    return record;
+  }
+
   @Override
   public NullWritable createKey() {
     return parquetReader.createKey();
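
The mergeRecord() comment above explains why the base-file record has to be rewritten into the log scanner's reader schema before combineAndGetUpdateValue is called: Presto does not put partition columns into serdeConstants.LIST_COLUMNS while Hive does, so the record read from parquet may be missing trailing fields. Below is a simplified analogue of HiveAvroSerializer.rewriteRecordIgnoreResultCheck using only the Avro API; it skips the default-value and nested-union handling the real method has.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class RewriteToReaderSchemaSketch {
  // Copy the fields of 'oldRecord' into a record with 'newSchema', leaving fields
  // that the old record does not carry (e.g. partition columns under Presto) as null.
  static GenericRecord rewrite(GenericRecord oldRecord, Schema newSchema) {
    GenericRecord newRecord = new GenericData.Record(newSchema);
    for (Schema.Field f : newSchema.getFields()) {
      if (oldRecord.getSchema().getField(f.name()) != null) {
        newRecord.put(f.name(), oldRecord.get(f.name()));
      } // else: leave the field unset (null)
    }
    return newRecord;
  }

  public static void main(String[] args) {
    Schema prestoSchema = SchemaBuilder.record("row").fields()
        .optionalInt("col1").optionalInt("col2").endRecord();
    Schema readerSchema = SchemaBuilder.record("row").fields()
        .optionalInt("col1").optionalInt("col2").optionalString("par").endRecord();
    GenericRecord fromParquet = new GenericData.Record(prestoSchema);
    fromParquet.put("col1", 1);
    fromParquet.put("col2", 2);
    System.out.println(rewrite(fromParquet, readerSchema)); // {"col1": 1, "col2": 2, "par": null}
  }
}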
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
new file mode 100644
index 0000000000..16942ba8b3
--- /dev/null
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericEnumSymbol;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.avro.util.Utf8;
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils;
+import org.apache.hadoop.hive.serde2.avro.InstanceCache;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableDateObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.math.BigDecimal;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema;
+import static org.apache.hudi.avro.HoodieAvroUtils.isMetadataField;
+
+/**
+ * Helper class to serialize a Hive writable row into an Avro record.
+ */
+public class HiveAvroSerializer {
+
+  private final List<String> columnNames;
+  private final List<TypeInfo> columnTypes;
+  private final ObjectInspector objectInspector;
+
+  private static final Logger LOG = LogManager.getLogger(HiveAvroSerializer.class);
+
+  public HiveAvroSerializer(ObjectInspector objectInspector, List<String> columnNames, List<TypeInfo> columnTypes) {
+    this.columnNames = columnNames;
+    this.columnTypes = columnTypes;
+    this.objectInspector = objectInspector;
+  }
+
+  private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING);
+
+  public GenericRecord serialize(Object o, Schema schema) {
+
+    StructObjectInspector soi = (StructObjectInspector) objectInspector;
+    GenericData.Record record = new GenericData.Record(schema);
+
+    List<? extends StructField> outputFieldRefs = soi.getAllStructFieldRefs();
+    if (outputFieldRefs.size() != columnNames.size()) {
+      throw new HoodieException("Number of input columns was different than output columns (in = " + columnNames.size() + " vs out = " + outputFieldRefs.size());
+    }
+
+    int size = schema.getFields().size();
+
+    List<? extends StructField> allStructFieldRefs = soi.getAllStructFieldRefs();
+    List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o);
+
+    for (int i = 0; i < size; i++) {
+      Schema.Field field = schema.getFields().get(i);
+      if (i >= columnTypes.size()) {
+        break;
+      }
+      try {
+        setUpRecordFieldFromWritable(columnTypes.get(i), structFieldsDataAsList.get(i),
+            allStructFieldRefs.get(i).getFieldObjectInspector(), record, field);
+      } catch (Exception e) {
+        LOG.error(String.format("current columnNames: %s", columnNames.stream().collect(Collectors.joining(","))));
+        LOG.error(String.format("current type: %s", columnTypes.stream().map(f -> f.getTypeName()).collect(Collectors.joining(","))));
+        LOG.error(String.format("current value: %s", HoodieRealtimeRecordReaderUtils.arrayWritableToString((ArrayWritable) o)));
+        throw e;
+      }
+    }
+    return record;
+  }
+
+  private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object structFieldData, ObjectInspector fieldOI, GenericData.Record record, Schema.Field field) {
+    Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema());
+    if (val == null) {
+      if (field.defaultVal() instanceof JsonProperties.Null) {
+        record.put(field.name(), null);
+      } else {
+        record.put(field.name(), field.defaultVal());
+      }
+    } else {
+      record.put(field.name(), val);
+    }
+  }
+
+  /**
+   * Determine if an Avro schema is of type Union[T, NULL].  Avro supports nullable
+   * types via a union of type T and null.  This is a very common use case.
+   * As such, we want to silently convert it to just T and allow the value to be null.
+   *
+   * When a Hive union type is used with AVRO, the schema type becomes
+   * Union[NULL, T1, T2, ...]. The NULL in the union should be silently removed
+   *
+   * @return true if type represents Union[T, Null], false otherwise
+   */
+  public static boolean isNullableType(Schema schema) {
+    if (!schema.getType().equals(Schema.Type.UNION)) {
+      return false;
+    }
+
+    List<Schema> itemSchemas = schema.getTypes();
+    if (itemSchemas.size() < 2) {
+      return false;
+    }
+
+    for (Schema itemSchema : itemSchemas) {
+      if (Schema.Type.NULL.equals(itemSchema.getType())) {
+        return true;
+      }
+    }
+
+    // [null, null] not allowed, so this check is ok.
+    return false;
+  }
+
+  /**
+   * If the union schema is a nullable union, get the schema for the non-nullable type.
+   * This method does no checking that the provided Schema is nullable. If the provided
+   * union schema is non-nullable, it simply returns the union schema
+   */
+  public static Schema getOtherTypeFromNullableType(Schema unionSchema) {
+    final List<Schema> types = unionSchema.getTypes();
+    if (types.size() == 2) { // most common scenario
+      if (types.get(0).getType() == Schema.Type.NULL) {
+        return types.get(1);
+      }
+      if (types.get(1).getType() == Schema.Type.NULL) {
+        return types.get(0);
+      }
+      // not a nullable union
+      return unionSchema;
+    }
+
+    final List<Schema> itemSchemas = new ArrayList<>();
+    for (Schema itemSchema : types) {
+      if (!Schema.Type.NULL.equals(itemSchema.getType())) {
+        itemSchemas.add(itemSchema);
+      }
+    }
+
+    if (itemSchemas.size() > 1) {
+      return Schema.createUnion(itemSchemas);
+    } else {
+      return itemSchemas.get(0);
+    }
+  }
+
+  private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    if (null == structFieldData) {
+      return null;
+    }
+
+    if (isNullableType(schema)) {
+      schema = getOtherTypeFromNullableType(schema);
+    }
+    /* Because we use Hive's 'string' type when Avro calls for enum, we have to expressly check for enum-ness */
+    if (Schema.Type.ENUM.equals(schema.getType())) {
+      assert fieldOI instanceof PrimitiveObjectInspector;
+      return serializeEnum((PrimitiveObjectInspector) fieldOI, structFieldData, schema);
+    }
+    switch (typeInfo.getCategory()) {
+      case PRIMITIVE:
+        assert fieldOI instanceof PrimitiveObjectInspector;
+        return serializePrimitive((PrimitiveObjectInspector) fieldOI, structFieldData, schema);
+      case MAP:
+        assert fieldOI instanceof MapObjectInspector;
+        assert typeInfo instanceof MapTypeInfo;
+        return serializeMap((MapTypeInfo) typeInfo, (MapObjectInspector) fieldOI, structFieldData, schema);
+      case LIST:
+        assert fieldOI instanceof ListObjectInspector;
+        assert typeInfo instanceof ListTypeInfo;
+        return serializeList((ListTypeInfo) typeInfo, (ListObjectInspector) fieldOI, structFieldData, schema);
+      case UNION:
+        assert fieldOI instanceof UnionObjectInspector;
+        assert typeInfo instanceof UnionTypeInfo;
+        return serializeUnion((UnionTypeInfo) typeInfo, (UnionObjectInspector) fieldOI, structFieldData, schema);
+      case STRUCT:
+        assert fieldOI instanceof StructObjectInspector;
+        assert typeInfo instanceof StructTypeInfo;
+        return serializeStruct((StructTypeInfo) typeInfo, (StructObjectInspector) fieldOI, structFieldData, schema);
+      default:
+        throw new HoodieException("Ran out of TypeInfo Categories: " + typeInfo.getCategory());
+    }
+  }
+
+  /** private cache to avoid lots of EnumSymbol creation while serializing.
+   *  Two levels because the enum symbol is specific to a schema.
+   *  Object because we want to avoid the overhead of repeated toString calls while maintaining compatibility.
+   *  Provided there are few enum types per record, and few symbols per enum, memory use should be moderate.
+   *  eg 20 types with 50 symbols each as length-10 Strings should be on the order of 100KB per AvroSerializer.
+   */
+  final InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>> enums = new InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>>() {
+    @Override
+    protected InstanceCache<Object, GenericEnumSymbol> makeInstance(final Schema schema,
+                                                                    Set<Schema> seenSchemas) {
+      return new InstanceCache<Object, GenericEnumSymbol>() {
+        @Override
+        protected GenericEnumSymbol makeInstance(Object seed, Set<Object> seenSchemas) {
+          return new GenericData.EnumSymbol(schema, seed.toString());
+        }
+      };
+    }
+  };
+
+  private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    try {
+      return enums.retrieve(schema).retrieve(serializePrimitive(fieldOI, structFieldData, schema));
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) {
+    int size = schema.getFields().size();
+    List<? extends StructField> allStructFieldRefs = ssoi.getAllStructFieldRefs();
+    List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o);
+    GenericData.Record record = new GenericData.Record(schema);
+    ArrayList<TypeInfo> allStructFieldTypeInfos = typeInfo.getAllStructFieldTypeInfos();
+
+    for (int i = 0; i < size; i++) {
+      Schema.Field field = schema.getFields().get(i);
+      setUpRecordFieldFromWritable(allStructFieldTypeInfos.get(i), structFieldsDataAsList.get(i),
+          allStructFieldRefs.get(i).getFieldObjectInspector(), record, field);
+    }
+    return record;
+  }
+
+  private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    switch (fieldOI.getPrimitiveCategory()) {
+      case BINARY:
+        if (schema.getType() == Schema.Type.BYTES) {
+          return AvroSerdeUtils.getBufferFromBytes((byte[])fieldOI.getPrimitiveJavaObject(structFieldData));
+        } else if (schema.getType() == Schema.Type.FIXED) {
+          GenericData.Fixed fixed = new GenericData.Fixed(schema, (byte[])fieldOI.getPrimitiveJavaObject(structFieldData));
+          return fixed;
+        } else {
+          throw new HoodieException("Unexpected Avro schema for Binary TypeInfo: " + schema.getType());
+        }
+      case DECIMAL:
+        HiveDecimal dec = (HiveDecimal)fieldOI.getPrimitiveJavaObject(structFieldData);
+        LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)schema.getLogicalType();
+        BigDecimal bd = new BigDecimal(dec.toString()).setScale(decimal.getScale());
+        return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal);
+      case CHAR:
+        HiveChar ch = (HiveChar)fieldOI.getPrimitiveJavaObject(structFieldData);
+        return new Utf8(ch.getStrippedValue());
+      case VARCHAR:
+        HiveVarchar vc = (HiveVarchar)fieldOI.getPrimitiveJavaObject(structFieldData);
+        return new Utf8(vc.getValue());
+      case STRING:
+        String string = (String)fieldOI.getPrimitiveJavaObject(structFieldData);
+        return new Utf8(string);
+      case DATE:
+        return DateWritable.dateToDays(((DateObjectInspector)fieldOI).getPrimitiveJavaObject(structFieldData));
+      case TIMESTAMP:
+        Timestamp timestamp =
+            ((TimestampObjectInspector) fieldOI).getPrimitiveJavaObject(structFieldData);
+        return timestamp.getTime();
+      case INT:
+        if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) {
+          return DateWritable.dateToDays(new WritableDateObjectInspector().getPrimitiveJavaObject(structFieldData));
+        }
+        return fieldOI.getPrimitiveJavaObject(structFieldData);
+      case UNKNOWN:
+        throw new HoodieException("Received UNKNOWN primitive category.");
+      case VOID:
+        return null;
+      default: // All other primitive types are simple
+        return fieldOI.getPrimitiveJavaObject(structFieldData);
+    }
+  }
+
+  private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    byte tag = fieldOI.getTag(structFieldData);
+
+    // Invariant that Avro's tag ordering must match Hive's.
+    return serialize(typeInfo.getAllUnionObjectTypeInfos().get(tag),
+        fieldOI.getObjectInspectors().get(tag),
+        fieldOI.getField(structFieldData),
+        schema.getTypes().get(tag));
+  }
+
+  private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    List<?> list = fieldOI.getList(structFieldData);
+    List<Object> deserialized = new GenericData.Array<Object>(list.size(), schema);
+
+    TypeInfo listElementTypeInfo = typeInfo.getListElementTypeInfo();
+    ObjectInspector listElementObjectInspector = fieldOI.getListElementObjectInspector();
+    Schema elementType = schema.getElementType().getField("element") == null ? schema.getElementType() : schema.getElementType().getField("element").schema();
+
+    for (int i = 0; i < list.size(); i++) {
+      Object childFieldData = list.get(i);
+      if (childFieldData instanceof ArrayWritable && ((ArrayWritable) childFieldData).get().length != ((StructTypeInfo) listElementTypeInfo).getAllStructFieldNames().size()) {
+        deserialized.add(i, serialize(listElementTypeInfo, listElementObjectInspector, ((ArrayWritable) childFieldData).get()[0], elementType));
+      } else {
+        deserialized.add(i, serialize(listElementTypeInfo, listElementObjectInspector, childFieldData, elementType));
+      }
+    }
+    return deserialized;
+  }
+
+  private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException {
+    // Avro only allows maps with string keys
+    if (!mapHasStringKey(fieldOI.getMapKeyObjectInspector())) {
+      throw new HoodieException("Avro only supports maps with keys as Strings.  Current Map is: " + typeInfo.toString());
+    }
+
+    ObjectInspector mapKeyObjectInspector = fieldOI.getMapKeyObjectInspector();
+    ObjectInspector mapValueObjectInspector = fieldOI.getMapValueObjectInspector();
+    TypeInfo mapKeyTypeInfo = typeInfo.getMapKeyTypeInfo();
+    TypeInfo mapValueTypeInfo = typeInfo.getMapValueTypeInfo();
+    Map<?,?> map = fieldOI.getMap(structFieldData);
+    Schema valueType = schema.getValueType();
+
+    Map<Object, Object> deserialized = new LinkedHashMap<Object, Object>(fieldOI.getMapSize(structFieldData));
+
+    for (Map.Entry<?, ?> entry : map.entrySet()) {
+      deserialized.put(serialize(mapKeyTypeInfo, mapKeyObjectInspector, entry.getKey(), STRING_SCHEMA),
+          serialize(mapValueTypeInfo, mapValueObjectInspector, entry.getValue(), valueType));
+    }
+
+    return deserialized;
+  }
+
+  private boolean mapHasStringKey(ObjectInspector mapKeyObjectInspector) {
+    return mapKeyObjectInspector instanceof PrimitiveObjectInspector
+        && ((PrimitiveObjectInspector) mapKeyObjectInspector).getPrimitiveCategory().equals(PrimitiveObjectInspector.PrimitiveCategory.STRING);
+  }
+
+  public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord oldRecord, Schema newSchema) {
+    GenericRecord newRecord = new GenericData.Record(newSchema);
+    boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase;
+    for (Schema.Field f : newSchema.getFields()) {
+      if (!(isSpecificRecord && isMetadataField(f.name()))) {
+        copyOldValueOrSetDefault(oldRecord, newRecord, f);
+      }
+    }
+    return newRecord;
+  }
+
+  private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) {
+    Schema oldSchema = oldRecord.getSchema();
+    Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name());
+
+    if (fieldValue != null) {
+      // In case field's value is a nested record, we have to rewrite it as well
+      Object newFieldValue;
+      if (fieldValue instanceof GenericRecord) {
+        GenericRecord record = (GenericRecord) fieldValue;
+        newFieldValue = rewriteRecordIgnoreResultCheck(record, resolveUnionSchema(field.schema(), record.getSchema().getFullName()));
+      } else {
+        newFieldValue = fieldValue;
+      }
+      newRecord.put(field.name(), newFieldValue);
+    } else if (field.defaultVal() instanceof JsonProperties.Null) {
+      newRecord.put(field.name(), null);
+    } else {
+      newRecord.put(field.name(), field.defaultVal());
+    }
+  }
+}
+
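
For reference, wiring up the new HiveAvroSerializer looks roughly like the snippet below (column names and types are hard-coded here for illustration; in AbstractRealtimeRecordReader they come from serdeConstants.LIST_COLUMNS / LIST_COLUMN_TYPES in the JobConf). Calling serializer.serialize(arrayWritable, avroSchema) then yields a GenericRecord in the given Avro schema, as exercised by TestHiveAvroSerializer further down.

import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hudi.hadoop.utils.HiveAvroSerializer;

import java.util.Arrays;
import java.util.List;

public class HiveAvroSerializerWiring {
  // Mirrors what prepareHiveAvroSerializer() does with the jobConf values
  // (the column names/types below are illustrative only).
  static HiveAvroSerializer buildSerializer() {
    List<String> columnNames = Arrays.asList("id", "col1", "par");
    List<TypeInfo> columnTypes = TypeInfoUtils.getTypeInfosFromTypeString("int,bigint,string");
    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
    return new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNames, columnTypes);
  }
}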
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
index 4b351d1205..44e6dd7f93 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.hadoop.utils;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -31,6 +32,10 @@ import org.apache.hudi.hadoop.realtime.RealtimeSplit;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
 public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
@@ -78,7 +83,7 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
     return conf;
   }
 
-  public static void addRequiredProjectionFields(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
+  public static void addRequiredProjectionFields(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo, Option<String> preCombineKeyOpt) {
     // Need this to do merge records in HoodieRealtimeRecordReader
     if (!hoodieVirtualKeyInfo.isPresent()) {
       addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS);
@@ -91,6 +96,18 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils {
         addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField().get(), hoodieVirtualKey.getPartitionPathFieldIndex().get());
       }
     }
 
+    if (preCombineKeyOpt.isPresent()) {
+      // infer col pos
+      String preCombineKey = preCombineKeyOpt.get();
+      List<String> columnNameList = Arrays.stream(configuration.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList());
+      int pos = columnNameList.indexOf(preCombineKey);
+      if (pos != -1) {
+        addProjectionField(configuration, preCombineKey, pos);
+        LOG.info(String.format("add preCombineKey: %s to project columns with position %s", preCombineKey, pos));
+      }
+    }
+
   }
 
   public static boolean requiredProjectionFieldsExistInConf(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) {
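
The new preCombineKeyOpt handling above boils down to locating the precombine field inside Hive's comma-separated column list and projecting it by position. A tiny standalone sketch of that position inference (the config value shown is made up for illustration):

import java.util.Arrays;
import java.util.List;

public class PreCombineProjectionSketch {
  // Infer the position of the preCombine field in Hive's comma-separated column list;
  // -1 means the field is not part of the table columns and nothing is projected.
  static int preCombineFieldPosition(String listColumnsConfValue, String preCombineField) {
    List<String> columns = Arrays.asList(listColumnsConfValue.split(","));
    return columns.indexOf(preCombineField);
  }

  public static void main(String[] args) {
    // e.g. serdeConstants.LIST_COLUMNS -> "id,name,ts,par"
    System.out.println(preCombineFieldPosition("id,name,ts,par", "ts")); // 2
  }
}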
diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
index bf4cbff666..e3466a6401 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.io.DateWritable;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils;
 import org.apache.hadoop.io.ArrayWritable;
@@ -54,6 +55,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.sql.Timestamp;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -176,6 +178,9 @@ public class HoodieRealtimeRecordReaderUtils {
         }
         return new IntWritable((Integer) value);
       case LONG:
+        if (schema.getLogicalType() != null && "timestamp-micros".equals(schema.getLogicalType().getName())) {
+          return new TimestampWritable(new Timestamp((Long) value));
+        }
         return new LongWritable((Long) value);
       case FLOAT:
         return new FloatWritable((Float) value);
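
The new LONG branch above only fires when the Avro schema carries a timestamp-micros logical type; every other long still becomes a LongWritable. A small sketch that mirrors the branch, passing the raw long to java.sql.Timestamp exactly as the patch does:

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

import java.sql.Timestamp;

public class TimestampMicrosBranchSketch {
  // A long carrying a timestamp-micros logical type becomes a TimestampWritable,
  // any other long stays a LongWritable.
  static Writable longToWritable(Object value, Schema schema) {
    if (schema.getLogicalType() != null && "timestamp-micros".equals(schema.getLogicalType().getName())) {
      return new TimestampWritable(new Timestamp((Long) value));
    }
    return new LongWritable((Long) value);
  }

  public static void main(String[] args) {
    Schema tsMicros = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
    System.out.println(longToWritable(1640491505000000L, tsMicros).getClass().getSimpleName()); // TimestampWritable
  }
}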
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
new file mode 100644
index 0000000000..9de4630877
--- /dev/null
+++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.utils;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+
+import org.apache.avro.LogicalTypes;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHiveAvroSerializer {
+
+  private static final String SIMPLE_SCHEMA = "{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
+      + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null},"
+      + "{\"name\":\"col1\",\"type\":[\"null\",\"long\"],\"default\":null},"
+      + "{\"name\":\"col2\",\"type\":[\"null\",\"float\"],\"default\":null},"
+      + "{\"name\":\"col3\",\"type\":[\"null\",\"double\"],\"default\":null},"
+      + "{\"name\":\"col4\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col4\","
+      + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null},"
+      + "{\"name\":\"col5\",\"type\":[\"null\",\"string\"],\"default\":null},"
+      + "{\"name\":\"col6\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null},"
+      + "{\"name\":\"col7\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null},"
+      + "{\"name\":\"col8\",\"type\":[\"null\",\"boolean\"],\"default\":null},"
+      + "{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null},"
+      + "{\"name\":\"par\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}";
+  private static final String NESTED_CHEMA = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":["
+      + "{\"name\":\"firstname\",\"type\":\"string\"},"
+      + "{\"name\":\"lastname\",\"type\":\"string\"},"
+      + "{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":["
+      + "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}";
+
+  @Test
+  public void testSerialize() {
+    Schema avroSchema = new Schema.Parser().parse(SIMPLE_SCHEMA);
+    // create a test record with avroSchema
+    GenericData.Record avroRecord = new GenericData.Record(avroSchema);
+    avroRecord.put("id", 1);
+    avroRecord.put("col1", 1000L);
+    avroRecord.put("col2", -5.001f);
+    avroRecord.put("col3", 12.999d);
+    Schema currentDecimalType = avroSchema.getField("col4").schema().getTypes().get(1);
+    BigDecimal bd = new BigDecimal("123.456").setScale(((LogicalTypes.Decimal) currentDecimalType.getLogicalType()).getScale());
+    avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType, currentDecimalType.getLogicalType()));
+    avroRecord.put("col5", "2011-01-01");
+    avroRecord.put("col6", 18987);
+    avroRecord.put("col7", 1640491505000000L);
+    avroRecord.put("col8", false);
+    ByteBuffer bb = ByteBuffer.wrap(new byte[]{97, 48, 53});
+    avroRecord.put("col9", bb);
+    assertTrue(GenericData.get().validate(avroSchema, avroRecord));
+    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema);
+
+    List<Writable> writableList = Arrays.stream(writable.get()).collect(Collectors.toList());
+    writableList.remove(writableList.size() - 1);
+    ArrayWritable clipWritable = new ArrayWritable(writable.getValueClass(), writableList.toArray(new Writable[0]));
+
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary,date");
+    List<String> columnNameList = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9,par");
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+    GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, avroSchema);
+    assertTrue(GenericData.get().validate(avroSchema, testRecord));
+    // test with the partition column clipped from the writable and the Hive column list
+    List<TypeInfo> columnTypeListClip = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary");
+    List<String> columnNameListClip = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9");
+    StructTypeInfo rowTypeInfoClip = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameListClip, columnTypeListClip);
+    GenericRecord testRecordClip = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip, columnTypeListClip).serialize(clipWritable, avroSchema);
+    assertTrue(GenericData.get().validate(avroSchema, testRecordClip));
+
+  }
+
+  @Test
+  public void testNestedValueSerialize() {
+    Schema nestedSchema = new Schema.Parser().parse(NESTED_CHEMA);
+    GenericRecord avroRecord = new GenericData.Record(nestedSchema);
+    avroRecord.put("firstname", "person1");
+    avroRecord.put("lastname", "person2");
+    GenericRecord studentRecord = new GenericData.Record(avroRecord.getSchema().getField("student").schema());
+    studentRecord.put("firstname", "person1");
+    studentRecord.put("lastname", "person2");
+    avroRecord.put("student", studentRecord);
+
+    assertTrue(GenericData.get().validate(nestedSchema, avroRecord));
+    ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, nestedSchema);
+
+    List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("string,string,struct<firstname:string,lastname:string>");
+    List<String> columnNameList = createHiveColumnsFrom("firstname,lastname,student");
+    StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList);
+    GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, nestedSchema);
+    assertTrue(GenericData.get().validate(nestedSchema, testRecord));
+  }
+
+  private List<String> createHiveColumnsFrom(final String columnNamesStr) {
+    List<String> columnNames;
+    if (columnNamesStr.length() == 0) {
+      columnNames = new ArrayList<>();
+    } else {
+      columnNames = Arrays.asList(columnNamesStr.split(","));
+    }
+
+    return columnNames;
+  }
+
+  private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) {
+    List<TypeInfo> columnTypes;
+
+    if (columnsTypeStr.length() == 0) {
+      columnTypes = new ArrayList<>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr);
+    }
+
+    return columnTypes;
+  }
+}

