This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ee485c234ff9 [HUDI-9705] Fix bugs in spark and avro reader contexts for type promotion and field renaming (#13714)
ee485c234ff9 is described below
commit ee485c234ff948311e9cb7defb7adb9a8a512def
Author: Tim Brown <[email protected]>
AuthorDate: Wed Aug 13 16:57:22 2025 -0400
[HUDI-9705] Fix bugs in spark and avro reader contexts for type promotion and field renaming (#13714)
---
.../parquet/HoodieParquetFileFormatHelper.scala | 33 ++++++---
.../apache/hudi/avro/HoodieAvroReaderContext.java | 22 +++++-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 32 ++++-----
.../table/read/FileGroupReaderSchemaHandler.java | 25 ++++++-
.../common/table/read/HoodieFileGroupReader.java | 4 +-
.../read/ParquetRowIndexBasedSchemaHandler.java | 8 +--
.../hudi/io/storage/HoodieAvroFileReader.java | 8 ++-
.../io/storage/HoodieNativeAvroHFileReader.java | 5 +-
.../hudi/metadata/HoodieBackedTableMetadata.java | 3 +-
.../hudi/metadata/HoodieTableMetadataUtil.java | 2 +-
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 49 ++++++++++++++
.../common/table/read/SchemaHandlerTestBase.java | 26 +++++---
.../read/TestFileGroupReaderSchemaHandler.java | 78 ++++++++++++++++------
.../TestParquetRowIndexBasedSchemaHandler.java | 13 ++--
.../buffer/TestKeyBasedFileGroupRecordBuffer.java | 17 ++---
.../TestSortedKeyBasedFileGroupRecordBuffer.java | 4 +-
...TestStreamingKeyBasedFileGroupRecordBuffer.java | 17 ++---
.../apache/hudi/io/hadoop/HoodieAvroOrcReader.java | 7 +-
.../hudi/io/hadoop/HoodieAvroParquetReader.java | 17 +++--
.../io/hadoop/HoodieAvroParquetReaderIterator.java | 9 ++-
.../org/apache/hudi/cdc/CDCFileGroupIterator.scala | 3 +-
.../TestPositionBasedFileGroupRecordBuffer.java | 4 +-
22 files changed, 273 insertions(+), 113 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
index fa3731c2c57a..e3523b75ce67 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
import org.apache.spark.sql.HoodieSchemaUtils
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, UnsafeProjection}
-import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, MapType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, StructType}
object HoodieParquetFileFormatHelper {
@@ -104,18 +104,24 @@ object HoodieParquetFileFormatHelper {
                               requiredSchema: StructType,
                               partitionSchema: StructType,
                               schemaUtils: HoodieSchemaUtils): UnsafeProjection = {
-    val floatToDoubleCache = scala.collection.mutable.HashMap.empty[(DataType, DataType), Boolean]
+    val addedCastCache = scala.collection.mutable.HashMap.empty[(DataType, DataType), Boolean]

-    def hasFloatToDoubleConversion(src: DataType, dst: DataType): Boolean = {
-      floatToDoubleCache.getOrElseUpdate((src, dst), {
+    def hasUnsupportedConversion(src: DataType, dst: DataType): Boolean = {
+      addedCastCache.getOrElseUpdate((src, dst), {
         (src, dst) match {
           case (FloatType, DoubleType) => true
+          case (IntegerType, DecimalType()) => true
+          case (LongType, DecimalType()) => true
+          case (FloatType, DecimalType()) => true
+          case (DoubleType, DecimalType()) => true
+          case (StringType, DecimalType()) => true
+          case (StringType, DateType) => true
           case (StructType(srcFields), StructType(dstFields)) =>
-            srcFields.zip(dstFields).exists { case (sf, df) => hasFloatToDoubleConversion(sf.dataType, df.dataType) }
+            srcFields.zip(dstFields).exists { case (sf, df) => hasUnsupportedConversion(sf.dataType, df.dataType) }
           case (ArrayType(sElem, _), ArrayType(dElem, _)) =>
-            hasFloatToDoubleConversion(sElem, dElem)
+            hasUnsupportedConversion(sElem, dElem)
           case (MapType(sKey, sVal, _), MapType(dKey, dVal, _)) =>
-            hasFloatToDoubleConversion(sKey, dKey) || hasFloatToDoubleConversion(sVal, dVal)
+            hasUnsupportedConversion(sKey, dKey) || hasUnsupportedConversion(sVal, dVal)
           case _ => false
         }
       })
@@ -127,7 +133,14 @@ object HoodieParquetFileFormatHelper {
       case (FloatType, DoubleType) =>
         val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
         Cast(toStr, dstType, if (needTimeZone) timeZoneId else None)
-      case (s: StructType, d: StructType) if hasFloatToDoubleConversion(s, d) =>
+      case (IntegerType | LongType | FloatType | DoubleType, dec: DecimalType) =>
+        val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else None)
+        Cast(toStr, dec, if (needTimeZone) timeZoneId else None)
+      case (StringType, dec: DecimalType) =>
+        Cast(expr, dec, if (needTimeZone) timeZoneId else None)
+      case (StringType, DateType) =>
+        Cast(expr, DateType, if (needTimeZone) timeZoneId else None)
+      case (s: StructType, d: StructType) if hasUnsupportedConversion(s, d) =>
         val structFields = s.fields.zip(d.fields).zipWithIndex.map {
           case ((srcField, dstField), i) =>
             val child = GetStructField(expr, i, Some(dstField.name))
@@ -136,13 +149,13 @@ object HoodieParquetFileFormatHelper {
         CreateNamedStruct(d.fields.zip(structFields).flatMap {
           case (f, c) => Seq(Literal(f.name), c)
         })
-      case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _)) if hasFloatToDoubleConversion(sElementType, dElementType) =>
+      case (ArrayType(sElementType, containsNull), ArrayType(dElementType, _)) if hasUnsupportedConversion(sElementType, dElementType) =>
         val lambdaVar = NamedLambdaVariable("element", sElementType, containsNull)
         val body = recursivelyCastExpressions(lambdaVar, sElementType, dElementType)
         val func = LambdaFunction(body, Seq(lambdaVar))
         ArrayTransform(expr, func)
       case (MapType(sKeyType, sValType, vnull), MapType(dKeyType, dValType, _))
-        if hasFloatToDoubleConversion(sKeyType, dKeyType) || hasFloatToDoubleConversion(sValType, dValType) =>
+        if hasUnsupportedConversion(sKeyType, dKeyType) || hasUnsupportedConversion(sValType, dValType) =>
         val kv = NamedLambdaVariable("kv", new StructType()
           .add("key", sKeyType, nullable = false)
           .add("value", sValType, nullable = vnull), nullable = false)
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index d966e9ffa788..fb17d8302236 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -23,7 +23,9 @@ import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteWithLatestMerger;
@@ -63,6 +65,7 @@ import static org.apache.hudi.common.util.ValidationUtils.checkState;
  */
 public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord> {
   private final Map<StoragePath, HoodieAvroFileReader> reusableFileReaders;
+  private final boolean isMultiFormat;
  /**
   * Constructs an instance of the reader context that will read data into Avro records.
@@ -124,6 +127,7 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
                                  boolean requiresPayloadRecords) {
     super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new AvroRecordContext(tableConfig, payloadClassName, requiresPayloadRecords));
     this.reusableFileReaders = reusableFileReaders;
+    this.isMultiFormat = tableConfig.isMultipleBaseFileFormatsEnabled();
   }
@Override
@@ -135,15 +139,27 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
                                                              Schema requiredSchema,
                                                              HoodieStorage storage) throws IOException {
     HoodieAvroFileReader reader;
+    boolean isLogFile = FSUtils.isLogFile(filePath);
+    Schema fileOutputSchema;
+    Map<String, String> renamedColumns;
+    if (isLogFile) {
+      fileOutputSchema = requiredSchema;
+      renamedColumns = Collections.emptyMap();
+    } else {
+      Pair<Schema, Map<String, String>> requiredSchemaForFileAndRenamedColumns = getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(filePath);
+      fileOutputSchema = requiredSchemaForFileAndRenamedColumns.getLeft();
+      renamedColumns = requiredSchemaForFileAndRenamedColumns.getRight();
+    }
     if (reusableFileReaders.containsKey(filePath)) {
       reader = reusableFileReaders.get(filePath);
     } else {
+      HoodieFileFormat fileFormat = isMultiFormat && !isLogFile ? HoodieFileFormat.fromFileExtension(filePath.getFileExtension()) : baseFileFormat;
       reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage)
           .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new HoodieConfig(),
-              filePath, baseFileFormat, Option.empty());
+              filePath, fileFormat, Option.empty());
     }
     if (keyFilterOpt.isEmpty()) {
-      return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+      return reader.getIndexedRecordIterator(dataSchema, fileOutputSchema, renamedColumns);
     }
     if (reader.supportKeyPredicate()) {
       List<String> keys = reader.extractKeys(keyFilterOpt);
@@ -157,7 +173,7 @@ public class HoodieAvroReaderContext extends HoodieReaderContext<IndexedRecord>
         return reader.getIndexedRecordsByKeyPrefixIterator(keyPrefixes, requiredSchema);
       }
     }
-    return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+    return reader.getIndexedRecordIterator(dataSchema, fileOutputSchema, renamedColumns);
   }
@Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 0de9d4dd62f2..c74c6a698831 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1406,8 +1406,15 @@ public class HoodieAvroUtils {
     if (writerSchema.equals(readerSchema)) {
       return false;
     }
+    if (readerSchema.getLogicalType() != null) {
+      // check if logical types are equal
+      return !readerSchema.getLogicalType().equals(writerSchema.getLogicalType());
+    }
     switch (readerSchema.getType()) {
       case RECORD:
+        if (readerSchema.getFields().size() > writerSchema.getFields().size()) {
+          return true;
+        }
         for (Schema.Field field : readerSchema.getFields()) {
           Schema.Field writerField = writerSchema.getField(field.name());
           if (writerField == null || recordNeedsRewriteForExtendedAvroTypePromotion(writerField.schema(), field.schema())) {
@@ -1430,13 +1437,13 @@ public class HoodieAvroUtils {
case ENUM:
return needsRewriteToString(writerSchema, true);
case STRING:
- case BYTES:
return needsRewriteToString(writerSchema, false);
case DOUBLE:
- // To maintain precision, you need to convert Float -> String -> Double
- return writerSchema.getType().equals(Schema.Type.FLOAT);
+ case FLOAT:
+ case LONG:
+        return !(writerSchema.getType().equals(Schema.Type.INT) || writerSchema.getType().equals(Schema.Type.LONG));
default:
- return false;
+ return !writerSchema.getType().equals(readerSchema.getType());
}
}
@@ -1446,18 +1453,13 @@ public class HoodieAvroUtils {
* string so some intervention is needed
*/
private static boolean needsRewriteToString(Schema schema, boolean isEnum) {
- switch (schema.getType()) {
- case INT:
- case LONG:
- case FLOAT:
- case DOUBLE:
- case BYTES:
- return true;
- case ENUM:
- return !isEnum;
- default:
- return false;
+ if (schema.getLogicalType() != null) {
+ return true;
+ }
+ if (schema.getType() == Schema.Type.ENUM) {
+ return !isEnum;
}
+ return true;
}
/**
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 727077f4a52c..2404eb91fd0e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -24,16 +24,21 @@ import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer;
+import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
@@ -76,22 +81,24 @@ public class FileGroupReaderSchemaHandler<T> {
protected final TypedProperties properties;
private final DeleteContext deleteContext;
+ private final HoodieTableMetaClient metaClient;
public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
Schema tableSchema,
Schema requestedSchema,
Option<InternalSchema> internalSchemaOpt,
- HoodieTableConfig hoodieTableConfig,
- TypedProperties properties) {
+ TypedProperties properties,
+ HoodieTableMetaClient metaClient) {
this.properties = properties;
this.readerContext = readerContext;
this.tableSchema = tableSchema;
this.requestedSchema = AvroSchemaCache.intern(requestedSchema);
- this.hoodieTableConfig = hoodieTableConfig;
+ this.hoodieTableConfig = metaClient.getTableConfig();
this.deleteContext = new DeleteContext(properties, tableSchema);
     this.requiredSchema = AvroSchemaCache.intern(prepareRequiredSchema(this.deleteContext));
     this.internalSchema = pruneInternalSchema(requiredSchema, internalSchemaOpt);
this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
+ this.metaClient = metaClient;
}
public Schema getTableSchema() {
@@ -125,6 +132,18 @@ public class FileGroupReaderSchemaHandler<T> {
return deleteContext;
}
+  public Pair<Schema, Map<String, String>> getRequiredSchemaForFileAndRenamedColumns(StoragePath path) {
+    if (internalSchema.isEmptySchema()) {
+      return Pair.of(requiredSchema, Collections.emptyMap());
+    }
+    long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(path.getName()));
+    InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitInstantTime, metaClient);
+    Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new InternalSchemaMerger(fileSchema, internalSchema,
+        true, false, false).mergeSchemaGetRenamed();
+    Schema mergedAvroSchema = AvroSchemaCache.intern(AvroInternalSchemaConverter.convert(mergedInternalSchema.getLeft(), requiredSchema.getFullName()));
+    return Pair.of(mergedAvroSchema, mergedInternalSchema.getRight());
+  }
+
   private InternalSchema pruneInternalSchema(Schema requiredSchema, Option<InternalSchema> internalSchemaOption) {
if (!internalSchemaOption.isPresent()) {
return InternalSchema.getEmptyInternalSchema();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 54b8d2f63152..1a7294854f18 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -116,8 +116,8 @@ public final class HoodieFileGroupReader<T> implements Closeable {
     readerContext.setShouldMergeUseRecordPosition(readerParameters.useRecordPosition() && !isSkipMerge && readerContext.getHasLogFiles() && inputSplit.isParquetBaseFile());
     readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent());
     readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex()
-        ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props)
-        : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, tableConfig, props));
+        ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, props, metaClient)
+        : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, requestedSchema, internalSchemaOpt, props, metaClient));
     this.outputConverter = readerContext.getSchemaHandler().getOutputConverter();
     this.orderingFieldNames = HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), props, hoodieTableMetaClient);
this.readStats = new HoodieReadStats();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java
index c365e09cab29..d0be7534884e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java
@@ -21,7 +21,7 @@ package org.apache.hudi.common.table.read;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
@@ -46,9 +46,9 @@ public class ParquetRowIndexBasedSchemaHandler<T> extends FileGroupReaderSchemaH
                                            Schema dataSchema,
                                            Schema requestedSchema,
                                            Option<InternalSchema> internalSchemaOpt,
-                                           HoodieTableConfig hoodieTableConfig,
-                                           TypedProperties properties) {
-    super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, hoodieTableConfig, properties);
+                                           TypedProperties properties,
+                                           HoodieTableMetaClient metaClient) {
+    super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, properties, metaClient);
     if (!readerContext.getRecordContext().supportsParquetRowIndex()) {
       throw new IllegalStateException("Using " + this.getClass().getName() + " but context does not support parquet row index");
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
index bb79087e97e0..e84090a54182 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
@@ -29,7 +29,9 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import java.io.IOException;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
@@ -48,7 +50,11 @@ public abstract class HoodieAvroFileReader implements HoodieFileReader<IndexedRe
     return getIndexedRecordIterator(readerSchema, readerSchema);
   }

-  public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException;
+  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
+    return getIndexedRecordIterator(readerSchema, requestedSchema, Collections.emptyMap());
+  }
+
+  public abstract ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, Map<String, String> renamedColumns) throws IOException;

   public abstract ClosableIterator<IndexedRecord> getIndexedRecordsByKeysIterator(List<String> keys,
                                                                                   Schema readerSchema)
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index a5e339b83da9..48aae71fd0c9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -96,9 +96,10 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
   @Override
   public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema,
-                                                                  Schema requestedSchema)
+                                                                  Schema requestedSchema,
+                                                                  Map<String, String> renamedColumns)
       throws IOException {
-    if (!Objects.equals(readerSchema, requestedSchema)) {
+    if (!Objects.equals(readerSchema, requestedSchema) || !renamedColumns.isEmpty()) {
       throw new UnsupportedOperationException(
           "Schema projections are not supported in HFile reader");
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 2f0d76043de3..e8b009e3a109 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -561,7 +561,8 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     readerContext.initRecordMerger(metadataConfig.getProps());
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.setHasLogFiles(fileSlice.hasLogFiles());
-    readerContext.setSchemaHandler(new FileGroupReaderSchemaHandler<>(readerContext, SCHEMA, SCHEMA, Option.empty(), metadataMetaClient.getTableConfig(), metadataConfig.getProps()));
+    readerContext.setSchemaHandler(new FileGroupReaderSchemaHandler<>(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        metadataConfig.getProps(), metadataMetaClient));
readerContext.setShouldMergeUseRecordPosition(false);
readerContext.setLatestCommitTime(latestMetadataInstantTime);
return FileGroupRecordBufferLoader.createReusable(readerContext);
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 4bfeee6d8704..230eb954507b 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1044,7 +1044,7 @@ public class HoodieTableMetadataUtil {
readerContext.setHasLogFiles(true);
HoodieTableConfig tableConfig = datasetMetaClient.getTableConfig();
readerContext.initRecordMerger(properties);
-    readerContext.setSchemaHandler(new FileGroupReaderSchemaHandler<>(readerContext, writerSchemaOpt.get(), writerSchemaOpt.get(), Option.empty(), tableConfig, properties));
+    readerContext.setSchemaHandler(new FileGroupReaderSchemaHandler<>(readerContext, writerSchemaOpt.get(), writerSchemaOpt.get(), Option.empty(), properties, datasetMetaClient));
     HoodieReadStats readStats = new HoodieReadStats();
     KeyBasedFileGroupRecordBuffer<T> recordBuffer = new KeyBasedFileGroupRecordBuffer<>(readerContext, datasetMetaClient,
         readerContext.getMergeMode(), PartialUpdateMode.NONE, properties, tableConfig.getPreCombineFields(),
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index f7823cef0cdd..cdb69431927d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -1131,4 +1131,53 @@ public class TestHoodieAvroUtils {
assertEquals(expectedSchema, projectedSchema);
     assertTrue(AvroSchemaUtils.isSchemaCompatible(projectedSchema, expectedSchema, false));
}
+
+  private static Stream<Arguments> recordNeedsRewriteForExtendedAvroTypePromotion() {
+    Schema decimal1 = LogicalTypes.decimal(12, 2).addToSchema(Schema.create(Schema.Type.BYTES));
+    Schema decimal2 = LogicalTypes.decimal(10, 2).addToSchema(Schema.create(Schema.Type.BYTES));
+    Schema dateSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+ Schema doubleSchema = Schema.create(Schema.Type.DOUBLE);
+ Schema intSchema = Schema.create(Schema.Type.INT);
+ Schema longSchema = Schema.create(Schema.Type.LONG);
+ Schema floatSchema = Schema.create(Schema.Type.FLOAT);
+ Schema stringSchema = Schema.create(Schema.Type.STRING);
+
+    Schema recordSchema1 = Schema.createRecord("record1", null, "com.example", false,
+        Arrays.asList(new Schema.Field("decimalField", decimal1, null, null),
+            new Schema.Field("doubleField", doubleSchema, null, null)));
+
+    Schema recordSchema2 = Schema.createRecord("record2", null, "com.example2", false,
+        Arrays.asList(new Schema.Field("decimalField", decimal1, null, null),
+            new Schema.Field("doubleField", doubleSchema, null, null)));
+
+ return Stream.of(
+ Arguments.of(intSchema, longSchema, false),
+ Arguments.of(intSchema, floatSchema, false),
+ Arguments.of(longSchema, intSchema, true),
+ Arguments.of(longSchema, floatSchema, false),
+ Arguments.of(decimal1, decimal2, true),
+ Arguments.of(doubleSchema, decimal1, true),
+ Arguments.of(decimal1, doubleSchema, true),
+ Arguments.of(intSchema, stringSchema, true),
+ Arguments.of(longSchema, doubleSchema, false),
+ Arguments.of(intSchema, doubleSchema, false),
+ Arguments.of(longSchema, stringSchema, true),
+ Arguments.of(floatSchema, stringSchema, true),
+ Arguments.of(doubleSchema, stringSchema, true),
+ Arguments.of(decimal1, stringSchema, true),
+ Arguments.of(stringSchema, decimal2, true),
+ Arguments.of(stringSchema, intSchema, true),
+ Arguments.of(floatSchema, doubleSchema, true),
+ Arguments.of(doubleSchema, floatSchema, true),
+ Arguments.of(recordSchema1, recordSchema2, false),
+ Arguments.of(dateSchema, stringSchema, true)
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource
+  void recordNeedsRewriteForExtendedAvroTypePromotion(Schema writerSchema, Schema readerSchema, boolean expected) {
+    boolean result = HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema);
+ assertEquals(expected, result);
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
index 85813f59f1bc..bed30d3a2538 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.DefaultJavaTypeConverter;
@@ -38,6 +39,7 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.provider.Arguments;
import java.io.IOException;
@@ -67,6 +69,13 @@ public abstract class SchemaHandlerTestBase {
       .map(Schema.Field::name).filter(f -> !f.equals("_hoodie_is_deleted")).toArray(String[]::new));
   protected static final Schema DATA_COLS_ONLY_SCHEMA = generateProjectionSchema("begin_lat", "tip_history", "rider");
   protected static final Schema META_COLS_ONLY_SCHEMA = generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key");
+  protected final HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+  protected final HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
+
+ @BeforeEach
+ void setup() {
+ when(metaClient.getTableConfig()).thenReturn(hoodieTableConfig);
+ }
static Stream<Arguments> testMorParams(boolean supportsParquetRowIndex) {
Stream.Builder<Arguments> b = Stream.builder();
@@ -89,14 +98,13 @@ public abstract class SchemaHandlerTestBase {
boolean supportsParquetRowIndex,
boolean hasBuiltInDelete) throws IOException {
Schema dataSchema = hasBuiltInDelete ? DATA_SCHEMA : DATA_SCHEMA_NO_DELETE;
- HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
setupMORTable(mergeMode, hasPrecombine, hoodieTableConfig);
     HoodieRecordMerger merger = mockRecordMerger(isProjectionCompatible, isProjectionCompatible ? new String[] {"begin_lat", "begin_lon", "_hoodie_record_key", "timestamp"} : new String[] {"begin_lat", "begin_lon", "timestamp"});
     HoodieReaderContext<String> readerContext = createReaderContext(hoodieTableConfig, supportsParquetRowIndex, true, false, mergeUseRecordPosition, merger);
     readerContext.setRecordMerger(Option.of(merger));
     Schema requestedSchema = dataSchema;
-    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex);
     Schema expectedRequiredFullSchema = supportsParquetRowIndex && mergeUseRecordPosition
         ? ParquetRowIndexBasedSchemaHandler.addPositionalMergeCol(requestedSchema)
         : requestedSchema;
@@ -105,7 +113,7 @@ public abstract class SchemaHandlerTestBase {
//read subset of columns
requestedSchema = DATA_COLS_ONLY_SCHEMA;
-    schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex);
     Schema expectedRequiredSchema;
     if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) {
       expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, "begin_lat", "tip_history", "rider", "_hoodie_record_key", "timestamp");
@@ -130,13 +138,12 @@ public abstract class SchemaHandlerTestBase {
boolean supportsParquetRowIndex,
boolean hasBuiltInDelete) throws IOException {
Schema dataSchema = hasBuiltInDelete ? DATA_SCHEMA : DATA_SCHEMA_NO_DELETE;
- HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
setupMORTable(mergeMode, hasPrecombine, hoodieTableConfig);
     HoodieRecordMerger merger = mockRecordMerger(isProjectionCompatible, new String[] {"begin_lat", "begin_lon", "timestamp"});
     HoodieReaderContext<String> readerContext = createReaderContext(hoodieTableConfig, supportsParquetRowIndex, true, true, mergeUseRecordPosition, merger);
     readerContext.setRecordMerger(Option.of(merger));
     Schema requestedSchema = dataSchema;
-    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex);
     Schema expectedRequiredFullSchema = supportsParquetRowIndex && mergeUseRecordPosition
         ? ParquetRowIndexBasedSchemaHandler.addPositionalMergeCol(requestedSchema)
         : requestedSchema;
@@ -153,7 +160,7 @@ public abstract class SchemaHandlerTestBase {
//read subset of columns
     requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider");
-    schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex);
     Schema expectedRequiredSchema;
     if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) {
       expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, "_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp");
@@ -180,7 +187,7 @@ public abstract class SchemaHandlerTestBase {
// request only data cols
requestedSchema = DATA_COLS_ONLY_SCHEMA;
-    schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex);
     if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) {
       expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, "_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp");
assertTrue(readerContext.getNeedsBootstrapMerge());
@@ -215,7 +222,7 @@ public abstract class SchemaHandlerTestBase {
// request only meta cols
requestedSchema = META_COLS_ONLY_SCHEMA;
-    schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    schemaHandler = createSchemaHandler(readerContext, dataSchema, requestedSchema, supportsParquetRowIndex);
     if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) {
       expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, "_hoodie_commit_seqno", "_hoodie_record_key", "timestamp");
assertTrue(readerContext.getNeedsBootstrapMerge());
@@ -283,8 +290,7 @@ public abstract class SchemaHandlerTestBase {
}
   abstract FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext<String> readerContext, Schema dataSchema,
-                                                            Schema requestedSchema, HoodieTableConfig hoodieTableConfig,
-                                                            boolean supportsParquetRowIndex);
+                                                            Schema requestedSchema, boolean supportsParquetRowIndex);

   static Schema generateProjectionSchema(String... fields) {
     return HoodieAvroUtils.generateProjectionSchema(DATA_SCHEMA, Arrays.asList(fields));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
index 39ff90cc05c5..2da196905f92 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
@@ -22,17 +22,23 @@ package org.apache.hudi.common.table.read;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.InternalSchemaCache;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;
@@ -40,12 +46,17 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
@@ -60,29 +71,27 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {

   @Test
   public void testCow() {
-    HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
     when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
     HoodieReaderContext<String> readerContext = createReaderContext(hoodieTableConfig, false, false, false, false, null);
     Schema requestedSchema = DATA_SCHEMA;
-    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, hoodieTableConfig, false);
+    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, false);
     assertEquals(requestedSchema, schemaHandler.getRequiredSchema());

     //read subset of columns
     requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "rider");
-    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, hoodieTableConfig, false);
+    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, false);
     assertEquals(requestedSchema, schemaHandler.getRequiredSchema());
     assertFalse(readerContext.getNeedsBootstrapMerge());
   }

   @Test
   public void testCowBootstrap() {
-    HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
     when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
     HoodieReaderContext<String> readerContext = createReaderContext(hoodieTableConfig, false, false, true, false, null);
     Schema requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider");
     //meta cols must go first in the required schema
-    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, hoodieTableConfig, false);
+    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, false);
     assertTrue(readerContext.getNeedsBootstrapMerge());
     Schema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider");
     assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema());
@@ -91,6 +100,34 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
     assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider")), bootstrapFields.getRight());
}
+  @Test
+  void testGetRequiredSchemaForFileAndRenameColumns() {
+    when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
+    HoodieReaderContext<String> readerContext = createReaderContext(hoodieTableConfig, false, false, true, false, null);
+    Schema requestedSchema = generateProjectionSchema("_hoodie_record_key", "timestamp", "rider");
+
+    InternalSchema internalSchema = AvroInternalSchemaConverter.convert(DATA_SCHEMA);
+    InternalSchema originalSchema = new InternalSchema(Types.RecordType.get(internalSchema.columns().stream().map(field -> {
+      if (field.name().equals("timestamp")) {
+        // rename timestamp to ts in file schema and change type to int, output schema names and types must match the requested schema
+        return Types.Field.get(field.fieldId(), "ts", Types.IntType.get());
+      }
+      return field;
+    }).collect(Collectors.toList())));
+    FileGroupReaderSchemaHandler<String> schemaHandler = new FileGroupReaderSchemaHandler<>(readerContext, DATA_SCHEMA, requestedSchema,
+        Option.of(internalSchema), new TypedProperties(), metaClient);
+
+    try (MockedStatic<InternalSchemaCache> mockedStatic = Mockito.mockStatic(InternalSchemaCache.class)) {
+      String instantTime = "20231010101010";
+      mockedStatic.when(() -> InternalSchemaCache.searchSchemaAndCache(Long.parseLong(instantTime), metaClient))
+          .thenReturn(originalSchema);
+      StoragePath filePath = new StoragePath("/2023-01-01/" + FSUtils.makeBaseFileName(instantTime, "1-0-1", UUID.randomUUID().toString(), HoodieFileFormat.PARQUET.getFileExtension()));
+      Pair<Schema, Map<String, String>> requiredSchemaAndRenamedFields = schemaHandler.getRequiredSchemaForFileAndRenamedColumns(filePath);
+      assertEquals(Collections.singletonMap("timestamp", "ts"), requiredSchemaAndRenamedFields.getRight());
+      assertEquals(requestedSchema, requiredSchemaAndRenamedFields.getLeft());
+    }
+  }
+
private static Stream<Arguments> testMorParams() {
return testMorParams(false);
}
@@ -118,10 +155,10 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
   }

   @Override
-  FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext<String> readerContext, Schema dataSchema, Schema requestedSchema, HoodieTableConfig hoodieTableConfig,
+  FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext<String> readerContext, Schema dataSchema, Schema requestedSchema,
                                                    boolean supportsParquetRowIndex) {
     return new FileGroupReaderSchemaHandler(readerContext, dataSchema, requestedSchema,
-        Option.empty(), hoodieTableConfig, new TypedProperties());
+        Option.empty(), new TypedProperties(), metaClient);
}
@ParameterizedTest
@@ -189,23 +226,22 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
     Schema dataSchema = SchemaTestUtil.getSchemaFromFields(dataSchemaFields);
     Schema requestedSchema = SchemaTestUtil.getSchemaFromFields(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD));

-    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
-    when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode);
-    when(tableConfig.populateMetaFields()).thenReturn(true);
-    when(tableConfig.getPreCombineFieldsStr()).thenReturn(Option.of(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING));
-    when(tableConfig.getPreCombineFields()).thenReturn(setPrecombine ? Collections.singletonList(preCombineField) : Collections.emptyList());
-    when(tableConfig.getTableVersion()).thenReturn(tableVersion);
-    if (tableConfig.getTableVersion() == HoodieTableVersion.SIX) {
+    when(hoodieTableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+    when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
+    when(hoodieTableConfig.getPreCombineFieldsStr()).thenReturn(Option.of(setPrecombine ? preCombineField : StringUtils.EMPTY_STRING));
+    when(hoodieTableConfig.getPreCombineFields()).thenReturn(setPrecombine ? Collections.singletonList(preCombineField) : Collections.emptyList());
+    when(hoodieTableConfig.getTableVersion()).thenReturn(tableVersion);
+    if (hoodieTableConfig.getTableVersion() == HoodieTableVersion.SIX) {
       if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
-        when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
+        when(hoodieTableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
       } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
-        when(tableConfig.getPayloadClass()).thenReturn(OverwriteWithLatestAvroPayload.class.getName());
+        when(hoodieTableConfig.getPayloadClass()).thenReturn(OverwriteWithLatestAvroPayload.class.getName());
       } else {
-        when(tableConfig.getPayloadClass()).thenReturn(OverwriteNonDefaultsWithLatestAvroPayload.class.getName());
+        when(hoodieTableConfig.getPayloadClass()).thenReturn(OverwriteNonDefaultsWithLatestAvroPayload.class.getName());
       }
     }
     if (mergeMode != null) {
-      when(tableConfig.getRecordMergeStrategyId()).thenReturn(mergeStrategyId);
+      when(hoodieTableConfig.getRecordMergeStrategyId()).thenReturn(mergeStrategyId);
     }
     TypedProperties props = new TypedProperties();
@@ -227,7 +263,7 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
       expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
     }
     Schema expectedSchema = ((mergeMode == RecordMergeMode.CUSTOM) && !isProjectionCompatible) ? dataSchema : SchemaTestUtil.getSchemaFromFields(expectedFields);
-    when(recordMerger.getMandatoryFieldsForMerging(dataSchema, tableConfig, props)).thenReturn(expectedFields.toArray(new String[0]));
+    when(recordMerger.getMandatoryFieldsForMerging(dataSchema, hoodieTableConfig, props)).thenReturn(expectedFields.toArray(new String[0]));

     DeleteContext deleteContext = new DeleteContext(props, dataSchema);
     assertEquals(addHoodieIsDeleted, deleteContext.hasBuiltInDeleteField());
@@ -235,7 +271,7 @@ public class TestFileGroupReaderSchemaHandler extends SchemaHandlerTestBase {
         ? Option.of(Pair.of(customDeleteKey, customDeleteValue)) : Option.empty(),
         deleteContext.getCustomDeleteMarkerKeyValue());
     FileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new FileGroupReaderSchemaHandler(readerContext,
-        dataSchema, requestedSchema, Option.empty(), tableConfig, props);
+        dataSchema, requestedSchema, Option.empty(), props, metaClient);
     Schema actualSchema = fileGroupReaderSchemaHandler.generateRequiredSchema(deleteContext);
     assertEquals(expectedSchema, actualSchema);
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
index 22b28c7acf40..865026f1a160 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
@@ -22,7 +22,6 @@ package org.apache.hudi.common.table.read;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
@@ -41,18 +40,16 @@ import static org.apache.hudi.common.table.read.ParquetRowIndexBasedSchemaHandle
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestParquetRowIndexBasedSchemaHandler extends SchemaHandlerTestBase {
@Test
public void testCowBootstrapWithPositionMerge() {
- HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
     HoodieReaderContext<String> readerContext = createReaderContext(hoodieTableConfig, true, false, true, false, null);
     Schema requestedSchema = generateProjectionSchema("begin_lat", "tip_history", "_hoodie_record_key", "rider");
-    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, hoodieTableConfig, true);
+    FileGroupReaderSchemaHandler schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, true);
     assertTrue(readerContext.getNeedsBootstrapMerge());
     //meta cols must go first in the required schema
     Schema expectedRequiredSchema = generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", "rider");
@@ -61,14 +58,14 @@ public class TestParquetRowIndexBasedSchemaHandler extends SchemaHandlerTestBase
     assertEquals(Arrays.asList(getField("_hoodie_record_key"), getPositionalMergeField()), bootstrapFields.getLeft());
     assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider"), getPositionalMergeField()), bootstrapFields.getRight());

-    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, DATA_COLS_ONLY_SCHEMA, hoodieTableConfig, true);
+    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, DATA_COLS_ONLY_SCHEMA, true);
     assertFalse(readerContext.getNeedsBootstrapMerge());
     assertEquals(DATA_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema());
     bootstrapFields = schemaHandler.getBootstrapRequiredFields();
     assertTrue(bootstrapFields.getLeft().isEmpty());
     assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), getField("rider")), bootstrapFields.getRight());

-    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, META_COLS_ONLY_SCHEMA, hoodieTableConfig, true);
+    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, META_COLS_ONLY_SCHEMA, true);
assertFalse(readerContext.getNeedsBootstrapMerge());
assertEquals(META_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema());
bootstrapFields = schemaHandler.getBootstrapRequiredFields();
@@ -103,9 +100,9 @@ public class TestParquetRowIndexBasedSchemaHandler extends SchemaHandlerTestBase
   }

   @Override
-  FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext<String> readerContext, Schema dataSchema, Schema requestedSchema, HoodieTableConfig hoodieTableConfig,
+  FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext<String> readerContext, Schema dataSchema, Schema requestedSchema,
                                                    boolean supportsParquetRowIndex) {
     return new ParquetRowIndexBasedSchemaHandler(readerContext, dataSchema, requestedSchema,
-        Option.empty(), hoodieTableConfig, new TypedProperties());
+        Option.empty(), new TypedProperties(), metaClient);
}
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
index 50eaeb454df2..b4011a7205a5 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -153,8 +154,8 @@ class TestKeyBasedFileGroupRecordBuffer extends BaseTestFileGroupRecordBuffer {
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7));
     inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), false));
@@ -220,8 +221,8 @@ class TestKeyBasedFileGroupRecordBuffer extends BaseTestFileGroupRecordBuffer {
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update,
         testRecord4EarlierUpdate, testRecord7));
@@ -299,8 +300,8 @@ class TestKeyBasedFileGroupRecordBuffer extends BaseTestFileGroupRecordBuffer {
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate));
     inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), true));
@@ -374,8 +375,8 @@ class TestKeyBasedFileGroupRecordBuffer extends BaseTestFileGroupRecordBuffer {
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate));
     inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), true));
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index 458979f1dfe0..13e8feba92c0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -123,8 +123,8 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends BaseTestFileGroupRecordBuf
     HoodieReaderContext<IndexedRecord> readerContext = new HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), Option.empty());
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
-    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
readerContext.setSchemaHandler(schemaHandler);
readerContext.initRecordMerger(properties);
List<HoodieRecord> inputRecords =
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
index baf1140cfee2..8c7e5d65a530 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.PartialUpdateMode;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
@@ -83,8 +84,8 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecord
readerContext.setHasLogFiles(false);
readerContext.setHasBootstrapBaseFile(false);
readerContext.initRecordMerger(properties);
- FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
- properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+ properties, mock(HoodieTableMetaClient.class));
readerContext.setSchemaHandler(schemaHandler);
List<HoodieRecord> inputRecords =
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7));
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), false));
@@ -120,8 +121,8 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecord
readerContext.setHasLogFiles(false);
readerContext.setHasBootstrapBaseFile(false);
readerContext.initRecordMerger(properties);
- FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
- properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+ properties, mock(HoodieTableMetaClient.class));
readerContext.setSchemaHandler(schemaHandler);
List<HoodieRecord> inputRecords =
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7));
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), true));
@@ -159,8 +160,8 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecord
readerContext.setHasLogFiles(false);
readerContext.setHasBootstrapBaseFile(false);
readerContext.initRecordMerger(properties);
- FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
- properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+ properties, mock(HoodieTableMetaClient.class));
readerContext.setSchemaHandler(schemaHandler);
List<HoodieRecord> inputRecords =
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate));
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), true));
@@ -197,8 +198,8 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends
BaseTestFileGroupRecord
readerContext.setHasLogFiles(false);
readerContext.setHasBootstrapBaseFile(false);
readerContext.initRecordMerger(properties);
- FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
tableConfig,
- properties);
+ FileGroupReaderSchemaHandler schemaHandler = new
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+ properties, mock(HoodieTableMetaClient.class));
readerContext.setSchemaHandler(schemaHandler);
List<HoodieRecord> inputRecords =
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime,
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate));
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
testRecord6DeleteByCustomMarker), true));
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index c7ddd16cb071..e805e22e0bff 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -47,6 +47,7 @@ import org.apache.orc.TypeDescription;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -82,7 +83,7 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReader {
}
@Override
- public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
+ public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, Map<String, String> renamedColumns) {
if (!Objects.equals(readerSchema, requestedSchema)) {
throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
}
@@ -98,10 +99,10 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReader {
TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(prunedFileSchema);
RecordReader recordReader = reader.rows(new Options(hadoopConf).schema(orcSchema));
ClosableIterator<IndexedRecord> recordIterator = new OrcReaderIterator<>(recordReader, prunedFileSchema, orcSchema);
- if (readerSchema.equals(fileSchema)) {
+ if (renamedColumns.isEmpty() && readerSchema.equals(fileSchema)) {
return recordIterator;
} else {
- return new CloseableMappingIterator<>(recordIterator, data -> HoodieAvroUtils.rewriteRecordWithNewSchema(data, requestedSchema));
+ return new CloseableMappingIterator<>(recordIterator, data -> HoodieAvroUtils.rewriteRecordWithNewSchema(data, requestedSchema, renamedColumns));
}
} catch (IOException io) {
throw new HoodieIOException("Unable to create an ORC reader.", io);
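For context on the ORC change above: the renamed-column map is now threaded into the per-record rewrite, so a field rename is resolved in the same pass as type promotion rather than relying on the plain two-argument rewrite. Below is a minimal sketch of that rewrite step, assuming only the three-argument rewriteRecordWithNewSchema overload shown in the diff; the column names and the orientation of the map are illustrative, not taken from the commit.

// Sketch only: the per-record rewrite applied by the mapping iterator above.
// Column names are hypothetical; consult HoodieAvroUtils.rewriteRecordWithNewSchema
// for the exact contract of the rename map.
import java.util.Collections;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

final class RenameAwareRewriteSketch {
  static IndexedRecord rewrite(IndexedRecord fromFile, Schema requestedSchema) {
    Map<String, String> renamedColumns = Collections.singletonMap("rider_name", "rider");
    // An empty map with matching schemas leaves records untouched; otherwise each record
    // is rewritten to the requested schema, resolving renames and promoted types together.
    return HoodieAvroUtils.rewriteRecordWithNewSchema(fromFile, requestedSchema, renamedColumns);
  }
}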
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index 88ff9273196c..588100337397 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -49,7 +49,9 @@ import org.apache.parquet.hadoop.ParquetReader;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
@@ -100,12 +102,17 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
@Override
protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema) throws IOException {
- return getIndexedRecordIteratorInternal(schema);
+ return getIndexedRecordIteratorInternal(schema, Collections.emptyMap());
}
@Override
public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
- return getIndexedRecordIteratorInternal(requestedSchema);
+ return getIndexedRecordIteratorInternal(requestedSchema, Collections.emptyMap());
+ }
+
+ @Override
+ public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, Map<String, String> renamedColumns) throws IOException {
+ return getIndexedRecordIteratorInternal(requestedSchema, renamedColumns);
}
@Override
@@ -166,13 +173,13 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
return conf;
}
- private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(Schema schema) throws IOException {
+ private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(Schema schema, Map<String, String> renamedColumns) throws IOException {
// NOTE: We have to set both Avro read-schema and projection schema to make
// sure that in case the file-schema is not equal to read-schema we'd still
// be able to read that file (in case projection is a proper one)
Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
Option<Schema> promotedSchema = Option.empty();
- if (HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(), schema)) {
+ if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(), schema)) {
AvroReadSupport.setAvroReadSchema(hadoopConf, getSchema());
AvroReadSupport.setRequestedProjection(hadoopConf, getSchema());
promotedSchema = Option.of(schema);
@@ -186,7 +193,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
.set(ParquetInputFormat.STRICT_TYPE_CHECKING, hadoopConf.get(ParquetInputFormat.STRICT_TYPE_CHECKING))
.build();
ParquetReaderIterator<IndexedRecord> parquetReaderIterator = promotedSchema.isPresent()
- ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get())
+ ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get(), renamedColumns)
: new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
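To make the new reader overload concrete: a caller that already knows the reader-to-file column renames can pass them to getIndexedRecordIterator, and a non-empty map now forces the promoted-schema path even when no type promotion is detected. A hedged usage sketch follows; the reader instance, schemas, rename map, and the ClosableIterator import path are assumptions drawn from the surrounding code rather than taken verbatim from the commit.

// Usage sketch for the three-argument overload added above; all inputs are placeholders.
import java.io.IOException;
import java.util.Collections;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.io.storage.HoodieAvroFileReader;

final class RenamedColumnReadSketch {
  static long countRecords(HoodieAvroFileReader reader, Schema readerSchema, Schema requestedSchema) throws IOException {
    // A non-empty map makes the reader rewrite every record instead of returning the raw iterator.
    Map<String, String> renamedColumns = Collections.singletonMap("fare_amount", "fare");
    ClosableIterator<IndexedRecord> iterator = reader.getIndexedRecordIterator(readerSchema, requestedSchema, renamedColumns);
    long count = 0;
    try {
      while (iterator.hasNext()) {
        iterator.next();
        count++;
      }
    } finally {
      iterator.close();
    }
    return count;
  }
}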
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
index 2723f4d1900f..c0eaf16d08b5 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
@@ -26,15 +26,20 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.parquet.hadoop.ParquetReader;
+import java.util.Map;
+
public class HoodieAvroParquetReaderIterator extends ParquetReaderIterator<IndexedRecord> {
private final Schema promotedSchema;
- public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord> parquetReader, Schema promotedSchema) {
+ private final Map<String, String> renamedColumns;
+
+ public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord> parquetReader, Schema promotedSchema, Map<String, String> renamedColumns) {
super(parquetReader);
this.promotedSchema = promotedSchema;
+ this.renamedColumns = renamedColumns;
}
@Override
public IndexedRecord next() {
- return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(), this.promotedSchema);
+ return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(), promotedSchema, renamedColumns);
}
}
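The iterator change is the decoding-side counterpart: each next() now rewrites the raw Parquet record against both the promoted schema and the rename map. A construction sketch under the constructor signature shown above; the ParquetReader, schema, and column names are placeholders.

// Sketch only: wiring the extended iterator; inputs are placeholders.
import java.util.Collections;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.io.hadoop.HoodieAvroParquetReaderIterator;
import org.apache.parquet.hadoop.ParquetReader;

final class PromotedSchemaIteratorSketch {
  static HoodieAvroParquetReaderIterator wrap(ParquetReader<IndexedRecord> parquetReader, Schema promotedSchema) {
    Map<String, String> renamedColumns = Collections.singletonMap("driver_name", "driver");
    // Every next() call rewrites the record with the promoted schema and this map.
    return new HoodieAvroParquetReaderIterator(parquetReader, promotedSchema, renamedColumns);
  }
}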
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 18cfbf05fe9d..a9ce1a816302 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -508,8 +508,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
readerContext.setHasBootstrapBaseFile(false)
readerContext.setHasLogFiles(true)
readerContext.setSchemaHandler(
- new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema,
- Option.empty(), metaClient.getTableConfig, readerProperties))
+ new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema, Option.empty(), readerProperties, metaClient))
val stats = new HoodieReadStats
keyBasedFileGroupRecordBuffer.ifPresent(k => k.close())
keyBasedFileGroupRecordBuffer = Option.of(new KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient,
readerContext.getMergeMode,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 7b2d825b55db..44c13eed32b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -139,8 +139,8 @@ public class TestPositionBasedFileGroupRecordBuffer extends SparkClientFunctiona
ctx.setRecordMerger(Option.empty());
}
ctx.setSchemaHandler(HoodieSparkUtils.gteqSpark3_5()
- ? new ParquetRowIndexBasedSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), metaClient.getTableConfig(), new TypedProperties())
- : new FileGroupReaderSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), metaClient.getTableConfig(), new TypedProperties()));
+ ? new ParquetRowIndexBasedSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), new TypedProperties(), metaClient)
+ : new FileGroupReaderSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), new TypedProperties(), metaClient));
TypedProperties props = new TypedProperties();
props.put("hoodie.write.record.merge.mode", mergeMode.name());
props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));
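Across the call sites above the schema handler wiring changes the same way: the HoodieTableConfig argument is dropped and the constructor now takes the reader properties followed by the table meta client, which tests satisfy with a Mockito mock. A test-style sketch of the new argument order follows; the reader context and schema are placeholders, and the import paths not shown in the diff (TypedProperties, Option, HoodieReaderContext) are assumptions.

// Sketch of the updated constructor order: context, data schema, requested schema,
// internal schema option, properties, meta client (mocked here as in the tests above).
import static org.mockito.Mockito.mock;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.HoodieReaderContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
import org.apache.hudi.common.util.Option;

final class SchemaHandlerWiringSketch {
  static FileGroupReaderSchemaHandler<IndexedRecord> build(HoodieReaderContext<IndexedRecord> readerContext, Schema schema) {
    return new FileGroupReaderSchemaHandler<>(readerContext, schema, schema, Option.empty(),
        new TypedProperties(), mock(HoodieTableMetaClient.class));
  }
}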