This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d272d283658f feat(schema): Remove direct usage of Avro schema in Flink-client path (#17739)
d272d283658f is described below
commit d272d283658f5dc124f144c52ee7adc02b848db5
Author: Tim Brown <[email protected]>
AuthorDate: Mon Jan 12 22:16:08 2026 -0500
feat(schema): Remove direct usage of Avro schema in Flink-client path (#17739)
---
.../hudi/execution/HoodieLazyInsertIterable.java | 2 +-
.../hudi/client/model/HoodieFlinkRecord.java | 33 +-
.../model/PartialUpdateFlinkRecordMerger.java | 8 +-
.../hudi/execution/FlinkLazyInsertIterable.java | 4 +-
.../row/HoodieRowDataFileWriterFactory.java | 6 +-
.../hudi/table/format/FlinkRecordContext.java | 16 +-
.../org/apache/hudi/util/AvroSchemaConverter.java | 399 ---------------------
.../apache/hudi/util/AvroToRowDataConverters.java | 2 +-
.../apache/hudi/util/HoodieSchemaConverter.java | 25 +-
...ueryContexts.java => RowDataQueryContexts.java} | 34 +-
.../apache/hudi/util/RowDataToAvroConverters.java | 61 ++--
.../hudi/merge/TestHoodieFlinkRecordMerger.java | 12 +-
.../hudi/util/TestHoodieSchemaConverter.java | 118 +++---
.../hudi/source/stats/ColumnStatsSchemas.java | 5 +-
.../org/apache/hudi/table/HoodieTableFactory.java | 10 +-
.../org/apache/hudi/table/HoodieTableSource.java | 4 +-
.../apache/hudi/table/catalog/HoodieCatalog.java | 16 +-
.../hudi/table/catalog/HoodieCatalogUtil.java | 5 +-
.../hudi/table/catalog/TableOptionProperties.java | 4 +-
.../table/format/FlinkRowDataReaderContext.java | 6 +-
.../hudi/table/format/InternalSchemaManager.java | 6 +-
.../hudi/table/format/cdc/CdcInputFormat.java | 12 +-
.../table/format/mor/MergeOnReadInputFormat.java | 4 +-
.../table/format/mor/MergeOnReadTableState.java | 12 +-
.../apache/hudi/source/TestStreamReadOperator.java | 6 +-
.../apache/hudi/table/ITTestSchemaEvolution.java | 6 +-
.../table/TestHoodieFileGroupReaderOnFlink.java | 12 +-
.../apache/hudi/table/TestHoodieTableFactory.java | 6 +-
.../apache/hudi/table/format/TestInputFormat.java | 6 +-
.../apache/hudi/utils/TestAvroSchemaConverter.java | 94 -----
.../org/apache/hudi/utils/TestConfigurations.java | 4 +-
.../test/java/org/apache/hudi/utils/TestData.java | 16 +-
.../utils/TestRecordKeyToRowDataConverter.java | 6 +-
.../hudi/utils/TestRowDataToAvroConverters.java | 6 +-
34 files changed, 248 insertions(+), 718 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index 9655fdca5b13..8487a1f01ee2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -97,7 +97,7 @@ public abstract class HoodieLazyInsertIterable<T>
}
public static <T> Function<HoodieRecord<T>,
HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(HoodieSchema
schema,
-
HoodieWriteConfig writeConfig) {
+
HoodieWriteConfig writeConfig) {
// NOTE: Whether record have to be cloned here is determined based on the
executor type used
// for writing: executors relying on an inner queue, will be keeping
references to the records
// and therefore in the environments where underlying buffer holding
the record could be
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 5d7917fc25a9..7f4fdd0e5caf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -34,14 +34,13 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.keygen.BaseKeyGenerator;
import org.apache.hudi.table.format.FlinkRecordContext;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
-import org.apache.hudi.util.RowDataAvroQueryContexts.RowDataQueryContext;
+import org.apache.hudi.util.RowDataQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts.RowDataQueryContext;
import org.apache.hudi.util.RowProjection;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericRowData;
@@ -103,7 +102,7 @@ public class HoodieFlinkRecord extends
HoodieRecord<RowData> {
if (recordSchema.getField(field).isEmpty()) {
return OrderingValues.getDefault();
}
- return (Comparable<?>) getColumnValue(recordSchema.toAvroSchema(),
field, props);
+ return (Comparable<?>) getColumnValue(recordSchema, field, props);
});
}
}
@@ -141,7 +140,7 @@ public class HoodieFlinkRecord extends
HoodieRecord<RowData> {
@Override
public String getRecordKey(HoodieSchema recordSchema, String keyFieldName) {
if (key == null) {
- String recordKey =
Objects.toString(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema()).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data));
+ String recordKey =
Objects.toString(RowDataQueryContexts.fromSchema(recordSchema).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data));
key = new HoodieKey(recordKey, null);
}
return getRecordKey();
@@ -164,9 +163,9 @@ public class HoodieFlinkRecord extends
HoodieRecord<RowData> {
if (fieldValue == null) {
return null;
}
-
+
HoodieSchemaType schemaType = fieldSchema.getType();
-
+
if (schemaType == HoodieSchemaType.DATE) {
return LocalDate.ofEpochDay(((Integer) fieldValue).longValue());
} else if (schemaType == HoodieSchemaType.TIMESTAMP &&
keepConsistentLogicalTimestamp) {
@@ -190,20 +189,20 @@ public class HoodieFlinkRecord extends
HoodieRecord<RowData> {
@Override
public Object getColumnValueAsJava(HoodieSchema recordSchema, String column,
Properties props) {
- return getColumnValueAsJava(recordSchema.toAvroSchema(), column, props,
true);
+ return getColumnValueAsJava(recordSchema, column, props, true);
}
- private Object getColumnValueAsJava(Schema recordSchema, String column,
Properties props, boolean allowsNull) {
+ private Object getColumnValueAsJava(HoodieSchema recordSchema, String
column, Properties props, boolean allowsNull) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(),
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
- RowDataQueryContext rowDataQueryContext =
RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
+ RowDataQueryContext rowDataQueryContext =
RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
return rowDataQueryContext.getFieldQueryContext(column).getValAsJava(data,
allowsNull);
}
- private Object getColumnValue(Schema recordSchema, String column, Properties
props) {
+ private Object getColumnValue(HoodieSchema recordSchema, String column,
Properties props) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(),
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
- RowDataQueryContext rowDataQueryContext =
RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
+ RowDataQueryContext rowDataQueryContext =
RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
return
rowDataQueryContext.getFieldQueryContext(column).getFieldGetter().getFieldOrNull(data);
}
@@ -236,7 +235,7 @@ public class HoodieFlinkRecord extends
HoodieRecord<RowData> {
@Override
public HoodieRecord rewriteRecordWithNewSchema(HoodieSchema recordSchema,
Properties props, HoodieSchema newSchema, Map<String, String> renameCols) {
- RowProjection rowProjection =
RowDataAvroQueryContexts.getRowProjection(recordSchema.toAvroSchema(),
newSchema.toAvroSchema(), renameCols);
+ RowProjection rowProjection =
RowDataQueryContexts.getRowProjection(recordSchema, newSchema, renameCols);
RowData newRow = rowProjection.project(getData());
return new HoodieFlinkRecord(getKey(), getOperation(), newRow);
}
@@ -285,8 +284,8 @@ public class HoodieFlinkRecord extends
HoodieRecord<RowData> {
public Option<HoodieAvroIndexedRecord> toIndexedRecord(HoodieSchema
recordSchema, Properties props) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(),
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
- RowDataQueryContext rowDataQueryContext =
RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema(),
utcTimezone);
- IndexedRecord indexedRecord = (IndexedRecord)
rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema.toAvroSchema(),
getData());
+ RowDataQueryContext rowDataQueryContext =
RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
+ IndexedRecord indexedRecord = (IndexedRecord)
rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema,
getData());
return Option.of(new HoodieAvroIndexedRecord(getKey(), indexedRecord,
getOperation(), getMetadata(), orderingValue, isDelete));
}
@@ -294,8 +293,8 @@ public class HoodieFlinkRecord extends
HoodieRecord<RowData> {
public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema,
Properties props) {
boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(),
HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
- RowDataQueryContext rowDataQueryContext =
RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema(),
utcTimezone);
- IndexedRecord indexedRecord = (IndexedRecord)
rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema.toAvroSchema(),
getData());
+ RowDataQueryContext rowDataQueryContext =
RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
+ IndexedRecord indexedRecord = (IndexedRecord)
rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema,
getData());
return HoodieAvroUtils.avroToBytesStream(indexedRecord);
}
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
index a08c7f39a549..6bae6c2e8eac 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.engine.RecordContext;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -129,7 +129,7 @@ public class PartialUpdateFlinkRecordMerger extends
HoodieFlinkRecordMerger {
// later in the file writer.
int mergedArity = newSchema.getFields().size();
boolean utcTimezone =
Boolean.parseBoolean(props.getProperty("read.utc-timezone", "true"));
- RowData.FieldGetter[] fieldGetters =
RowDataAvroQueryContexts.fromAvroSchema(newSchema.toAvroSchema(),
utcTimezone).fieldGetters();
+ RowData.FieldGetter[] fieldGetters =
RowDataQueryContexts.fromSchema(newSchema, utcTimezone).fieldGetters();
int lowOrderIdx = 0;
int highOrderIdx = 0;
@@ -138,10 +138,10 @@ public class PartialUpdateFlinkRecordMerger extends
HoodieFlinkRecordMerger {
// shift start index for merging if there is schema discrepancy
if (lowOrderArity != mergedArity) {
lowOrderIdx += lowOrderArity - mergedArity;
- lowOrderFieldGetters =
RowDataAvroQueryContexts.fromAvroSchema(lowOrderSchema.toAvroSchema(),
utcTimezone).fieldGetters();
+ lowOrderFieldGetters = RowDataQueryContexts.fromSchema(lowOrderSchema,
utcTimezone).fieldGetters();
} else if (highOrderArity != mergedArity) {
highOrderIdx += highOrderArity - mergedArity;
- highOrderFieldGetters =
RowDataAvroQueryContexts.fromAvroSchema(highOrderSchema.toAvroSchema(),
utcTimezone).fieldGetters();
+ highOrderFieldGetters = RowDataQueryContexts.fromSchema(highOrderSchema,
utcTimezone).fieldGetters();
}
RowData lowOrderRow = (RowData) lowOrderRecord.getRecord();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index dd98d8886e66..dbeb24c550ec 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -18,11 +18,11 @@
package org.apache.hudi.execution;
-import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
import org.apache.hudi.common.util.queue.HoodieExecutor;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index 39ec2d08eb18..3dbc65c3290e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -32,7 +32,7 @@ import org.apache.hudi.io.storage.HoodieFileWriterFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hadoop.conf.Configuration;
@@ -67,7 +67,7 @@ public class HoodieRowDataFileWriterFactory extends
HoodieFileWriterFactory {
HoodieConfig config,
HoodieSchema schema) throws IOException {
//TODO boundary to revisit in follow up to use HoodieSchema directly
- final RowType rowType = (RowType)
RowDataAvroQueryContexts.fromAvroSchema(schema.getAvroSchema()).getRowType().getLogicalType();
+ final RowType rowType = (RowType)
RowDataQueryContexts.fromSchema(schema).getRowType().getLogicalType();
HoodieRowDataParquetWriteSupport writeSupport =
new HoodieRowDataParquetWriteSupport(
storage.getConf().unwrapAs(Configuration.class), rowType, null);
@@ -94,7 +94,7 @@ public class HoodieRowDataFileWriterFactory extends
HoodieFileWriterFactory {
HoodieSchema schema,
TaskContextSupplier taskContextSupplier) throws IOException {
//TODO boundary to revisit in follow up to use HoodieSchema directly
- final RowType rowType = (RowType)
RowDataAvroQueryContexts.fromAvroSchema(schema.getAvroSchema()).getRowType().getLogicalType();
+ final RowType rowType = (RowType)
RowDataQueryContexts.fromSchema(schema).getRowType().getLogicalType();
return newParquetFileWriter(instantTime, storagePath, config, rowType,
taskContextSupplier);
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
index 927ff5093b86..d242dcbfb9d6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
@@ -34,7 +34,7 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.OrderingValueEngineTypeConverter;
import org.apache.hudi.util.RecordKeyToRowDataConverter;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.hudi.util.RowDataUtils;
import org.apache.hudi.util.RowProjection;
import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
@@ -82,8 +82,8 @@ public class FlinkRecordContext extends
RecordContext<RowData> {
@Override
public Object getValue(RowData record, HoodieSchema schema, String
fieldName) {
- RowDataAvroQueryContexts.FieldQueryContext fieldQueryContext =
- RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema(),
utcTimezone).getFieldQueryContext(fieldName);
+ RowDataQueryContexts.FieldQueryContext fieldQueryContext =
+ RowDataQueryContexts.fromSchema(schema,
utcTimezone).getFieldQueryContext(fieldName);
if (fieldQueryContext == null) {
return null;
} else {
@@ -108,7 +108,7 @@ public class FlinkRecordContext extends
RecordContext<RowData> {
@Override
public GenericRecord convertToAvroRecord(RowData record, HoodieSchema
schema) {
- return (GenericRecord)
RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema()).getRowDataToAvroConverter().convert(schema.toAvroSchema(),
record);
+ return (GenericRecord)
RowDataQueryContexts.fromSchema(schema).getRowDataToAvroConverter().convert(schema,
record);
}
@Override
@@ -125,7 +125,7 @@ public class FlinkRecordContext extends
RecordContext<RowData> {
@Override
public RowData convertAvroRecord(IndexedRecord avroRecord) {
Schema recordSchema = avroRecord.getSchema();
- AvroToRowDataConverters.AvroToRowDataConverter converter =
RowDataAvroQueryContexts.fromAvroSchema(recordSchema,
utcTimezone).getAvroToRowDataConverter();
+ AvroToRowDataConverters.AvroToRowDataConverter converter =
RowDataQueryContexts.fromSchema(HoodieSchema.fromAvroSchema(recordSchema),
utcTimezone).getAvroToRowDataConverter();
RowData rowData = (RowData) converter.convert(avroRecord);
Schema.Field operationField =
recordSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD);
if (operationField != null) {
@@ -180,7 +180,7 @@ public class FlinkRecordContext extends
RecordContext<RowData> {
if (record instanceof BinaryRowData) {
return record;
}
- RowDataSerializer rowDataSerializer =
RowDataAvroQueryContexts.getRowDataSerializer(schema.toAvroSchema());
+ RowDataSerializer rowDataSerializer =
RowDataQueryContexts.getRowDataSerializer(schema);
return rowDataSerializer.toBinaryRow(record);
}
@@ -197,8 +197,8 @@ public class FlinkRecordContext extends
RecordContext<RowData> {
*/
@Override
public UnaryOperator<RowData> projectRecord(HoodieSchema from, HoodieSchema
to, Map<String, String> renamedColumns) {
- RowType fromType = (RowType)
RowDataAvroQueryContexts.fromAvroSchema(from.toAvroSchema()).getRowType().getLogicalType();
- RowType toType = (RowType)
RowDataAvroQueryContexts.fromAvroSchema(to.toAvroSchema()).getRowType().getLogicalType();
+ RowType fromType = (RowType)
RowDataQueryContexts.fromSchema(from).getRowType().getLogicalType();
+ RowType toType = (RowType)
RowDataQueryContexts.fromSchema(to).getRowType().getLogicalType();
RowProjection rowProjection =
SchemaEvolvingRowDataProjection.instance(fromType, toType, renamedColumns);
return rowProjection::project;
}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
deleted file mode 100644
index 1729044a1464..000000000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.util;
-
-import org.apache.hudi.common.util.ReflectionUtils;
-
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeFamily;
-import org.apache.flink.table.types.logical.MapType;
-import org.apache.flink.table.types.logical.MultisetType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Converts an Avro schema into Flink's type information. It uses {@link
org.apache.flink.api.java.typeutils.RowTypeInfo} for
- * representing objects and converts Avro types into types that are compatible
with Flink's Table &
- * SQL API.
- *
- * <p>Note: Changes in this class need to be kept in sync with the
corresponding runtime classes
- * {@code org.apache.flink.formats.avro.AvroRowDeserializationSchema} and
{@code org.apache.flink.formats.avro.AvroRowSerializationSchema}.
- *
- * <p>NOTE: reference from Flink release 1.12.0, should remove when Flink
version upgrade to that.
- */
-public class AvroSchemaConverter {
-
- /**
- * Converts an Avro schema {@code schema} into a nested row structure with
deterministic field order and
- * data types that are compatible with Flink's Table & SQL API.
- *
- * @param schema Avro schema definition
- * @return data type matching the schema
- */
- public static DataType convertToDataType(Schema schema) {
- switch (schema.getType()) {
- case RECORD:
- final List<Schema.Field> schemaFields = schema.getFields();
-
- final DataTypes.Field[] fields = new
DataTypes.Field[schemaFields.size()];
- for (int i = 0; i < schemaFields.size(); i++) {
- final Schema.Field field = schemaFields.get(i);
- fields[i] = DataTypes.FIELD(field.name(),
convertToDataType(field.schema()));
- }
- return DataTypes.ROW(fields).notNull();
- case ENUM:
- case STRING:
- // convert Avro's Utf8/CharSequence to String
- return DataTypes.STRING().notNull();
- case ARRAY:
- return
DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull();
- case MAP:
- return DataTypes.MAP(
- DataTypes.STRING().notNull(),
- convertToDataType(schema.getValueType()))
- .notNull();
- case UNION:
- final Schema actualSchema;
- final boolean nullable;
- if (schema.getTypes().size() == 2
- && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
- actualSchema = schema.getTypes().get(1);
- nullable = true;
- } else if (schema.getTypes().size() == 2
- && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
- actualSchema = schema.getTypes().get(0);
- nullable = true;
- } else if (schema.getTypes().size() == 1) {
- actualSchema = schema.getTypes().get(0);
- nullable = false;
- } else {
- List<Schema> nonNullTypes = schema.getTypes().stream()
- .filter(s -> s.getType() != Schema.Type.NULL)
- .collect(Collectors.toList());
- nullable = schema.getTypes().size() > nonNullTypes.size();
-
- // use Kryo for serialization
- DataType rawDataType = (DataType) ReflectionUtils.invokeStaticMethod(
- "org.apache.hudi.utils.DataTypeUtils",
- "createAtomicRawType",
- new Object[] {false, Types.GENERIC(Object.class)},
- Boolean.class,
- TypeInformation.class);
-
- if (recordTypesOfSameNumFields(nonNullTypes)) {
- DataType converted = DataTypes.ROW(
- DataTypes.FIELD("wrapper", rawDataType))
- .notNull();
- return nullable ? converted.nullable() : converted;
- }
- // use Kryo for serialization
- return nullable ? rawDataType.nullable() : rawDataType;
- }
- DataType converted = convertToDataType(actualSchema);
- return nullable ? converted.nullable() : converted;
- case FIXED:
- // logical decimal type
- if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
- final LogicalTypes.Decimal decimalType =
- (LogicalTypes.Decimal) schema.getLogicalType();
- return DataTypes.DECIMAL(decimalType.getPrecision(),
decimalType.getScale())
- .notNull();
- }
- // convert fixed size binary data to primitive byte arrays
- return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
- case BYTES:
- // logical decimal type
- if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
- final LogicalTypes.Decimal decimalType =
- (LogicalTypes.Decimal) schema.getLogicalType();
- return DataTypes.DECIMAL(decimalType.getPrecision(),
decimalType.getScale())
- .notNull();
- }
- return DataTypes.BYTES().notNull();
- case INT:
- // logical date and time type
- final org.apache.avro.LogicalType logicalType =
schema.getLogicalType();
- if (logicalType == LogicalTypes.date()) {
- return DataTypes.DATE().notNull();
- } else if (logicalType == LogicalTypes.timeMillis()) {
- return DataTypes.TIME(3).notNull();
- }
- return DataTypes.INT().notNull();
- case LONG:
- // logical timestamp type
- if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
- return DataTypes.TIMESTAMP(3).notNull();
- } else if (schema.getLogicalType() ==
LogicalTypes.localTimestampMillis()) {
- return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
- } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
- return DataTypes.TIMESTAMP(6).notNull();
- } else if (schema.getLogicalType() ==
LogicalTypes.localTimestampMicros()) {
- return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
- } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
- return DataTypes.TIME(3).notNull();
- } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
- return DataTypes.TIME(6).notNull();
- }
- return DataTypes.BIGINT().notNull();
- case FLOAT:
- return DataTypes.FLOAT().notNull();
- case DOUBLE:
- return DataTypes.DOUBLE().notNull();
- case BOOLEAN:
- return DataTypes.BOOLEAN().notNull();
- case NULL:
- return DataTypes.NULL();
- default:
- throw new IllegalArgumentException("Unsupported Avro type '" +
schema.getType() + "'.");
- }
- }
-
- /**
- * Returns true if all the types are RECORD type with same number of fields.
- */
- private static boolean recordTypesOfSameNumFields(List<Schema> types) {
- if (types == null || types.size() == 0) {
- return false;
- }
- if (types.stream().anyMatch(s -> s.getType() != Schema.Type.RECORD)) {
- return false;
- }
- int numFields = types.get(0).getFields().size();
- return types.stream().allMatch(s -> s.getFields().size() == numFields);
- }
-
- /**
- * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro
schema.
- *
- * <p>Use "record" as the type name.
- *
- * @param schema the schema type, usually it should be the top level record
type, e.g. not a
- * nested type
- * @return Avro's {@link Schema} matching this logical type.
- */
- public static Schema convertToSchema(LogicalType schema) {
- return convertToSchema(schema, "record");
- }
-
- /**
- * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro
schema.
- *
- * <p>The "{rowName}." is used as the nested row type name prefix in order
to generate the right
- * schema. Nested record type that only differs with type name is still
compatible.
- *
- * @param logicalType logical type
- * @param rowName the record name
- * @return Avro's {@link Schema} matching this logical type.
- */
- public static Schema convertToSchema(LogicalType logicalType, String
rowName) {
- int precision;
- boolean nullable = logicalType.isNullable();
- switch (logicalType.getTypeRoot()) {
- case NULL:
- return SchemaBuilder.builder().nullType();
- case BOOLEAN:
- Schema bool = SchemaBuilder.builder().booleanType();
- return nullable ? nullableSchema(bool) : bool;
- case TINYINT:
- case SMALLINT:
- case INTEGER:
- Schema integer = SchemaBuilder.builder().intType();
- return nullable ? nullableSchema(integer) : integer;
- case BIGINT:
- Schema bigint = SchemaBuilder.builder().longType();
- return nullable ? nullableSchema(bigint) : bigint;
- case FLOAT:
- Schema f = SchemaBuilder.builder().floatType();
- return nullable ? nullableSchema(f) : f;
- case DOUBLE:
- Schema d = SchemaBuilder.builder().doubleType();
- return nullable ? nullableSchema(d) : d;
- case CHAR:
- case VARCHAR:
- Schema str = SchemaBuilder.builder().stringType();
- return nullable ? nullableSchema(str) : str;
- case BINARY:
- case VARBINARY:
- Schema binary = SchemaBuilder.builder().bytesType();
- return nullable ? nullableSchema(binary) : binary;
- case TIMESTAMP_WITHOUT_TIME_ZONE:
- // use long to represents Timestamp
- final TimestampType timestampType = (TimestampType) logicalType;
- precision = timestampType.getPrecision();
- org.apache.avro.LogicalType timestampLogicalType;
- if (precision <= 3) {
- timestampLogicalType = LogicalTypes.timestampMillis();
- } else if (precision <= 6) {
- timestampLogicalType = LogicalTypes.timestampMicros();
- } else {
- throw new IllegalArgumentException(
- "Avro does not support TIMESTAMP type with precision: "
- + precision
- + ", it only support precisions <= 6.");
- }
- Schema timestamp =
timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
- return nullable ? nullableSchema(timestamp) : timestamp;
- case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
- // use long to represents LocalZonedTimestampType
- final LocalZonedTimestampType localZonedTimestampType =
(LocalZonedTimestampType) logicalType;
- precision = localZonedTimestampType.getPrecision();
- org.apache.avro.LogicalType localZonedTimestampLogicalType;
- if (precision <= 3) {
- localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis();
- } else if (precision <= 6) {
- localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros();
- } else {
- throw new IllegalArgumentException(
- "Avro does not support LOCAL TIMESTAMP type with precision: "
- + precision
- + ", it only support precisions <= 6.");
- }
- Schema localZonedTimestamp =
localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
- return nullable ? nullableSchema(localZonedTimestamp) :
localZonedTimestamp;
- case DATE:
- // use int to represents Date
- Schema date =
LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
- return nullable ? nullableSchema(date) : date;
- case TIME_WITHOUT_TIME_ZONE:
- precision = ((TimeType) logicalType).getPrecision();
- if (precision > 3) {
- throw new IllegalArgumentException(
- "Avro does not support TIME type with precision: "
- + precision
- + ", it only supports precision less than 3.");
- }
- // use int to represents Time, we only support millisecond when
deserialization
- Schema time =
-
LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
- return nullable ? nullableSchema(time) : time;
- case DECIMAL:
- DecimalType decimalType = (DecimalType) logicalType;
- // store BigDecimal as Fixed
- // for spark compatibility.
- Schema decimal =
- LogicalTypes.decimal(decimalType.getPrecision(),
decimalType.getScale())
- .addToSchema(SchemaBuilder
- .fixed(String.format("%s.fixed", rowName))
-
.size(computeMinBytesForDecimalPrecision(decimalType.getPrecision())));
- return nullable ? nullableSchema(decimal) : decimal;
- case ROW:
- RowType rowType = (RowType) logicalType;
- List<String> fieldNames = rowType.getFieldNames();
- // we have to make sure the record name is different in a Schema
- SchemaBuilder.FieldAssembler<Schema> builder =
- SchemaBuilder.builder().record(rowName).fields();
- for (int i = 0; i < rowType.getFieldCount(); i++) {
- String fieldName = fieldNames.get(i);
- LogicalType fieldType = rowType.getTypeAt(i);
- SchemaBuilder.GenericDefault<Schema> fieldBuilder =
- builder.name(fieldName)
- .type(convertToSchema(fieldType, rowName + "." + fieldName));
-
- if (fieldType.isNullable()) {
- builder = fieldBuilder.withDefault(null);
- } else {
- builder = fieldBuilder.noDefault();
- }
- }
- Schema record = builder.endRecord();
- return nullable ? nullableSchema(record) : record;
- case MULTISET:
- case MAP:
- Schema map =
- SchemaBuilder.builder()
- .map()
- .values(
- convertToSchema(
- extractValueTypeToAvroMap(logicalType), rowName));
- return nullable ? nullableSchema(map) : map;
- case ARRAY:
- ArrayType arrayType = (ArrayType) logicalType;
- Schema array =
- SchemaBuilder.builder()
- .array()
- .items(convertToSchema(arrayType.getElementType(), rowName));
- return nullable ? nullableSchema(array) : array;
- case RAW:
- default:
- throw new UnsupportedOperationException(
- "Unsupported to derive Schema for type: " + logicalType);
- }
- }
-
- public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
- LogicalType keyType;
- LogicalType valueType;
- if (type instanceof MapType) {
- MapType mapType = (MapType) type;
- keyType = mapType.getKeyType();
- valueType = mapType.getValueType();
- } else {
- MultisetType multisetType = (MultisetType) type;
- keyType = multisetType.getElementType();
- valueType = new IntType();
- }
- if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
- throw new UnsupportedOperationException(
- "Avro format doesn't support non-string as key type of map. "
- + "The key type is: "
- + keyType.asSummaryString());
- }
- return valueType;
- }
-
- /**
- * Returns whether the given logical type belongs to the family.
- */
- public static boolean isFamily(LogicalType logicalType, LogicalTypeFamily
family) {
- return logicalType.getTypeRoot().getFamilies().contains(family);
- }
-
- /**
- * Returns schema with nullable true.
- */
- private static Schema nullableSchema(Schema schema) {
- return schema.isNullable()
- ? schema
- : Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
- }
-
- private static int computeMinBytesForDecimalPrecision(int precision) {
- int numBytes = 1;
- while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
- numBytes += 1;
- }
- return numBytes;
- }
-}
-
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index f9107e6df53e..fee24e520382 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -198,7 +198,7 @@ public class AvroToRowDataConverters {
final AvroToRowDataConverter keyConverter =
createConverter(DataTypes.STRING().getLogicalType(), utcTimezone);
final AvroToRowDataConverter valueConverter =
-
createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type),
utcTimezone);
+
createNullableConverter(HoodieSchemaConverter.extractValueTypeToMap(type),
utcTimezone);
return avroObject -> {
final Map<?, ?> map = (Map<?, ?>) avroObject;
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index 14c59d5789a8..5e211c4851a0 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -420,7 +420,7 @@ public class HoodieSchemaConverter {
return convertToDataType(unionTypes.get(0));
}
- // Complex multi-type unions - use RAW type (matches AvroSchemaConverter
logic)
+ // Complex multi-type unions - use RAW type
List<HoodieSchema> nonNullTypes = unionTypes.stream()
.filter(t -> t.getType() != HoodieSchemaType.NULL)
.collect(Collectors.toList());
@@ -458,4 +458,25 @@ public class HoodieSchemaConverter {
int numFields = types.get(0).getFields().size();
return types.stream().allMatch(s -> s.getFields().size() == numFields);
}
-}
\ No newline at end of file
+
+ public static LogicalType extractValueTypeToMap(LogicalType type) {
+ LogicalType keyType;
+ LogicalType valueType;
+ if (type instanceof MapType) {
+ MapType mapType = (MapType) type;
+ keyType = mapType.getKeyType();
+ valueType = mapType.getValueType();
+ } else {
+ MultisetType multisetType = (MultisetType) type;
+ keyType = multisetType.getElementType();
+ valueType = new IntType();
+ }
+ if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+ throw new UnsupportedOperationException(
+ "Avro format doesn't support non-string as key type of map. "
+ + "The key type is: "
+ + keyType.asSummaryString());
+ }
+ return valueType;
+ }
+}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataQueryContexts.java
similarity index 76%
rename from
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
rename to
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataQueryContexts.java
index ad020b41912d..ace597859a3a 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataQueryContexts.java
@@ -18,6 +18,7 @@
package org.apache.hudi.util;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.common.util.collection.Triple;
import org.apache.hudi.exception.HoodieException;
@@ -26,7 +27,6 @@ import
org.apache.hudi.util.RowDataToAvroConverters.RowDataToAvroConverter;
import lombok.AllArgsConstructor;
import lombok.Getter;
-import org.apache.avro.Schema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.DataType;
@@ -41,21 +41,21 @@ import java.util.function.Function;
/**
* Maintains auxiliary utilities for row data fields handling.
*/
-public class RowDataAvroQueryContexts {
- private static final Map<Pair<Schema, Boolean>, RowDataQueryContext>
QUERY_CONTEXT_MAP = new ConcurrentHashMap<>();
+public class RowDataQueryContexts {
+ private static final Map<Pair<HoodieSchema, Boolean>, RowDataQueryContext>
QUERY_CONTEXT_MAP = new ConcurrentHashMap<>();
// BinaryRowWriter in RowDataSerializer are reused, and it's not thread-safe.
- private static final ThreadLocal<Map<Schema, RowDataSerializer>>
ROWDATA_SERIALIZER_CACHE = ThreadLocal.withInitial(HashMap::new);
+ private static final ThreadLocal<Map<HoodieSchema, RowDataSerializer>>
ROWDATA_SERIALIZER_CACHE = ThreadLocal.withInitial(HashMap::new);
- private static final Map<Triple<Schema, Schema, Map<String, String>>,
RowProjection> ROW_PROJECTION_CACHE = new ConcurrentHashMap<>();
+ private static final Map<Triple<HoodieSchema, HoodieSchema, Map<String,
String>>, RowProjection> ROW_PROJECTION_CACHE = new ConcurrentHashMap<>();
- public static RowDataQueryContext fromAvroSchema(Schema avroSchema) {
- return fromAvroSchema(avroSchema, true);
+ public static RowDataQueryContext fromSchema(HoodieSchema schema) {
+ return fromSchema(schema, true);
}
- public static RowDataQueryContext fromAvroSchema(Schema avroSchema, boolean
utcTimezone) {
- return QUERY_CONTEXT_MAP.computeIfAbsent(Pair.of(avroSchema, utcTimezone),
k -> {
- DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema);
+ public static RowDataQueryContext fromSchema(HoodieSchema schema, boolean
utcTimezone) {
+ return QUERY_CONTEXT_MAP.computeIfAbsent(Pair.of(schema, utcTimezone), k
-> {
+ DataType dataType = HoodieSchemaConverter.convertToDataType(schema);
RowType rowType = (RowType) dataType.getLogicalType();
RowType.RowField[] rowFields = rowType.getFields().toArray(new
RowType.RowField[0]);
RowData.FieldGetter[] fieldGetters = new
RowData.FieldGetter[rowFields.length];
@@ -72,18 +72,18 @@ public class RowDataAvroQueryContexts {
});
}
- public static RowDataSerializer getRowDataSerializer(Schema avroSchema) {
- return ROWDATA_SERIALIZER_CACHE.get().computeIfAbsent(avroSchema, schema
-> {
- RowType rowType = (RowType)
fromAvroSchema(schema).getRowType().getLogicalType();
+ public static RowDataSerializer getRowDataSerializer(HoodieSchema schema) {
+ return ROWDATA_SERIALIZER_CACHE.get().computeIfAbsent(schema,
providedSchema -> {
+ RowType rowType = (RowType)
fromSchema(providedSchema).getRowType().getLogicalType();
return new RowDataSerializer(rowType);
});
}
- public static RowProjection getRowProjection(Schema from, Schema to,
Map<String, String> renameCols) {
- Triple<Schema, Schema, Map<String, String>> cacheKey = Triple.of(from, to,
renameCols);
+ public static RowProjection getRowProjection(HoodieSchema from, HoodieSchema
to, Map<String, String> renameCols) {
+ Triple<HoodieSchema, HoodieSchema, Map<String, String>> cacheKey =
Triple.of(from, to, renameCols);
return ROW_PROJECTION_CACHE.computeIfAbsent(cacheKey, key -> {
- RowType fromType = (RowType)
RowDataAvroQueryContexts.fromAvroSchema(from).getRowType().getLogicalType();
- RowType toType = (RowType)
RowDataAvroQueryContexts.fromAvroSchema(to).getRowType().getLogicalType();
+ RowType fromType = (RowType)
RowDataQueryContexts.fromSchema(from).getRowType().getLogicalType();
+ RowType toType = (RowType)
RowDataQueryContexts.fromSchema(to).getRowType().getLogicalType();
return SchemaEvolvingRowDataProjection.instance(fromType, toType,
renameCols);
});
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index bc86e28248f2..9eb2979e19aa 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -18,8 +18,11 @@
package org.apache.hudi.util;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
import org.apache.avro.Conversions;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
@@ -63,7 +66,7 @@ public class RowDataToAvroConverters {
*/
@FunctionalInterface
public interface RowDataToAvroConverter extends Serializable {
- Object convert(Schema schema, Object object);
+ Object convert(HoodieSchema schema, Object object);
}
//
--------------------------------------------------------------------------------
@@ -90,7 +93,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
return null;
}
};
@@ -101,7 +104,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
return ((Byte) object).intValue();
}
};
@@ -112,7 +115,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
return ((Short) object).intValue();
}
};
@@ -131,7 +134,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
return object;
}
};
@@ -143,7 +146,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
return new Utf8(((BinaryStringData) object).toBytes());
}
};
@@ -155,7 +158,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
return ByteBuffer.wrap((byte[]) object);
}
};
@@ -167,7 +170,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
return ((TimestampData) object).toInstant().toEpochMilli();
}
};
@@ -176,7 +179,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
Instant instant = ((TimestampData) object).toInstant();
return
Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000),
instant.getNano() / 1000);
}
@@ -193,7 +196,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
return utcTimezone ? ((TimestampData)
object).toInstant().toEpochMilli() : ((TimestampData)
object).toTimestamp().getTime();
}
};
@@ -203,7 +206,7 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
Instant instant = utcTimezone ? ((TimestampData)
object).toInstant() : ((TimestampData) object).toTimestamp().toInstant();
return
Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000),
instant.getNano() / 1000);
}
@@ -218,9 +221,9 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
BigDecimal javaDecimal = ((DecimalData) object).toBigDecimal();
- return DECIMAL_CONVERSION.toFixed(javaDecimal, schema,
schema.getLogicalType());
+ return DECIMAL_CONVERSION.toFixed(javaDecimal,
schema.toAvroSchema(), schema.toAvroSchema().getLogicalType());
}
};
break;
@@ -244,19 +247,19 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
if (object == null) {
return null;
}
// get actual schema if it is a nullable schema
- Schema actualSchema;
- if (schema.getType() == Schema.Type.UNION) {
- List<Schema> types = schema.getTypes();
+ HoodieSchema actualSchema;
+ if (schema.getType() == HoodieSchemaType.UNION) {
+ List<HoodieSchema> types = schema.getTypes();
int size = types.size();
- if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+ if (size == 2 && types.get(1).getType() == HoodieSchemaType.NULL) {
actualSchema = types.get(0);
- } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+ } else if (size == 2 && types.get(0).getType() ==
HoodieSchemaType.NULL) {
actualSchema = types.get(1);
} else {
throw new IllegalArgumentException(
@@ -289,12 +292,12 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
+ public Object convert(HoodieSchema schema, Object object) {
final RowData row = (RowData) object;
- final List<Schema.Field> fields = schema.getFields();
- final GenericRecord record = new GenericData.Record(schema);
+ final List<HoodieSchemaField> fields = schema.getFields();
+ final GenericRecord record = new
GenericData.Record(schema.toAvroSchema());
for (int i = 0; i < length; ++i) {
- final Schema.Field schemaField = fields.get(i);
+ final HoodieSchemaField schemaField = fields.get(i);
Object avroObject =
fieldConverters[i].convert(
schemaField.schema(), fieldGetters[i].getFieldOrNull(row));
@@ -314,8 +317,8 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
- final Schema elementSchema = schema.getElementType();
+ public Object convert(HoodieSchema schema, Object object) {
+ final HoodieSchema elementSchema = schema.getElementType();
ArrayData arrayData = (ArrayData) object;
List<Object> list = new ArrayList<>();
for (int i = 0; i < arrayData.size(); ++i) {
@@ -329,7 +332,7 @@ public class RowDataToAvroConverters {
}
private static RowDataToAvroConverter createMapConverter(LogicalType type,
boolean utcTimezone) {
- LogicalType valueType =
AvroSchemaConverter.extractValueTypeToAvroMap(type);
+ LogicalType valueType = HoodieSchemaConverter.extractValueTypeToMap(type);
final ArrayData.ElementGetter valueGetter =
ArrayData.createElementGetter(valueType);
final RowDataToAvroConverter valueConverter = createConverter(valueType,
utcTimezone);
@@ -337,8 +340,8 @@ public class RowDataToAvroConverters {
private static final long serialVersionUID = 1L;
@Override
- public Object convert(Schema schema, Object object) {
- final Schema valueSchema = schema.getValueType();
+ public Object convert(HoodieSchema schema, Object object) {
+ final HoodieSchema valueSchema = schema.getValueType();
final MapData mapData = (MapData) object;
final ArrayData keyArray = mapData.keyArray();
final ArrayData valueArray = mapData.valueArray();
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java
index a6c3f5031aff..e932a077da61 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.read.BufferedRecord;
import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
@@ -68,7 +68,7 @@ public class TestHoodieFlinkRecordMerger {
@Test
void testMergingWithNewRecordAsDelete() throws IOException {
- HoodieSchema schema =
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+ HoodieSchema schema =
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1",
1L);
BufferedRecord<RowData> oldRecord =
BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 1L, RECORD_KEY,
false);
@@ -82,7 +82,7 @@ public class TestHoodieFlinkRecordMerger {
@Test
void testMergingWithOldRecordAsDelete() throws IOException {
- HoodieSchema schema =
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+ HoodieSchema schema =
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
RowData newRow = createRow(key, "001", "001_01", "file1", 1, "str_val1",
1L);
BufferedRecord<RowData> newRecord =
BufferedRecords.fromEngineRecord(newRow, schema, recordContext, 1L, RECORD_KEY,
false);
@@ -96,7 +96,7 @@ public class TestHoodieFlinkRecordMerger {
@Test
void testMergingWithOldRecordAccepted() throws IOException {
- HoodieSchema schema =
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+ HoodieSchema schema =
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1",
3L);
BufferedRecord<RowData> oldRecord =
BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 3L, RECORD_KEY,
false);
@@ -111,7 +111,7 @@ public class TestHoodieFlinkRecordMerger {
@Test
void testMergingWithNewRecordAccepted() throws IOException {
- HoodieSchema schema =
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+ HoodieSchema schema =
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1",
1L);
BufferedRecord<RowData> oldRecord =
BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 1L, RECORD_KEY,
false);
@@ -126,7 +126,7 @@ public class TestHoodieFlinkRecordMerger {
@Test
void testMergingWithCommitTimeRecordMerger() throws IOException {
- HoodieSchema schema =
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+ HoodieSchema schema =
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1",
2L);
BufferedRecord<RowData> oldRecord =
BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 2L, RECORD_KEY,
false);
diff --git
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index c4892c48e696..a6fc063857eb 100644
---
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -19,9 +19,11 @@
package org.apache.hudi.util;
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.schema.HoodieSchemaField;
import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
@@ -248,43 +250,6 @@ public class TestHoodieSchemaConverter {
assertEquals(2, addressSchema.getFields().size());
}
- @Test
- public void testCompareWithAvroConversion() {
- // Test that HoodieSchemaConverter produces the same result as
- // AvroSchemaConverter + HoodieSchema.fromAvroSchema()
-
- RowType flinkRowType = (RowType) DataTypes.ROW(
- DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
- DataTypes.FIELD("name", DataTypes.STRING().nullable()),
- DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3).notNull()),
- DataTypes.FIELD("decimal_val", DataTypes.DECIMAL(10, 2).notNull())
- ).notNull().getLogicalType();
-
- // Method 1: Direct HoodieSchema conversion
- HoodieSchema directSchema =
HoodieSchemaConverter.convertToSchema(flinkRowType, "TestRecord");
-
- // Method 2: Via Avro conversion
- HoodieSchema viaAvroSchema = HoodieSchema.fromAvroSchema(
- AvroSchemaConverter.convertToSchema(flinkRowType, "TestRecord"));
-
- // Both should produce equivalent schemas
- assertNotNull(directSchema);
- assertNotNull(viaAvroSchema);
- assertEquals(HoodieSchemaType.RECORD, directSchema.getType());
- assertEquals(HoodieSchemaType.RECORD, viaAvroSchema.getType());
- assertEquals(4, directSchema.getFields().size());
- assertEquals(4, viaAvroSchema.getFields().size());
-
- // Verify field types match
- for (int i = 0; i < 4; i++) {
- assertEquals(
- viaAvroSchema.getFields().get(i).schema().getType(),
- directSchema.getFields().get(i).schema().getType(),
- "Field " + i + " type mismatch"
- );
- }
- }
-
@Test
public void testComplexNestedStructure() {
LogicalType complexType = DataTypes.ROW(
@@ -317,26 +282,6 @@ public class TestHoodieSchemaConverter {
assertEquals(2, nestedRecord.getFields().size());
}
- @Test
- public void testNativeConversionMatchesAvroPath() {
- // Verify native conversion produces same result as Avro path
- RowType originalRowType = (RowType) DataTypes.ROW(
- DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
- DataTypes.FIELD("name", DataTypes.STRING().nullable()),
- DataTypes.FIELD("age", DataTypes.INT().notNull())
- ).notNull().getLogicalType();
-
- HoodieSchema hoodieSchema =
HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord");
-
- // Native conversion
- DataType nativeResult =
HoodieSchemaConverter.convertToDataType(hoodieSchema);
-
- // Avro path (for comparison)
- DataType avroResult =
AvroSchemaConverter.convertToDataType(hoodieSchema.getAvroSchema());
-
- assertEquals(avroResult.getLogicalType(), nativeResult.getLogicalType());
- }
-
@Test
public void testRoundTripConversion() {
RowType originalRowType = (RowType) DataTypes.ROW(
@@ -528,4 +473,61 @@ public class TestHoodieSchemaConverter {
DataType dataType = HoodieSchemaConverter.convertToDataType(fixedSchema);
assertTrue(dataType.getLogicalType() instanceof VarBinaryType);
}
-}
\ No newline at end of file
+
+ @Test
+ void testUnionSchemaWithMultipleRecordTypes() {
+ HoodieSchema schema =
HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$);
+ DataType dataType = HoodieSchemaConverter.convertToDataType(schema);
+ int pos =
HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
+ final String expected = "ROW<"
+ + "`fileName` STRING, "
+ + "`columnName` STRING, "
+ + "`minValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, "
+ + "`maxValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, "
+ + "`valueCount` BIGINT, "
+ + "`nullCount` BIGINT, "
+ + "`totalSize` BIGINT, "
+ + "`totalUncompressedSize` BIGINT, "
+ + "`isDeleted` BOOLEAN NOT NULL, "
+ + "`isTightBound` BOOLEAN NOT NULL, "
+ + "`valueType` ROW<`typeOrdinal` INT NOT NULL, `additionalInfo`
STRING>>";
+ assertEquals(expected, dataType.getChildren().get(pos).toString());
+ }
+
+ @Test
+ void testLocalTimestampType() {
+ DataType dataType = DataTypes.ROW(
+ DataTypes.FIELD("f_localtimestamp_millis",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+ DataTypes.FIELD("f_localtimestamp_micros",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
+ );
+ // convert to avro schema
+ HoodieSchema schema =
HoodieSchemaConverter.convertToSchema(dataType.getLogicalType());
+ final String expectedSchema = ""
+ + "[ \"null\", {\n"
+ + " \"type\" : \"record\",\n"
+ + " \"name\" : \"record\",\n"
+ + " \"fields\" : [ {\n"
+ + " \"name\" : \"f_localtimestamp_millis\",\n"
+ + " \"type\" : [ \"null\", {\n"
+ + " \"type\" : \"long\",\n"
+ + " \"logicalType\" : \"local-timestamp-millis\"\n"
+ + " } ],\n"
+ + " \"default\" : null\n"
+ + " }, {\n"
+ + " \"name\" : \"f_localtimestamp_micros\",\n"
+ + " \"type\" : [ \"null\", {\n"
+ + " \"type\" : \"long\",\n"
+ + " \"logicalType\" : \"local-timestamp-micros\"\n"
+ + " } ],\n"
+ + " \"default\" : null\n"
+ + " } ]\n"
+ + "} ]";
+ assertEquals(expectedSchema, schema.toString(true));
+ // convert it back
+ DataType convertedDataType =
HoodieSchemaConverter.convertToDataType(schema);
+ final String expectedDataType = "ROW<"
+ + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
+ + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
+ assertEquals(expectedDataType, convertedDataType.toString());
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
index 1188c241990c..f4fdf1dbc6e1 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
@@ -19,8 +19,9 @@
package org.apache.hudi.source.stats;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.metadata.HoodieMetadataPayload;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
@@ -53,7 +54,7 @@ public class ColumnStatsSchemas {
public static final int ORD_COL_NAME = 5;
private static DataType getMetadataDataType() {
- return AvroSchemaConverter.convertToDataType(HoodieMetadataRecord.SCHEMA$);
+ return
HoodieSchemaConverter.convertToDataType(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$));
}
private static DataType getColStatsDataType() {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 3ca968dc3dbf..87006a03d630 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,8 +18,8 @@
package org.apache.hudi.table;
-import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieIndexConfig;
@@ -31,8 +31,8 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.util.StreamerUtil;
@@ -434,9 +434,9 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
* @param rowType The specified table row type
*/
private static void inferAvroSchema(Configuration conf, LogicalType rowType)
{
- if (!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()
- && !conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
- String inferredSchema = AvroSchemaConverter.convertToSchema(rowType,
AvroSchemaUtils.getAvroRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME))).toString();
+ if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isEmpty()
+ && conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isEmpty()) {
+ String inferredSchema = HoodieSchemaConverter.convertToSchema(rowType,
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME))).toString();
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index b9b4c2fff34d..b1bba78f0b07 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -544,7 +544,7 @@ public class HoodieTableSource implements
return CdcInputFormat.builder()
.config(this.conf)
.tableState(hoodieTableState)
- // use the explicit fields' data type because the AvroSchemaConverter
+ // use the explicit fields' data type because the HoodieSchemaConverter
// is not very stable.
.fieldTypes(rowDataType.getChildren())
.predicates(this.predicates)
@@ -569,7 +569,7 @@ public class HoodieTableSource implements
return MergeOnReadInputFormat.builder()
.config(this.conf)
.tableState(hoodieTableState)
- // use the explicit fields' data type because the AvroSchemaConverter
+ // use the explicit fields' data type because the HoodieSchemaConverter
// is not very stable.
.fieldTypes(rowDataType.getChildren())
.predicates(this.predicates)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 10bdaa7bdf79..b7fbe94fb76d 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.catalog;
import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CollectionUtils;
@@ -29,13 +30,12 @@ import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.CatalogUtils;
import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.DataTypes;
@@ -259,12 +259,12 @@ public class HoodieCatalog extends AbstractCatalog {
final String path = inferTablePath(catalogPathStr, tablePath);
Map<String, String> options =
TableOptionProperties.loadFromProperties(path, hadoopConf);
- final Schema latestSchema = getLatestTableSchema(path);
+ final HoodieSchema latestSchema = getLatestTableSchema(path);
if (latestSchema != null) {
List<String> pkColumns = TableOptionProperties.getPkColumns(options);
// if the table is initialized from spark, the write schema is nullable
for pk columns.
DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
- AvroSchemaConverter.convertToDataType(latestSchema), pkColumns);
+ HoodieSchemaConverter.convertToDataType(latestSchema), pkColumns);
org.apache.flink.table.api.Schema.Builder builder =
org.apache.flink.table.api.Schema.newBuilder()
.fromRowDataType(tableDataType);
final String pkConstraintName =
TableOptionProperties.getPkConstraintName(options);
@@ -316,7 +316,7 @@ public class HoodieCatalog extends AbstractCatalog {
if (!resolvedSchema.getPrimaryKey().isPresent() &&
!conf.containsKey(RECORD_KEY_FIELD.key())) {
throw new CatalogException("Primary key definition is missing");
}
- final String avroSchema = AvroSchemaConverter.convertToSchema(
+ final String avroSchema = HoodieSchemaConverter.convertToSchema(
resolvedSchema.toPhysicalRowDataType().getLogicalType(),
AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
@@ -592,11 +592,11 @@ public class HoodieCatalog extends AbstractCatalog {
throw new UnsupportedOperationException("alterPartitionColumnStatistics is
not implemented.");
}
- private @Nullable Schema getLatestTableSchema(String path) {
+ private @Nullable HoodieSchema getLatestTableSchema(String path) {
if (path != null && StreamerUtil.tableExists(path, hadoopConf)) {
try {
HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path,
hadoopConf);
- return new TableSchemaResolver(metaClient).getTableAvroSchema(false);
// change log mode is not supported now
+ return new TableSchemaResolver(metaClient).getTableSchema(false); //
change log mode is not supported now
} catch (Throwable throwable) {
log.warn("Failed to resolve the latest table schema.", throwable);
// ignored
@@ -616,7 +616,7 @@ public class HoodieCatalog extends AbstractCatalog {
private void refreshTableProperties(ObjectPath tablePath, CatalogBaseTable
newCatalogTable) {
Map<String, String> options = newCatalogTable.getOptions();
ResolvedCatalogTable resolvedTable = (ResolvedCatalogTable)
newCatalogTable;
- final String avroSchema = AvroSchemaConverter.convertToSchema(
+ final String avroSchema = HoodieSchemaConverter.convertToSchema(
resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType(),
AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), avroSchema);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index 3b8aaf48d12b..ab54f3b393e0 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -22,7 +22,6 @@ import org.apache.hudi.adapter.Utils;
import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.schema.HoodieSchema;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.configuration.FlinkOptions;
@@ -31,8 +30,8 @@ import org.apache.hudi.exception.HoodieCatalogException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.Type;
import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import lombok.extern.slf4j.Slf4j;
@@ -232,7 +231,7 @@ public class HoodieCatalogUtil {
HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath,
oldTable, hadoopConf, inferTablePathFunc);
Pair<InternalSchema, HoodieTableMetaClient> pair =
writeClient.getInternalSchemaAndMetaClient();
InternalSchema oldSchema = pair.getLeft();
- Function<LogicalType, Type> convertFunc = (LogicalType logicalType) ->
InternalSchemaConverter.convertToField(HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(logicalType)));
+ Function<LogicalType, Type> convertFunc = (LogicalType logicalType) ->
InternalSchemaConverter.convertToField(HoodieSchemaConverter.convertToSchema(logicalType));
InternalSchema newSchema = Utils.applyTableChange(oldSchema,
tableChanges, convertFunc);
if (!oldSchema.equals(newSchema)) {
writeClient.setOperationType(WriteOperationType.ALTER_SCHEMA);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index 0c1674247a27..b2f5d428cdaf 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -27,8 +27,8 @@ import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.table.catalog.CatalogTable;
@@ -203,7 +203,7 @@ public class TableOptionProperties {
List<String> partitionKeys,
boolean withOperationField) {
RowType rowType =
supplementMetaFields(DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema()),
withOperationField);
- HoodieSchema schema =
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(rowType));
+ HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
String sparkVersion =
catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
Map<String, String> sparkTableProperties =
SparkDataSourceTableUtils.getSparkTableProperties(
partitionKeys,
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 8310dbe4f52a..991f099d8981 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -47,7 +47,7 @@ import org.apache.hudi.storage.StorageConfiguration;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.Lazy;
import org.apache.hudi.util.RecordKeyToRowDataConverter;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.JoinedRowData;
@@ -106,7 +106,7 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
(HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
.getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
.getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET,
Option.empty());
- DataType rowType =
RowDataAvroQueryContexts.fromAvroSchema(dataSchema.toAvroSchema()).getRowType();
+ DataType rowType =
RowDataQueryContexts.fromSchema(dataSchema).getRowType();
return rowDataParquetReader.getRowDataIterator(schemaManager, rowType,
requiredSchema, getSafePredicates(requiredSchema));
}
@@ -203,7 +203,7 @@ public class FlinkRowDataReaderContext extends
HoodieReaderContext<RowData> {
// For e.g, if the pk fields are [a, b] but user only select a, then the pk
// semantics is lost.
RecordKeyToRowDataConverter recordKeyRowConverter = new
RecordKeyToRowDataConverter(
- pkFieldsPos, (RowType)
RowDataAvroQueryContexts.fromAvroSchema(requiredSchema.toAvroSchema()).getRowType().getLogicalType());
+ pkFieldsPos, (RowType)
RowDataQueryContexts.fromSchema(requiredSchema).getRowType().getLogicalType());
((FlinkRecordContext)
recordContext).setRecordKeyRowConverter(recordKeyRowConverter);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
index 76d45a390723..93b79750a21e 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
@@ -37,7 +37,7 @@ import
org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
import org.apache.hudi.storage.HoodieStorageUtils;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import lombok.Getter;
import org.apache.flink.table.types.DataType;
@@ -157,8 +157,8 @@ public class InternalSchemaManager implements Serializable {
}
List<Integer> selectedFieldList =
IntStream.of(selectedFields).boxed().collect(Collectors.toList());
// mergeSchema is built with useColumnTypeFromFileSchema = true
- List<DataType> mergeSchemaAsDataTypes =
AvroSchemaConverter.convertToDataType(
- InternalSchemaConverter.convert(mergeSchema,
"tableName").toAvroSchema()).getChildren();
+ List<DataType> mergeSchemaAsDataTypes =
HoodieSchemaConverter.convertToDataType(
+ InternalSchemaConverter.convert(mergeSchema,
"tableName")).getChildren();
DataType[] fileFieldTypes = new DataType[queryFieldTypes.length];
for (int i = 0; i < queryFieldTypes.length; i++) {
// position of ChangedType in querySchema
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index cc9d8dbfa70c..745af94485b1 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -141,7 +141,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
try {
// get full schema iterator.
final HoodieSchema schema = HoodieSchemaCache.intern(
- HoodieSchema.parse(tableState.getAvroSchema()));
+ HoodieSchema.parse(tableState.getTableSchema()));
// before/after images have assumption of snapshot scan, so `emitDelete`
is set as false
return getSplitRowIterator(split, schema, schema,
FlinkOptions.REALTIME_PAYLOAD_COMBINE, false);
} catch (IOException e) {
@@ -181,7 +181,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
MergeOnReadInputSplit inputSplit = fileSlice2Split(tablePath,
fileSlice, maxCompactionMemoryInBytes);
return new RemoveBaseFileIterator(tableState,
getFileSliceIterator(inputSplit));
case AS_IS:
- HoodieSchema dataSchema =
HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getAvroSchema()));
+ HoodieSchema dataSchema =
HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getTableSchema()));
HoodieSchema cdcSchema =
HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
switch (mode) {
case DATA_BEFORE_AFTER:
@@ -216,7 +216,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
*/
private ClosableIterator<HoodieRecord<RowData>>
getSplitRecordIterator(MergeOnReadInputSplit split) throws IOException {
final HoodieSchema schema = HoodieSchemaCache.intern(
- HoodieSchema.parse(tableState.getAvroSchema()));
+ HoodieSchema.parse(tableState.getTableSchema()));
HoodieFileGroupReader<RowData> fileGroupReader =
createFileGroupReader(split, schema, schema,
FlinkOptions.REALTIME_PAYLOAD_COMBINE, true);
return fileGroupReader.getClosableHoodieRecordIterator();
@@ -363,7 +363,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
MergeOnReadTableState tableState,
ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
HoodieTableMetaClient metaClient) throws IOException {
- this.tableSchema = HoodieSchema.parse(tableState.getAvroSchema());
+ this.tableSchema = HoodieSchema.parse(tableState.getTableSchema());
this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
this.imageManager = imageManager;
this.projection =
tableState.getRequiredRowType().equals(tableState.getRowType())
@@ -495,8 +495,8 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
MergeOnReadTableState tableState,
HoodieSchema cdcSchema,
HoodieCDCFileSplit fileSplit) {
- this.requiredSchema =
HoodieSchema.parse(tableState.getRequiredAvroSchema());
- this.requiredPos = getRequiredPos(tableState.getAvroSchema(),
this.requiredSchema);
+ this.requiredSchema = HoodieSchema.parse(tableState.getRequiredSchema());
+ this.requiredPos = getRequiredPos(tableState.getTableSchema(),
this.requiredSchema);
this.recordBuilder = new
GenericRecordBuilder(requiredSchema.getAvroSchema());
this.avroToRowDataConverter =
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
StoragePath hadoopTablePath = new StoragePath(tablePath);
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 165cc6a7d57a..1fd6b3491128 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -190,9 +190,9 @@ public class MergeOnReadInputFormat
+ "flink partition Index: " + split.getSplitNumber()
+ "merge type: " + split.getMergeType());
final HoodieSchema tableSchema = HoodieSchemaCache.intern(
- HoodieSchema.parse(tableState.getAvroSchema()));
+ HoodieSchema.parse(tableState.getTableSchema()));
final HoodieSchema requiredSchema = HoodieSchemaCache.intern(
- HoodieSchema.parse(tableState.getRequiredAvroSchema()));
+ HoodieSchema.parse(tableState.getRequiredSchema()));
return getSplitRowIterator(split, tableSchema, requiredSchema, mergeType,
emitDelete);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index e5e7e99dc24b..868a7f814aba 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -36,21 +36,21 @@ public class MergeOnReadTableState implements Serializable {
private final RowType rowType;
private final RowType requiredRowType;
- private final String avroSchema;
- private final String requiredAvroSchema;
+ private final String tableSchema;
+ private final String requiredSchema;
private final List<MergeOnReadInputSplit> inputSplits;
private final int operationPos;
public MergeOnReadTableState(
RowType rowType,
RowType requiredRowType,
- String avroSchema,
- String requiredAvroSchema,
+ String tableSchema,
+ String requiredSchema,
List<MergeOnReadInputSplit> inputSplits) {
this.rowType = rowType;
this.requiredRowType = requiredRowType;
- this.avroSchema = avroSchema;
- this.requiredAvroSchema = requiredAvroSchema;
+ this.tableSchema = tableSchema;
+ this.requiredSchema = requiredSchema;
this.inputSplits = inputSplits;
this.operationPos =
rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 56082f107bb4..e540cd55b006 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -26,7 +26,7 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
import org.apache.hudi.table.format.mor.MergeOnReadTableState;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
import org.apache.hudi.utils.TestData;
@@ -253,13 +253,13 @@ public class TestStreamReadOperator {
} catch (Exception e) {
throw new HoodieException("Get table avro schema error", e);
}
- final DataType rowDataType =
AvroSchemaConverter.convertToDataType(tableSchema.getAvroSchema());
+ final DataType rowDataType =
HoodieSchemaConverter.convertToDataType(tableSchema);
final RowType rowType = (RowType) rowDataType.getLogicalType();
final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
rowType,
TestConfigurations.ROW_TYPE,
tableSchema.toString(),
-
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
+
HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
Collections.emptyList());
MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
.config(conf)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index cebc24651ec2..1598b4d7ed89 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -40,8 +40,8 @@ import org.apache.hudi.sink.compact.CompactOperator;
import org.apache.hudi.sink.compact.CompactionCommitEvent;
import org.apache.hudi.sink.compact.CompactionCommitSink;
import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.utils.FlinkMiniCluster;
import lombok.extern.slf4j.Slf4j;
@@ -313,7 +313,7 @@ public class ITTestSchemaEvolution {
private void writeTableWithSchema2(TableOptions tableOptions) throws
ExecutionException, InterruptedException {
tableOptions.withOption(
FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
- AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_AFTER,
"hoodie.t1.t1_record"));
+ HoodieSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_AFTER,
"hoodie.t1.t1_record"));
//language=SQL
tEnv.executeSql("drop table t1");
@@ -384,7 +384,7 @@ public class ITTestSchemaEvolution {
KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition",
KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true,
FlinkOptions.WRITE_BATCH_SIZE.key(), 0.000001, // each record triggers
flush
- FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_BEFORE),
+ FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
HoodieSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_BEFORE),
FlinkOptions.READ_TASKS.key(), 1,
FlinkOptions.WRITE_TASKS.key(), 1,
FlinkOptions.INDEX_BOOTSTRAP_TASKS.key(), 1,
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index f7ff64d3dfcb..45a9b8ecab0b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -49,7 +49,7 @@ import org.apache.hudi.table.format.FlinkRowDataReaderContext;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.util.AvroToRowDataConverters;
import org.apache.hudi.util.HoodieSchemaConverter;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
import org.apache.hudi.utils.TestData;
import org.apache.flink.configuration.Configuration;
@@ -131,7 +131,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
HoodieSchema recordSchema,
HoodieReaderContext<RowData> readerContext,
boolean sortOutput) throws IOException {
- RowDataSerializer rowDataSerializer =
RowDataAvroQueryContexts.getRowDataSerializer(recordSchema.toAvroSchema());
+ RowDataSerializer rowDataSerializer =
RowDataQueryContexts.getRowDataSerializer(recordSchema);
try (ClosableIterator<RowData> iterator =
fileGroupReader.getClosableIterator()) {
while (iterator.hasNext()) {
RowData rowData = rowDataSerializer.copy(iterator.next());
@@ -148,7 +148,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
HoodieSchema localSchema = getRecordSchema(schemaStr);
conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, localSchema.toString());
AvroToRowDataConverters.AvroToRowDataConverter avroConverter =
-
RowDataAvroQueryContexts.fromAvroSchema(localSchema.getAvroSchema()).getAvroToRowDataConverter();
+
RowDataQueryContexts.fromSchema(localSchema).getAvroToRowDataConverter();
List<RowData> rowDataList = recordList.stream().map(record -> {
try {
return (RowData)
avroConverter.convert(record.toIndexedRecord(localSchema,
CollectionUtils.emptyProps()).get().getData());
@@ -169,7 +169,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
TestData.assertRowDataEquals(
Collections.singletonList(actual),
Collections.singletonList(expected),
-
RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema()).getRowType());
+ RowDataQueryContexts.fromSchema(schema).getRowType());
}
@Override
@@ -324,7 +324,7 @@ public class TestHoodieFileGroupReaderOnFlink extends
TestHoodieFileGroupReaderB
}
private static HoodieSchema getRecordSchema(String schemaStr) {
- HoodieSchema recordSchema = new HoodieSchema.Parser().parse(schemaStr);
- return
HoodieSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.getAvroSchema()).getRowType().getLogicalType());
+ HoodieSchema recordSchema = HoodieSchema.parse(schemaStr);
+ return
HoodieSchemaConverter.convertToSchema(RowDataQueryContexts.fromSchema(recordSchema).getRowType().getLogicalType());
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index a4a69706e4fe..adce67c00f30 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -28,7 +28,7 @@ import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.CatalogUtils;
import org.apache.hudi.utils.SchemaBuilder;
@@ -364,7 +364,7 @@ public class TestHoodieTableFactory {
final HoodieTableSink tableSink3 =
(HoodieTableSink) new
HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf,
schema3, ""));
final Configuration conf3 = tableSink3.getConf();
- final String expected =
AvroSchemaConverter.convertToSchema(schema3.toSourceRowDataType().getLogicalType(),
AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString();
+ final String expected =
HoodieSchemaConverter.convertToSchema(schema3.toSourceRowDataType().getLogicalType(),
AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString();
assertThat(conf3.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(expected));
}
@@ -565,7 +565,7 @@ public class TestHoodieTableFactory {
final HoodieTableSink tableSink3 =
(HoodieTableSink) new
HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf,
schema3, ""));
final Configuration conf3 = tableSink3.getConf();
- final String expected =
AvroSchemaConverter.convertToSchema(schema3.toSinkRowDataType().getLogicalType(),
AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString();
+ final String expected =
HoodieSchemaConverter.convertToSchema(schema3.toSinkRowDataType().getLogicalType(),
AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString();
assertThat(conf3.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(expected));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index fc4694bd3a6f..8431a8e0490b 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -50,8 +50,8 @@ import org.apache.hudi.table.format.cdc.CdcInputFormat;
import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.SerializableSchema;
import org.apache.hudi.util.StreamerUtil;
import org.apache.hudi.utils.TestConfigurations;
@@ -1207,7 +1207,7 @@ public class TestInputFormat {
void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
Map<String, String> options = new HashMap<>();
options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
-
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString());
+
HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString());
beforeEach(tableType, options);
TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -1223,7 +1223,7 @@ public class TestInputFormat {
conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");
- conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_DECIMAL_ORDERING).toString());
+ conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_DECIMAL_ORDERING).toString());
conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // by default
close the async compaction
StreamerUtil.initTableIfNotExists(conf);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
deleted file mode 100644
index afe75e061566..000000000000
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utils;
-
-import org.apache.hudi.avro.model.HoodieMetadataRecord;
-import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.metadata.HoodieMetadataPayload;
-import org.apache.hudi.util.AvroSchemaConverter;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.types.DataType;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * Test cases for {@link org.apache.hudi.util.AvroSchemaConverter}.
- */
-public class TestAvroSchemaConverter {
-
- @Test
- void testUnionSchemaWithMultipleRecordTypes() {
- HoodieSchema schema =
HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$);
- DataType dataType =
AvroSchemaConverter.convertToDataType(schema.getAvroSchema());
- int pos =
HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
- final String expected = "ROW<"
- + "`fileName` STRING, "
- + "`columnName` STRING, "
- + "`minValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, "
- + "`maxValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, "
- + "`valueCount` BIGINT, "
- + "`nullCount` BIGINT, "
- + "`totalSize` BIGINT, "
- + "`totalUncompressedSize` BIGINT, "
- + "`isDeleted` BOOLEAN NOT NULL, "
- + "`isTightBound` BOOLEAN NOT NULL, "
- + "`valueType` ROW<`typeOrdinal` INT NOT NULL, `additionalInfo`
STRING>>";
- assertThat(dataType.getChildren().get(pos).toString(), is(expected));
- }
-
- @Test
- void testLocalTimestampType() {
- DataType dataType = DataTypes.ROW(
- DataTypes.FIELD("f_localtimestamp_millis",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
- DataTypes.FIELD("f_localtimestamp_micros",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
- );
- // convert to avro schema
- HoodieSchema schema =
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(dataType.getLogicalType()));
- final String expectedSchema = ""
- + "[ \"null\", {\n"
- + " \"type\" : \"record\",\n"
- + " \"name\" : \"record\",\n"
- + " \"fields\" : [ {\n"
- + " \"name\" : \"f_localtimestamp_millis\",\n"
- + " \"type\" : [ \"null\", {\n"
- + " \"type\" : \"long\",\n"
- + " \"logicalType\" : \"local-timestamp-millis\"\n"
- + " } ],\n"
- + " \"default\" : null\n"
- + " }, {\n"
- + " \"name\" : \"f_localtimestamp_micros\",\n"
- + " \"type\" : [ \"null\", {\n"
- + " \"type\" : \"long\",\n"
- + " \"logicalType\" : \"local-timestamp-micros\"\n"
- + " } ],\n"
- + " \"default\" : null\n"
- + " } ]\n"
- + "} ]";
- assertThat(schema.toString(true), is(expectedSchema));
- // convert it back
- DataType convertedDataType =
AvroSchemaConverter.convertToDataType(schema.getAvroSchema());
- final String expectedDataType = "ROW<"
- + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
- + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
- assertThat(convertedDataType.toString(), is(expectedDataType));
- }
-}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 2430a1404529..46c90afa7a10 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -21,8 +21,8 @@ package org.apache.hudi.utils;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.streamer.FlinkStreamerConfig;
-import org.apache.hudi.util.AvroSchemaConverter;
import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.utils.factory.CollectSinkTableFactory;
import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
@@ -374,7 +374,7 @@ public class TestConfigurations {
public static Configuration getDefaultConf(String tablePath, DataType
dataType) {
Configuration conf = new Configuration();
conf.set(FlinkOptions.PATH, tablePath);
- conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA,
AvroSchemaConverter.convertToSchema(dataType.getLogicalType()).toString());
+ conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA,
HoodieSchemaConverter.convertToSchema(dataType.getLogicalType()).toString());
conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");
return conf;
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 9020073f2a37..8dfbd1d724a9 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -37,18 +37,17 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper;
-import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.sink.utils.BulkInsertFunctionWrapper;
import org.apache.hudi.sink.utils.ConsistentBucketStreamWriteFunctionWrapper;
import org.apache.hudi.sink.utils.InsertFunctionWrapper;
+import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
import org.apache.hudi.sink.utils.TestFunctionWrapper;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.format.FormatUtils;
import org.apache.hudi.table.format.InternalSchemaManager;
import org.apache.hudi.util.AvroToRowDataConverters;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -977,8 +976,7 @@ public class TestData {
HoodieTableMetaClient metaClient = createMetaClient(basePath);
HoodieFlinkTable<?> table = HoodieFlinkTable.create(config,
HoodieFlinkEngineContext.DEFAULT, metaClient);
- Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
- HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema);
+ HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema();
String latestInstant =
metaClient.getActiveTimeline().filterCompletedInstants()
.lastInstant().map(HoodieInstant::requestedTime).orElse(null);
@@ -993,7 +991,7 @@ public class TestData {
List<String> readBuffer = new ArrayList<>();
List<FileSlice> fileSlices =
table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(),
latestInstant).collect(Collectors.toList());
for (FileSlice fileSlice : fileSlices) {
- try (ClosableIterator<RowData> rowIterator =
getRecordIterator(fileSlice, hoodieSchema, metaClient, config)) {
+ try (ClosableIterator<RowData> rowIterator =
getRecordIterator(fileSlice, schema, metaClient, config)) {
while (rowIterator.hasNext()) {
RowData rowData = rowIterator.next();
readBuffer.add(filterOutVariables(schema, rowData));
@@ -1048,8 +1046,8 @@ public class TestData {
return String.join(",", fields);
}
- private static String filterOutVariables(Schema schema, RowData record) {
- RowDataAvroQueryContexts.RowDataQueryContext queryContext =
RowDataAvroQueryContexts.fromAvroSchema(schema);
+ private static String filterOutVariables(HoodieSchema schema, RowData
record) {
+ RowDataQueryContexts.RowDataQueryContext queryContext =
RowDataQueryContexts.fromSchema(schema);
List<String> fields = new ArrayList<>();
fields.add(getFieldValue(queryContext, record, "_hoodie_record_key"));
fields.add(getFieldValue(queryContext, record, "_hoodie_partition_path"));
@@ -1061,7 +1059,7 @@ public class TestData {
return String.join(",", fields);
}
- private static String
getFieldValue(RowDataAvroQueryContexts.RowDataQueryContext queryContext,
RowData rowData, String fieldName) {
+ private static String getFieldValue(RowDataQueryContexts.RowDataQueryContext
queryContext, RowData rowData, String fieldName) {
return
String.valueOf(queryContext.getFieldQueryContext(fieldName).getValAsJava(rowData,
true));
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java
index 9fa2bd3aa3b8..25967f2246ef 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java
@@ -19,9 +19,9 @@
package org.apache.hudi.utils;
import org.apache.hudi.keygen.KeyGenUtils;
-import org.apache.hudi.util.AvroSchemaConverter;
-import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.RecordKeyToRowDataConverter;
+import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.table.api.DataTypes;
@@ -103,7 +103,7 @@ public class TestRecordKeyToRowDataConverter {
RowDataToAvroConverters.RowDataToAvroConverter converter =
RowDataToAvroConverters.createConverter(rowType);
GenericRecord avroRecord =
- (GenericRecord)
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+ (GenericRecord)
converter.convert(HoodieSchemaConverter.convertToSchema(rowType), rowData);
RecordKeyToRowDataConverter keyToRowDataConverter =
new RecordKeyToRowDataConverter(new int[]{0, 1, 2, 3, 4, 5, 6},
rowType);
final String recordKey = KeyGenUtils.getRecordKey(avroRecord,
rowType.getFieldNames(), false);
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
index 471c0686b60f..185c5830616c 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
@@ -18,7 +18,7 @@
package org.apache.hudi.utils;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
import org.apache.hudi.util.RowDataToAvroConverters;
import org.apache.avro.generic.GenericRecord;
@@ -58,7 +58,7 @@ class TestRowDataToAvroConverters {
RowDataToAvroConverters.RowDataToAvroConverter converter =
RowDataToAvroConverters.createConverter(rowType, false);
GenericRecord avroRecord =
- (GenericRecord)
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+ (GenericRecord)
converter.convert(HoodieSchemaConverter.convertToSchema(rowType), rowData);
Assertions.assertEquals(timestampFromLocal,
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long)
avroRecord.get(0)), ZoneId.systemDefault())));
}
@@ -76,7 +76,7 @@ class TestRowDataToAvroConverters {
RowDataToAvroConverters.RowDataToAvroConverter converter =
RowDataToAvroConverters.createConverter(rowType);
GenericRecord avroRecord =
- (GenericRecord)
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+ (GenericRecord)
converter.convert(HoodieSchemaConverter.convertToSchema(rowType), rowData);
Assertions.assertEquals(timestampFromUtc0,
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long)
avroRecord.get(0)), ZoneId.of("UTC"))));
Assertions.assertEquals("2021-03-30 08:44:29",
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long)
avroRecord.get(0)), ZoneId.of("UTC+1"))));
Assertions.assertEquals("2021-03-30 15:44:29",
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long)
avroRecord.get(0)), ZoneId.of("Asia/Shanghai"))));