This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 738e2cce8f437ed3a2f7fc474e4143bb42fbad22
Author: xiarixiaoyao <mengtao0...@qq.com>
AuthorDate: Thu Nov 3 23:11:39 2022 +0800

    [HUDI-4898] presto/hive respect payload during merge parquet file and logfile when reading mor table (#6741)

    * [HUDI-4898] presto/hive respect payload during merge parquet file and logfile when reading mor table

    * Update HiveAvroSerializer.java
      otherwise payload string type combine field will cause cast exception

    (cherry picked from commit cd314b8cfa58c32f731f7da2aa6377a09df4c6f9)
---
 .../realtime/AbstractRealtimeRecordReader.java     |  72 +++-
 .../realtime/HoodieHFileRealtimeInputFormat.java   |   2 +-
 .../realtime/HoodieParquetRealtimeInputFormat.java |  14 +-
 .../realtime/RealtimeCompactedRecordReader.java    |  25 +-
 .../hudi/hadoop/utils/HiveAvroSerializer.java      | 409 +++++++++++++++++++++
 .../utils/HoodieRealtimeInputFormatUtils.java      |  19 +-
 .../utils/HoodieRealtimeRecordReaderUtils.java     |   5 +
 .../hudi/hadoop/utils/TestHiveAvroSerializer.java  | 148 ++++++++
 8 files changed, 678 insertions(+), 16 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
index dfdda9dfc8..83b69812e1 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/AbstractRealtimeRecordReader.java
@@ -18,26 +18,34 @@ package org.apache.hudi.hadoop.realtime;
-import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.common.model.HoodiePayloadProps;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
-
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodiePayloadProps;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.hadoop.utils.HiveAvroSerializer;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;

 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.stream.Collectors;

 /**
@@ -55,6 +63,10 @@ public abstract class AbstractRealtimeRecordReader {
   private Schema writerSchema;
   private Schema hiveSchema;
   private HoodieTableMetaClient metaClient;
+  // support merge operation
+  protected boolean supportPayload = true;
+  // handle hive type to avro record
+  protected HiveAvroSerializer serializer;

   public 
AbstractRealtimeRecordReader(RealtimeSplit split, JobConf job) { this.split = split; @@ -62,6 +74,7 @@ public abstract class AbstractRealtimeRecordReader { LOG.info("cfg ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR)); LOG.info("columnIds ==> " + job.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); LOG.info("partitioningColumns ==> " + job.get(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS, "")); + this.supportPayload = Boolean.parseBoolean(job.get("hoodie.support.payload", "true")); try { metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(split.getBasePath()).build(); if (metaClient.getTableConfig().getPreCombineField() != null) { @@ -73,6 +86,7 @@ public abstract class AbstractRealtimeRecordReader { } catch (Exception e) { throw new HoodieException("Could not create HoodieRealtimeRecordReader on path " + this.split.getPath(), e); } + prepareHiveAvroSerializer(); } private boolean usesCustomPayload(HoodieTableMetaClient metaClient) { @@ -80,6 +94,34 @@ public abstract class AbstractRealtimeRecordReader { || metaClient.getTableConfig().getPayloadClass().contains("org.apache.hudi.OverwriteWithLatestAvroPayload")); } + private void prepareHiveAvroSerializer() { + try { + // hive will append virtual columns at the end of column list. we should remove those columns. + // eg: current table is col1, col2, col3; jobConf.get(serdeConstants.LIST_COLUMNS): col1, col2, col3 ,BLOCK__OFFSET__INSIDE__FILE ... + Set<String> writerSchemaColNames = writerSchema.getFields().stream().map(f -> f.name().toLowerCase(Locale.ROOT)).collect(Collectors.toSet()); + List<String> columnNameList = Arrays.stream(jobConf.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList()); + List<TypeInfo> columnTypeList = TypeInfoUtils.getTypeInfosFromTypeString(jobConf.get(serdeConstants.LIST_COLUMN_TYPES)); + int columnNameListLen = columnNameList.size() - 1; + for (int i = columnNameListLen; i >= 0; i--) { + String lastColName = columnNameList.get(columnNameList.size() - 1); + // virtual columns will only append at the end of column list. it will be ok to break the loop. + if (writerSchemaColNames.contains(lastColName)) { + break; + } + LOG.debug(String.format("remove virtual column: %s", lastColName)); + columnNameList.remove(columnNameList.size() - 1); + columnTypeList.remove(columnTypeList.size() - 1); + } + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); + this.serializer = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList); + } catch (Exception e) { + // fallback to origin logical + LOG.warn("fall to init HiveAvroSerializer to support payload merge", e); + this.supportPayload = false; + } + + } + /** * Gets schema from HoodieTableMetaClient. If not, falls * back to the schema from the latest parquet file. Finally, sets the partition column and projection fields into the @@ -135,6 +177,10 @@ public abstract class AbstractRealtimeRecordReader { return hiveSchema; } + protected Schema getLogScannerReaderSchema() { + return usesCustomPayload ? 
writerSchema : readerSchema; + } + public Schema getReaderSchema() { return readerSchema; } @@ -154,4 +200,16 @@ public abstract class AbstractRealtimeRecordReader { public JobConf getJobConf() { return jobConf; } + + public void setReaderSchema(Schema readerSchema) { + this.readerSchema = readerSchema; + } + + public void setWriterSchema(Schema writerSchema) { + this.writerSchema = writerSchema; + } + + public void setHiveSchema(Schema hiveSchema) { + this.hiveSchema = hiveSchema; + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java index 799d90bce5..cdc062475f 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java @@ -72,7 +72,7 @@ public class HoodieHFileRealtimeInputFormat extends HoodieMergeOnReadTableInputF // For e:g _hoodie_record_key would be missing and merge step would throw exceptions. // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction // time. - HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, Option.empty()); + HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, Option.empty(), Option.empty()); this.conf = jobConf; this.conf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true"); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java index e8c806ed2c..78768104d9 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieParquetRealtimeInputFormat.java @@ -27,6 +27,10 @@ import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.hadoop.HoodieParquetInputFormat; import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat; import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat; @@ -61,7 +65,10 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat { ValidationUtils.checkArgument(split instanceof RealtimeSplit, "HoodieRealtimeRecordReader can only work on RealtimeSplit and not with " + split); RealtimeSplit realtimeSplit = (RealtimeSplit) split; - addProjectionToJobConf(realtimeSplit, jobConf); + // add preCombineKey + HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(jobConf).setBasePath(realtimeSplit.getBasePath()).build(); + HoodieTableConfig tableConfig = metaClient.getTableConfig(); + addProjectionToJobConf(realtimeSplit, jobConf, metaClient.getTableConfig().getPreCombineField()); LOG.info("Creating record reader with readCols :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR) + ", Ids :" + jobConf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR)); @@ -74,7 +81,7 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat { 
super.getRecordReader(split, jobConf, reporter)); } - void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf) { + void addProjectionToJobConf(final RealtimeSplit realtimeSplit, final JobConf jobConf, String preCombineKey) { // Hive on Spark invokes multiple getRecordReaders from different threads in the same spark task (and hence the // same JVM) unlike Hive on MR. Due to this, accesses to JobConf, which is shared across all threads, is at the // risk of experiencing race conditions. Hence, we synchronize on the JobConf object here. There is negligible @@ -94,7 +101,8 @@ public class HoodieParquetRealtimeInputFormat extends HoodieParquetInputFormat { // TO fix this, hoodie columns are appended late at the time record-reader gets built instead of construction // time. if (!realtimeSplit.getDeltaLogPaths().isEmpty()) { - HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getVirtualKeyInfo()); + HoodieRealtimeInputFormatUtils.addRequiredProjectionFields(jobConf, realtimeSplit.getVirtualKeyInfo(), + StringUtils.isNullOrEmpty(preCombineKey) ? Option.empty() : Option.of(preCombineKey)); } jobConf.set(HoodieInputFormatUtils.HOODIE_READ_COLUMNS_PROP, "true"); setConf(jobConf); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java index b917f004bc..0143672fa0 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeCompactedRecordReader.java @@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner; import org.apache.hudi.common.util.Option; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; +import org.apache.hudi.hadoop.utils.HiveAvroSerializer; import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils; import org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils; import org.apache.log4j.LogManager; @@ -81,7 +82,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader .withFileSystem(FSUtils.getFs(split.getPath().toString(), jobConf)) .withBasePath(split.getBasePath()) .withLogFilePaths(split.getDeltaLogPaths()) - .withReaderSchema(usesCustomPayload ? getWriterSchema() : getReaderSchema()) + .withReaderSchema(getLogScannerReaderSchema()) .withLatestInstantTime(split.getMaxCommitTime()) .withMaxMemorySizeInBytes(HoodieRealtimeRecordReaderUtils.getMaxCompactionMemoryInBytes(jobConf)) .withReadBlocksLazily(Boolean.parseBoolean(jobConf.get(HoodieRealtimeConfig.COMPACTION_LAZY_BLOCK_READ_ENABLED_PROP, HoodieRealtimeConfig.DEFAULT_COMPACTION_LAZY_BLOCK_READ_ENABLED))) @@ -112,9 +113,7 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader if (deltaRecordMap.containsKey(key)) { // mark the key as handled this.deltaRecordKeys.remove(key); - // TODO(NA): Invoke preCombine here by converting arrayWritable to Avro. This is required since the - // deltaRecord may not be a full record and needs values of columns from the parquet - Option<GenericRecord> rec = buildGenericRecordwithCustomPayload(deltaRecordMap.get(key)); + Option<GenericRecord> rec = supportPayload ? 
mergeRecord(deltaRecordMap.get(key), arrayWritable) : buildGenericRecordwithCustomPayload(deltaRecordMap.get(key)); // If the record is not present, this is a delete record using an empty payload so skip this base record // and move to the next record if (!rec.isPresent()) { @@ -173,6 +172,24 @@ class RealtimeCompactedRecordReader extends AbstractRealtimeRecordReader } } + private Option<GenericRecord> mergeRecord(HoodieRecord<? extends HoodieRecordPayload> newRecord, ArrayWritable writableFromParquet) throws IOException { + GenericRecord oldRecord = convertArrayWritableToHoodieRecord(writableFromParquet); + // presto will not append partition columns to jobConf.get(serdeConstants.LIST_COLUMNS), but hive will do it. This will lead following results + // eg: current table: col1: int, col2: int, par: string, and column par is partition columns. + // for hive engine, the hiveSchema will be: col1,col2,par, and the writerSchema will be col1,col2,par + // for presto engine, the hiveSchema will be: col1,col2, but the writerSchema will be col1,col2,par + // so to be compatible with hive and presto, we should rewrite oldRecord before we call combineAndGetUpdateValue, + // once presto on hudi have it's own mor reader, we can remove the rewrite logical. + Option<GenericRecord> combinedValue = newRecord.getData().combineAndGetUpdateValue(HiveAvroSerializer.rewriteRecordIgnoreResultCheck(oldRecord, + getLogScannerReaderSchema()), getLogScannerReaderSchema(), payloadProps); + return combinedValue; + } + + private GenericRecord convertArrayWritableToHoodieRecord(ArrayWritable arrayWritable) { + GenericRecord record = serializer.serialize(arrayWritable, getHiveSchema()); + return record; + } + @Override public NullWritable createKey() { return parquetReader.createKey(); diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java new file mode 100644 index 0000000000..16942ba8b3 --- /dev/null +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HiveAvroSerializer.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.hadoop.utils; + +import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericEnumSymbol; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.avro.util.Utf8; +import org.apache.hadoop.hive.common.type.HiveChar; +import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.common.type.HiveVarchar; +import org.apache.hadoop.hive.serde2.avro.AvroSerdeUtils; +import org.apache.hadoop.hive.serde2.avro.InstanceCache; +import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableDateObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.hudi.avro.AvroSchemaUtils.resolveUnionSchema; +import static org.apache.hudi.avro.HoodieAvroUtils.isMetadataField; + +/** + * Helper class to serialize hive writable type to avro record. + */ +public class HiveAvroSerializer { + + private final List<String> columnNames; + private final List<TypeInfo> columnTypes; + private final ObjectInspector objectInspector; + + private static final Logger LOG = LogManager.getLogger(HiveAvroSerializer.class); + + public HiveAvroSerializer(ObjectInspector objectInspector, List<String> columnNames, List<TypeInfo> columnTypes) { + this.columnNames = columnNames; + this.columnTypes = columnTypes; + this.objectInspector = objectInspector; + } + + private static final Schema STRING_SCHEMA = Schema.create(Schema.Type.STRING); + + public GenericRecord serialize(Object o, Schema schema) { + + StructObjectInspector soi = (StructObjectInspector) objectInspector; + GenericData.Record record = new GenericData.Record(schema); + + List<? 
extends StructField> outputFieldRefs = soi.getAllStructFieldRefs(); + if (outputFieldRefs.size() != columnNames.size()) { + throw new HoodieException("Number of input columns was different than output columns (in = " + columnNames.size() + " vs out = " + outputFieldRefs.size()); + } + + int size = schema.getFields().size(); + + List<? extends StructField> allStructFieldRefs = soi.getAllStructFieldRefs(); + List<Object> structFieldsDataAsList = soi.getStructFieldsDataAsList(o); + + for (int i = 0; i < size; i++) { + Schema.Field field = schema.getFields().get(i); + if (i >= columnTypes.size()) { + break; + } + try { + setUpRecordFieldFromWritable(columnTypes.get(i), structFieldsDataAsList.get(i), + allStructFieldRefs.get(i).getFieldObjectInspector(), record, field); + } catch (Exception e) { + LOG.error(String.format("current columnNames: %s", columnNames.stream().collect(Collectors.joining(",")))); + LOG.error(String.format("current type: %s", columnTypes.stream().map(f -> f.getTypeName()).collect(Collectors.joining(",")))); + LOG.error(String.format("current value: %s", HoodieRealtimeRecordReaderUtils.arrayWritableToString((ArrayWritable) o))); + throw e; + } + } + return record; + } + + private void setUpRecordFieldFromWritable(TypeInfo typeInfo, Object structFieldData, ObjectInspector fieldOI, GenericData.Record record, Schema.Field field) { + Object val = serialize(typeInfo, fieldOI, structFieldData, field.schema()); + if (val == null) { + if (field.defaultVal() instanceof JsonProperties.Null) { + record.put(field.name(), null); + } else { + record.put(field.name(), field.defaultVal()); + } + } else { + record.put(field.name(), val); + } + } + + /** + * Determine if an Avro schema is of type Union[T, NULL]. Avro supports nullable + * types via a union of type T and null. This is a very common use case. + * As such, we want to silently convert it to just T and allow the value to be null. + * + * When a Hive union type is used with AVRO, the schema type becomes + * Union[NULL, T1, T2, ...]. The NULL in the union should be silently removed + * + * @return true if type represents Union[T, Null], false otherwise + */ + public static boolean isNullableType(Schema schema) { + if (!schema.getType().equals(Schema.Type.UNION)) { + return false; + } + + List<Schema> itemSchemas = schema.getTypes(); + if (itemSchemas.size() < 2) { + return false; + } + + for (Schema itemSchema : itemSchemas) { + if (Schema.Type.NULL.equals(itemSchema.getType())) { + return true; + } + } + + // [null, null] not allowed, so this check is ok. + return false; + } + + /** + * If the union schema is a nullable union, get the schema for the non-nullable type. + * This method does no checking that the provided Schema is nullable. 
If the provided + * union schema is non-nullable, it simply returns the union schema + */ + public static Schema getOtherTypeFromNullableType(Schema unionSchema) { + final List<Schema> types = unionSchema.getTypes(); + if (types.size() == 2) { // most common scenario + if (types.get(0).getType() == Schema.Type.NULL) { + return types.get(1); + } + if (types.get(1).getType() == Schema.Type.NULL) { + return types.get(0); + } + // not a nullable union + return unionSchema; + } + + final List<Schema> itemSchemas = new ArrayList<>(); + for (Schema itemSchema : types) { + if (!Schema.Type.NULL.equals(itemSchema.getType())) { + itemSchemas.add(itemSchema); + } + } + + if (itemSchemas.size() > 1) { + return Schema.createUnion(itemSchemas); + } else { + return itemSchemas.get(0); + } + } + + private Object serialize(TypeInfo typeInfo, ObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException { + if (null == structFieldData) { + return null; + } + + if (isNullableType(schema)) { + schema = getOtherTypeFromNullableType(schema); + } + /* Because we use Hive's 'string' type when Avro calls for enum, we have to expressly check for enum-ness */ + if (Schema.Type.ENUM.equals(schema.getType())) { + assert fieldOI instanceof PrimitiveObjectInspector; + return serializeEnum((PrimitiveObjectInspector) fieldOI, structFieldData, schema); + } + switch (typeInfo.getCategory()) { + case PRIMITIVE: + assert fieldOI instanceof PrimitiveObjectInspector; + return serializePrimitive((PrimitiveObjectInspector) fieldOI, structFieldData, schema); + case MAP: + assert fieldOI instanceof MapObjectInspector; + assert typeInfo instanceof MapTypeInfo; + return serializeMap((MapTypeInfo) typeInfo, (MapObjectInspector) fieldOI, structFieldData, schema); + case LIST: + assert fieldOI instanceof ListObjectInspector; + assert typeInfo instanceof ListTypeInfo; + return serializeList((ListTypeInfo) typeInfo, (ListObjectInspector) fieldOI, structFieldData, schema); + case UNION: + assert fieldOI instanceof UnionObjectInspector; + assert typeInfo instanceof UnionTypeInfo; + return serializeUnion((UnionTypeInfo) typeInfo, (UnionObjectInspector) fieldOI, structFieldData, schema); + case STRUCT: + assert fieldOI instanceof StructObjectInspector; + assert typeInfo instanceof StructTypeInfo; + return serializeStruct((StructTypeInfo) typeInfo, (StructObjectInspector) fieldOI, structFieldData, schema); + default: + throw new HoodieException("Ran out of TypeInfo Categories: " + typeInfo.getCategory()); + } + } + + /** private cache to avoid lots of EnumSymbol creation while serializing. + * Two levels because the enum symbol is specific to a schema. + * Object because we want to avoid the overhead of repeated toString calls while maintaining compatability. + * Provided there are few enum types per record, and few symbols per enum, memory use should be moderate. + * eg 20 types with 50 symbols each as length-10 Strings should be on the order of 100KB per AvroSerializer. 
+ */ + final InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>> enums = new InstanceCache<Schema, InstanceCache<Object, GenericEnumSymbol>>() { + @Override + protected InstanceCache<Object, GenericEnumSymbol> makeInstance(final Schema schema, + Set<Schema> seenSchemas) { + return new InstanceCache<Object, GenericEnumSymbol>() { + @Override + protected GenericEnumSymbol makeInstance(Object seed, Set<Object> seenSchemas) { + return new GenericData.EnumSymbol(schema, seed.toString()); + } + }; + } + }; + + private Object serializeEnum(PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException { + try { + return enums.retrieve(schema).retrieve(serializePrimitive(fieldOI, structFieldData, schema)); + } catch (Exception e) { + throw new HoodieException(e); + } + } + + private Object serializeStruct(StructTypeInfo typeInfo, StructObjectInspector ssoi, Object o, Schema schema) { + int size = schema.getFields().size(); + List<? extends StructField> allStructFieldRefs = ssoi.getAllStructFieldRefs(); + List<Object> structFieldsDataAsList = ssoi.getStructFieldsDataAsList(o); + GenericData.Record record = new GenericData.Record(schema); + ArrayList<TypeInfo> allStructFieldTypeInfos = typeInfo.getAllStructFieldTypeInfos(); + + for (int i = 0; i < size; i++) { + Schema.Field field = schema.getFields().get(i); + setUpRecordFieldFromWritable(allStructFieldTypeInfos.get(i), structFieldsDataAsList.get(i), + allStructFieldRefs.get(i).getFieldObjectInspector(), record, field); + } + return record; + } + + private Object serializePrimitive(PrimitiveObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException { + switch (fieldOI.getPrimitiveCategory()) { + case BINARY: + if (schema.getType() == Schema.Type.BYTES) { + return AvroSerdeUtils.getBufferFromBytes((byte[])fieldOI.getPrimitiveJavaObject(structFieldData)); + } else if (schema.getType() == Schema.Type.FIXED) { + GenericData.Fixed fixed = new GenericData.Fixed(schema, (byte[])fieldOI.getPrimitiveJavaObject(structFieldData)); + return fixed; + } else { + throw new HoodieException("Unexpected Avro schema for Binary TypeInfo: " + schema.getType()); + } + case DECIMAL: + HiveDecimal dec = (HiveDecimal)fieldOI.getPrimitiveJavaObject(structFieldData); + LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)schema.getLogicalType(); + BigDecimal bd = new BigDecimal(dec.toString()).setScale(decimal.getScale()); + return HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, schema, decimal); + case CHAR: + HiveChar ch = (HiveChar)fieldOI.getPrimitiveJavaObject(structFieldData); + return new Utf8(ch.getStrippedValue()); + case VARCHAR: + HiveVarchar vc = (HiveVarchar)fieldOI.getPrimitiveJavaObject(structFieldData); + return new Utf8(vc.getValue()); + case STRING: + String string = (String)fieldOI.getPrimitiveJavaObject(structFieldData); + return new Utf8(string); + case DATE: + return DateWritable.dateToDays(((DateObjectInspector)fieldOI).getPrimitiveJavaObject(structFieldData)); + case TIMESTAMP: + Timestamp timestamp = + ((TimestampObjectInspector) fieldOI).getPrimitiveJavaObject(structFieldData); + return timestamp.getTime(); + case INT: + if (schema.getLogicalType() != null && schema.getLogicalType().getName().equals("date")) { + return DateWritable.dateToDays(new WritableDateObjectInspector().getPrimitiveJavaObject(structFieldData)); + } + return fieldOI.getPrimitiveJavaObject(structFieldData); + case UNKNOWN: + throw new HoodieException("Received UNKNOWN primitive category."); + case VOID: + 
return null; + default: // All other primitive types are simple + return fieldOI.getPrimitiveJavaObject(structFieldData); + } + } + + private Object serializeUnion(UnionTypeInfo typeInfo, UnionObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException { + byte tag = fieldOI.getTag(structFieldData); + + // Invariant that Avro's tag ordering must match Hive's. + return serialize(typeInfo.getAllUnionObjectTypeInfos().get(tag), + fieldOI.getObjectInspectors().get(tag), + fieldOI.getField(structFieldData), + schema.getTypes().get(tag)); + } + + private Object serializeList(ListTypeInfo typeInfo, ListObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException { + List<?> list = fieldOI.getList(structFieldData); + List<Object> deserialized = new GenericData.Array<Object>(list.size(), schema); + + TypeInfo listElementTypeInfo = typeInfo.getListElementTypeInfo(); + ObjectInspector listElementObjectInspector = fieldOI.getListElementObjectInspector(); + Schema elementType = schema.getElementType().getField("element") == null ? schema.getElementType() : schema.getElementType().getField("element").schema(); + + for (int i = 0; i < list.size(); i++) { + Object childFieldData = list.get(i); + if (childFieldData instanceof ArrayWritable && ((ArrayWritable) childFieldData).get().length != ((StructTypeInfo) listElementTypeInfo).getAllStructFieldNames().size()) { + deserialized.add(i, serialize(listElementTypeInfo, listElementObjectInspector, ((ArrayWritable) childFieldData).get()[0], elementType)); + } else { + deserialized.add(i, serialize(listElementTypeInfo, listElementObjectInspector, childFieldData, elementType)); + } + } + return deserialized; + } + + private Object serializeMap(MapTypeInfo typeInfo, MapObjectInspector fieldOI, Object structFieldData, Schema schema) throws HoodieException { + // Avro only allows maps with string keys + if (!mapHasStringKey(fieldOI.getMapKeyObjectInspector())) { + throw new HoodieException("Avro only supports maps with keys as Strings. 
Current Map is: " + typeInfo.toString()); + } + + ObjectInspector mapKeyObjectInspector = fieldOI.getMapKeyObjectInspector(); + ObjectInspector mapValueObjectInspector = fieldOI.getMapValueObjectInspector(); + TypeInfo mapKeyTypeInfo = typeInfo.getMapKeyTypeInfo(); + TypeInfo mapValueTypeInfo = typeInfo.getMapValueTypeInfo(); + Map<?,?> map = fieldOI.getMap(structFieldData); + Schema valueType = schema.getValueType(); + + Map<Object, Object> deserialized = new LinkedHashMap<Object, Object>(fieldOI.getMapSize(structFieldData)); + + for (Map.Entry<?, ?> entry : map.entrySet()) { + deserialized.put(serialize(mapKeyTypeInfo, mapKeyObjectInspector, entry.getKey(), STRING_SCHEMA), + serialize(mapValueTypeInfo, mapValueObjectInspector, entry.getValue(), valueType)); + } + + return deserialized; + } + + private boolean mapHasStringKey(ObjectInspector mapKeyObjectInspector) { + return mapKeyObjectInspector instanceof PrimitiveObjectInspector + && ((PrimitiveObjectInspector) mapKeyObjectInspector).getPrimitiveCategory().equals(PrimitiveObjectInspector.PrimitiveCategory.STRING); + } + + public static GenericRecord rewriteRecordIgnoreResultCheck(GenericRecord oldRecord, Schema newSchema) { + GenericRecord newRecord = new GenericData.Record(newSchema); + boolean isSpecificRecord = oldRecord instanceof SpecificRecordBase; + for (Schema.Field f : newSchema.getFields()) { + if (!(isSpecificRecord && isMetadataField(f.name()))) { + copyOldValueOrSetDefault(oldRecord, newRecord, f); + } + } + return newRecord; + } + + private static void copyOldValueOrSetDefault(GenericRecord oldRecord, GenericRecord newRecord, Schema.Field field) { + Schema oldSchema = oldRecord.getSchema(); + Object fieldValue = oldSchema.getField(field.name()) == null ? null : oldRecord.get(field.name()); + + if (fieldValue != null) { + // In case field's value is a nested record, we have to rewrite it as well + Object newFieldValue; + if (fieldValue instanceof GenericRecord) { + GenericRecord record = (GenericRecord) fieldValue; + newFieldValue = rewriteRecordIgnoreResultCheck(record, resolveUnionSchema(field.schema(), record.getSchema().getFullName())); + } else { + newFieldValue = fieldValue; + } + newRecord.put(field.name(), newFieldValue); + } else if (field.defaultVal() instanceof JsonProperties.Null) { + newRecord.put(field.name(), null); + } else { + newRecord.put(field.name(), field.defaultVal()); + } + } +} + diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java index 4b351d1205..44e6dd7f93 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeInputFormatUtils.java @@ -19,6 +19,7 @@ package org.apache.hudi.hadoop.utils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.JobConf; @@ -31,6 +32,10 @@ import org.apache.hudi.hadoop.realtime.RealtimeSplit; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + import static org.apache.hudi.common.util.TypeUtils.unsafeCast; public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { @@ -78,7 +83,7 @@ public class 
HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { return conf; } - public static void addRequiredProjectionFields(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) { + public static void addRequiredProjectionFields(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo, Option<String> preCombineKeyOpt) { // Need this to do merge records in HoodieRealtimeRecordReader if (!hoodieVirtualKeyInfo.isPresent()) { addProjectionField(configuration, HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieInputFormatUtils.HOODIE_RECORD_KEY_COL_POS); @@ -91,6 +96,18 @@ public class HoodieRealtimeInputFormatUtils extends HoodieInputFormatUtils { addProjectionField(configuration, hoodieVirtualKey.getPartitionPathField().get(), hoodieVirtualKey.getPartitionPathFieldIndex().get()); } } + + if (preCombineKeyOpt.isPresent()) { + // infer col pos + String preCombineKey = preCombineKeyOpt.get(); + List<String> columnNameList = Arrays.stream(configuration.get(serdeConstants.LIST_COLUMNS).split(",")).collect(Collectors.toList()); + int pos = columnNameList.indexOf(preCombineKey); + if (pos != -1) { + addProjectionField(configuration, preCombineKey, pos); + LOG.info(String.format("add preCombineKey: %s to project columns with position %s", preCombineKey, pos)); + } + } + } public static boolean requiredProjectionFieldsExistInConf(Configuration configuration, Option<HoodieVirtualKeyInfo> hoodieVirtualKeyInfo) { diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java index bf4cbff666..e3466a6401 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieRealtimeRecordReaderUtils.java @@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.HiveDecimalUtils; import org.apache.hadoop.io.ArrayWritable; @@ -54,6 +55,7 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Arrays; +import java.sql.Timestamp; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -176,6 +178,9 @@ public class HoodieRealtimeRecordReaderUtils { } return new IntWritable((Integer) value); case LONG: + if (schema.getLogicalType() != null && "timestamp-micros".equals(schema.getLogicalType().getName())) { + return new TimestampWritable(new Timestamp((Long) value)); + } return new LongWritable((Long) value); case FLOAT: return new FloatWritable((Float) value); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java new file mode 100644 index 0000000000..9de4630877 --- /dev/null +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHiveAvroSerializer.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.hadoop.utils; + +import org.apache.hudi.avro.HoodieAvroUtils; + +import org.apache.avro.LogicalTypes; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestHiveAvroSerializer { + + private static final String SIMPLE_SCHEMA = "{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\"" + + ":[{\"name\":\"id\",\"type\":[\"null\",\"int\"],\"default\":null}," + + "{\"name\":\"col1\",\"type\":[\"null\",\"long\"],\"default\":null}," + + "{\"name\":\"col2\",\"type\":[\"null\",\"float\"],\"default\":null}," + + "{\"name\":\"col3\",\"type\":[\"null\",\"double\"],\"default\":null}," + + "{\"name\":\"col4\",\"type\":[\"null\",{\"type\":\"fixed\",\"name\":\"fixed\",\"namespace\":\"hoodie.h0.h0_record.col4\"," + + "\"size\":5,\"logicalType\":\"decimal\",\"precision\":10,\"scale\":4}],\"default\":null}," + + "{\"name\":\"col5\",\"type\":[\"null\",\"string\"],\"default\":null}," + + "{\"name\":\"col6\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}," + + "{\"name\":\"col7\",\"type\":[\"null\",{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}],\"default\":null}," + + "{\"name\":\"col8\",\"type\":[\"null\",\"boolean\"],\"default\":null}," + + "{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null}," + + "{\"name\":\"par\",\"type\":[\"null\",{\"type\":\"int\",\"logicalType\":\"date\"}],\"default\":null}]}"; + private static final String NESTED_CHEMA = "{\"name\":\"MyClass\",\"type\":\"record\",\"namespace\":\"com.acme.avro\",\"fields\":[" + + "{\"name\":\"firstname\",\"type\":\"string\"}," + + "{\"name\":\"lastname\",\"type\":\"string\"}," + + "{\"name\":\"student\",\"type\":{\"name\":\"student\",\"type\":\"record\",\"fields\":[" + + "{\"name\":\"firstname\",\"type\":[\"null\" ,\"string\"],\"default\": null},{\"name\":\"lastname\",\"type\":[\"null\" ,\"string\"],\"default\": null}]}}]}"; + + @Test + public void testSerialize() { + Schema avroSchema = new Schema.Parser().parse(SIMPLE_SCHEMA); + // create a test record with avroSchema + GenericData.Record 
avroRecord = new GenericData.Record(avroSchema); + avroRecord.put("id", 1); + avroRecord.put("col1", 1000L); + avroRecord.put("col2", -5.001f); + avroRecord.put("col3", 12.999d); + Schema currentDecimalType = avroSchema.getField("col4").schema().getTypes().get(1); + BigDecimal bd = new BigDecimal("123.456").setScale(((LogicalTypes.Decimal) currentDecimalType.getLogicalType()).getScale()); + avroRecord.put("col4", HoodieAvroUtils.DECIMAL_CONVERSION.toFixed(bd, currentDecimalType, currentDecimalType.getLogicalType())); + avroRecord.put("col5", "2011-01-01"); + avroRecord.put("col6", 18987); + avroRecord.put("col7", 1640491505000000L); + avroRecord.put("col8", false); + ByteBuffer bb = ByteBuffer.wrap(new byte[]{97, 48, 53}); + avroRecord.put("col9", bb); + assertTrue(GenericData.get().validate(avroSchema, avroRecord)); + ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, avroSchema); + + List<Writable> writableList = Arrays.stream(writable.get()).collect(Collectors.toList()); + writableList.remove(writableList.size() - 1); + ArrayWritable clipWritable = new ArrayWritable(writable.getValueClass(), writableList.toArray(new Writable[0])); + + List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary,date"); + List<String> columnNameList = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9,par"); + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); + GenericRecord testRecord = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, avroSchema); + assertTrue(GenericData.get().validate(avroSchema, testRecord)); + // test + List<TypeInfo> columnTypeListClip = createHiveTypeInfoFrom("int,bigint,float,double,decimal(10,4),string,date,timestamp,boolean,binary"); + List<String> columnNameListClip = createHiveColumnsFrom("id,col1,col2,col3,col4,col5,col6,col7,col8,col9"); + StructTypeInfo rowTypeInfoClip = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameListClip, columnTypeListClip); + GenericRecord testRecordClip = new HiveAvroSerializer(new ArrayWritableObjectInspector(rowTypeInfoClip), columnNameListClip, columnTypeListClip).serialize(clipWritable, avroSchema); + assertTrue(GenericData.get().validate(avroSchema, testRecordClip)); + + } + + @Test + public void testNestedValueSerialize() { + Schema nestedSchema = new Schema.Parser().parse(NESTED_CHEMA); + GenericRecord avroRecord = new GenericData.Record(nestedSchema); + avroRecord.put("firstname", "person1"); + avroRecord.put("lastname", "person2"); + GenericRecord studentRecord = new GenericData.Record(avroRecord.getSchema().getField("student").schema()); + studentRecord.put("firstname", "person1"); + studentRecord.put("lastname", "person2"); + avroRecord.put("student", studentRecord); + + assertTrue(GenericData.get().validate(nestedSchema, avroRecord)); + ArrayWritable writable = (ArrayWritable) HoodieRealtimeRecordReaderUtils.avroToArrayWritable(avroRecord, nestedSchema); + + List<TypeInfo> columnTypeList = createHiveTypeInfoFrom("string,string,struct<firstname:string,lastname:string>"); + List<String> columnNameList = createHiveColumnsFrom("firstname,lastname,student"); + StructTypeInfo rowTypeInfo = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); + GenericRecord testRecord = new HiveAvroSerializer(new 
ArrayWritableObjectInspector(rowTypeInfo), columnNameList, columnTypeList).serialize(writable, nestedSchema); + assertTrue(GenericData.get().validate(nestedSchema, testRecord)); + } + + private List<String> createHiveColumnsFrom(final String columnNamesStr) { + List<String> columnNames; + if (columnNamesStr.length() == 0) { + columnNames = new ArrayList<>(); + } else { + columnNames = Arrays.asList(columnNamesStr.split(",")); + } + + return columnNames; + } + + private List<TypeInfo> createHiveTypeInfoFrom(final String columnsTypeStr) { + List<TypeInfo> columnTypes; + + if (columnsTypeStr.length() == 0) { + columnTypes = new ArrayList<>(); + } else { + columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnsTypeStr); + } + + return columnTypes; + } +}
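
[Editor's note, not part of the commit] The payload-aware merge added above is gated by the "hoodie.support.payload" property that AbstractRealtimeRecordReader reads from the JobConf (default "true"). Below is a minimal sketch of toggling that flag; only the property name and default come from the diff, the class name and values are illustrative.

import org.apache.hadoop.mapred.JobConf;

public class SupportPayloadFlagExample {
  public static void main(String[] args) {
    JobConf jobConf = new JobConf();
    // "hoodie.support.payload" defaults to true, in which case RealtimeCompactedRecordReader merges the
    // parquet row with the log record through the table's payload class (combineAndGetUpdateValue).
    // Setting it to false keeps the previous behaviour of building the merged record from the log record only.
    jobConf.setBoolean("hoodie.support.payload", false);
    System.out.println(jobConf.get("hoodie.support.payload", "true"));
  }
}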