This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a0712c9d9117 fix: (HUDI-9119) Fix Avro SpecificRecord generation process (#13990)
a0712c9d9117 is described below
commit a0712c9d9117c0881c4ad51d354bc09dc85be212
Author: Lin Liu <[email protected]>
AuthorDate: Sun Sep 28 21:32:39 2025 -0700
fix: (HUDI-9119) Fix Avro SpecificRecord generation process (#13990)
---------
Co-authored-by: Timothy Brown <[email protected]>
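In short: the reader paths below stop instantiating the generated Avro classes through SpecificDatumReader and instead decode into GenericRecord, then rebuild the typed record with a new HoodieAvroUtils.convertToSpecificRecord helper. A minimal sketch of the helper's contract; the HoodieActionInstant field names (timestamp, action, state) are assumptions taken from the generated Avro model:

    // Assumed imports: org.apache.avro.Schema, org.apache.avro.generic.GenericData,
    // org.apache.avro.generic.GenericRecord, org.apache.avro.specific.SpecificData,
    // org.apache.hudi.avro.HoodieAvroUtils, org.apache.hudi.avro.model.HoodieActionInstant.
    Schema schema = SpecificData.get().getSchema(HoodieActionInstant.class);
    GenericRecord generic = new GenericData.Record(schema);
    generic.put("timestamp", "001");   // field names assumed from the generated model
    generic.put("action", "clean");
    generic.put("state", "COMPLETED");
    // The helper copies fields by position, recursing into nested records, arrays, and maps.
    HoodieActionInstant typed =
        HoodieAvroUtils.convertToSpecificRecord(HoodieActionInstant.class, generic);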
---
.../hudi/utils/TestMetadataConversionUtils.java | 4 +-
.../java/org/apache/hudi/avro/HoodieAvroUtils.java | 52 ++++++
.../common/table/log/block/HoodieDeleteBlock.java | 8 +-
.../table/timeline/TimelineMetadataUtils.java | 17 +-
.../org/apache/hudi/avro/TestHoodieAvroUtils.java | 182 +++++++++++++++++++++
5 files changed, 252 insertions(+), 11 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
index fb7790ef3304..01af9d3fc3ff 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utils;
import org.apache.hudi.avro.model.HoodieActionInstant;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.avro.model.HoodieClusteringGroup;
@@ -401,8 +402,9 @@ public class TestMetadataConversionUtils extends HoodieCommonTestHarness {
}
private void createCleanMetadata(String instantTime) throws IOException {
+ HoodieCleanFileInfo fileInfo = new HoodieCleanFileInfo("file1", false);
HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""),
- "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.EMPTY_MAP);
+ "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, Collections.singletonMap("key", Collections.singletonList(fileInfo)), new ArrayList<>(), Collections.EMPTY_MAP);
HoodieCleanStat cleanStats = new HoodieCleanStat(
HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 5c73673736a1..61792e15f7b9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -56,6 +56,7 @@ import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecordBase;
import javax.annotation.Nullable;
@@ -1700,4 +1701,55 @@ public class HoodieAvroUtils {
}
return defaultValue;
}
+
+ public static <T extends SpecificRecordBase> T convertToSpecificRecord(Class<T> clazz, GenericRecord genericRecord) {
+ SpecificData specificData = new SpecificData(clazz.getClassLoader());
+ return convertToSpecificRecord(clazz, genericRecord, specificData);
+ }
+
+ private static <T extends SpecificRecordBase> T convertToSpecificRecord(Class<T> clazz, GenericRecord genericRecord, SpecificData specificData) {
+ try {
+ if (genericRecord == null) {
+ return null;
+ }
+ T specificRecord = clazz.newInstance();
+ Schema schema = SpecificData.getForClass(clazz).getSchema(clazz);
+ for (Field field : schema.getFields()) {
+ Object value = genericRecord.get(field.pos());
+ if (value == null) {
+ specificRecord.put(field.pos(), null);
+ continue;
+ }
+ value = convertFieldToSpecificRecordValue(field.schema(), value, specificData);
+ specificRecord.put(field.pos(), value);
+ }
+ return specificRecord;
+ } catch (InstantiationException | IllegalAccessException e) {
+ throw new HoodieException("Failed to convert to SpecificRecord " + clazz.getName(), e);
+ }
+ }
+
+ private static Object convertFieldToSpecificRecordValue(Schema fieldSchema, Object value, SpecificData specificData) {
+ Schema resolvedFieldSchema = getActualSchemaFromUnion(fieldSchema, value);
+ switch (resolvedFieldSchema.getType()) {
+ case RECORD:
+ value = convertToSpecificRecord(specificData.getClass(resolvedFieldSchema), (GenericRecord) value, specificData);
+ break;
+ case ARRAY:
+ value = ((List<?>) value).stream().map(element -> convertFieldToSpecificRecordValue(resolvedFieldSchema.getElementType(), element, specificData)).collect(Collectors.toList());
+ break;
+ case MAP:
+ value = ((Map<?, ?>) value).entrySet().stream().collect(Collectors.toMap(
+ Map.Entry::getKey,
+ entry -> {
+ Object mapValue = entry.getValue();
+ return convertFieldToSpecificRecordValue(resolvedFieldSchema.getValueType(), mapValue, specificData);
+ }
+ ));
+ break;
+ default:
+ // no conversion required
+ }
+ return value;
+ }
}
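To make the recursion above concrete, a hedged sketch of the MAP -> ARRAY -> RECORD path exercised by the TestMetadataConversionUtils change above. The field names (filePathsToBeDeletedPerPartition, filePath, isBootstrapBaseFile) are assumptions taken from the generated HoodieCleanerPlan and HoodieCleanFileInfo models:

    // Assumes java.util.Collections plus the imports from the earlier sketch.
    GenericRecord genericFileInfo = new GenericData.Record(HoodieCleanFileInfo.SCHEMA$);
    genericFileInfo.put("filePath", "file1");
    genericFileInfo.put("isBootstrapBaseFile", false);
    GenericRecord genericPlan = new GenericData.Record(HoodieCleanerPlan.SCHEMA$);
    genericPlan.put("filePathsToBeDeletedPerPartition",
        Collections.singletonMap("partition", Collections.singletonList(genericFileInfo)));
    // The MAP branch converts each entry value, the ARRAY branch each element,
    // and the RECORD branch rebuilds each element as a typed HoodieCleanFileInfo.
    HoodieCleanerPlan plan =
        HoodieAvroUtils.convertToSpecificRecord(HoodieCleanerPlan.class, genericPlan);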
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index 8f648e8efc64..54e7f301e6a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.log.block;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieDeleteRecord;
import org.apache.hudi.avro.model.HoodieDeleteRecordList;
import org.apache.hudi.common.fs.SizeAwareDataInputStream;
@@ -31,13 +32,14 @@ import org.apache.hudi.io.SeekableDataInputStream;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.util.Lazy;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,9 +165,9 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
} else if (version == 2) {
return SerializationUtils.deserialize(data);
} else {
- DatumReader<HoodieDeleteRecordList> reader = new SpecificDatumReader<>(HoodieDeleteRecordList.class);
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>(HoodieDeleteRecordList.SCHEMA$);
BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
- List<HoodieDeleteRecord> deleteRecordList = reader.read(null, decoder)
+ List<HoodieDeleteRecord> deleteRecordList = HoodieAvroUtils.convertToSpecificRecord(HoodieDeleteRecordList.class, reader.read(null, decoder))
.getDeleteRecordList();
return deleteRecordList.stream()
.map(record -> DeleteRecord.create(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index ddcc8820fcf8..ff1939c2a38a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -18,6 +18,7 @@
package org.apache.hudi.common.table.timeline;
+import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieInstantInfo;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -35,9 +36,11 @@ import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecordBase;
@@ -123,18 +126,18 @@ public class TimelineMetadataUtils {
public static <T extends SpecificRecordBase> T deserializeAvroMetadataLegacy(byte[] bytes, Class<T> clazz)
throws IOException {
- DatumReader<T> reader = new SpecificDatumReader<>(clazz);
- FileReader<T> fileReader = DataFileReader.openReader(new SeekableByteArrayInput(bytes), reader);
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>(SpecificData.getForClass(clazz).getSchema(clazz));
+ FileReader<GenericRecord> fileReader = DataFileReader.openReader(new SeekableByteArrayInput(bytes), reader);
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
- return fileReader.next();
+ return HoodieAvroUtils.convertToSpecificRecord(clazz, fileReader.next());
}
public static <T extends SpecificRecordBase> T deserializeAvroMetadata(InputStream inputStream, Class<T> clazz)
throws IOException {
- DatumReader<T> reader = new SpecificDatumReader<>(clazz);
- try (DataFileStream<T> fileReader = new DataFileStream<>(inputStream, reader)) {
+ DatumReader<GenericRecord> reader = new GenericDatumReader<>(SpecificData.getForClass(clazz).getSchema(clazz));
+ try (DataFileStream<GenericRecord> fileReader = new DataFileStream<>(inputStream, reader)) {
ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
- return fileReader.next();
+ return HoodieAvroUtils.convertToSpecificRecord(clazz, fileReader.next());
}
}
}
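Call sites are unaffected by the rewrite above; a hedged usage sketch of the stream variant, where the file path is purely illustrative:

    // Assumes java.io.InputStream, java.nio.file.Files, java.nio.file.Paths,
    // and org.apache.hudi.avro.model.HoodieCleanMetadata; may throw IOException.
    try (InputStream in = Files.newInputStream(Paths.get("/tmp/clean.avro"))) {
      HoodieCleanMetadata metadata =
          TimelineMetadataUtils.deserializeAvroMetadata(in, HoodieCleanMetadata.class);
    }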
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 4d5c3b84aa25..2371c5b2caed 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -24,6 +24,34 @@ import org.apache.hudi.avro.model.DateWrapper;
import org.apache.hudi.avro.model.DecimalWrapper;
import org.apache.hudi.avro.model.DoubleWrapper;
import org.apache.hudi.avro.model.FloatWrapper;
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieBootstrapFilePartitionInfo;
+import org.apache.hudi.avro.model.HoodieBootstrapIndexInfo;
+import org.apache.hudi.avro.model.HoodieBootstrapPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieCommitMetadata;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.avro.model.HoodieDeleteRecordList;
+import org.apache.hudi.avro.model.HoodieFSPermission;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
+import org.apache.hudi.avro.model.HoodiePath;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieWriteStat;
import org.apache.hudi.avro.model.IntWrapper;
import org.apache.hudi.avro.model.LocalDateWrapper;
import org.apache.hudi.avro.model.LongWrapper;
@@ -53,6 +81,9 @@ import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
import org.apache.avro.util.Utf8;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -1231,4 +1262,155 @@ public class TestHoodieAvroUtils {
boolean result = HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema);
assertEquals(expected, result);
}
+
+ /**
+ * Utility class for generating random GenericRecord instances.
+ */
+ private static class AvroTestUtils {
+ private static final Random RANDOM = new Random(42);
+
+ /**
+ * Generate a list of random GenericRecord instances
+ */
+ public static List<GenericRecord> generateRandomRecords(Schema schema, int count) {
+ List<GenericRecord> records = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ records.add(generateRandomRecord(schema));
+ }
+ return records;
+ }
+
+ /**
+ * Generate a random GenericRecord for the given schema
+ */
+ public static GenericRecord generateRandomRecord(Schema schema) {
+ GenericRecord record = new GenericData.Record(schema);
+ for (Schema.Field field : schema.getFields()) {
+ Object value = generateRandomValue(field.schema(), field.defaultVal());
+ record.put(field.pos(), value);
+ }
+ return record;
+ }
+
+ /**
+ * Generate a random value for the given schema type
+ */
+ private static Object generateRandomValue(Schema schema, Object defaultValue) {
+ // CASE 1: Handle default value
+ if (defaultValue != null
+ && !(defaultValue instanceof JsonProperties.Null)
+ && RANDOM.nextBoolean()) {
+ return defaultValue;
+ }
+ // Handle Union type.
+ Schema actualSchema = schema;
+ try {
+ actualSchema = resolveNullableSchema(schema);
+ } catch (Exception e) {
+ // If we can't resolve the schema, just use the original. No-op.
+ }
+ // CASE 2: Handle different types
+ switch (actualSchema.getType()) {
+ case NULL:
+ return null;
+ case BOOLEAN:
+ return RANDOM.nextBoolean();
+ case INT:
+ return RANDOM.nextInt(1000);
+ case LONG:
+ return RANDOM.nextLong() % 1000000L;
+ case FLOAT:
+ return RANDOM.nextFloat() * 100f;
+ case DOUBLE:
+ return RANDOM.nextDouble() * 1000.0;
+ case STRING:
+ return "test_string_" + RANDOM.nextInt(1000);
+ case BYTES:
+ byte[] bytes = new byte[RANDOM.nextInt(10) + 1];
+ RANDOM.nextBytes(bytes);
+ return ByteBuffer.wrap(bytes);
+ case RECORD:
+ return generateRandomRecord(actualSchema);
+ case ENUM:
+ List<String> symbols = actualSchema.getEnumSymbols();
+ return new GenericData.EnumSymbol(actualSchema, symbols.get(RANDOM.nextInt(symbols.size())));
+ case ARRAY:
+ List<Object> array = new ArrayList<>();
+ int arraySize = RANDOM.nextInt(3) + 1;
+ for (int i = 0; i < arraySize; i++) {
+ array.add(generateRandomValue(actualSchema.getElementType(), null));
+ }
+ return array;
+ case MAP:
+ Map<String, Object> map = new HashMap<>();
+ int mapSize = RANDOM.nextInt(3) + 1;
+ for (int i = 0; i < mapSize; i++) {
+ map.put("key_" + i,
generateRandomValue(actualSchema.getValueType(), null));
+ }
+ return map;
+ case FIXED:
+ byte[] fixedBytes = new byte[actualSchema.getFixedSize()];
+ RANDOM.nextBytes(fixedBytes);
+ return new GenericData.Fixed(actualSchema, fixedBytes);
+ default:
+ return null;
+ }
+ }
+ }
+
+ /**
+ * Test convertToSpecificRecord with multiple random records for each type
+ */
+ @ParameterizedTest
+ @MethodSource("provideAvroModelClasses")
+ void testConvertToSpecificRecordMultipleRecords(Class<? extends SpecificRecord> recordClass) {
+ Schema schema = SpecificData.get().getSchema(recordClass);
+ List<GenericRecord> genericRecords = AvroTestUtils.generateRandomRecords(schema, 3);
+ for (GenericRecord genericRecord : genericRecords) {
+ Class<? extends SpecificRecordBase> specificRecordBaseClass = (Class<? extends SpecificRecordBase>) recordClass;
+ SpecificRecord specificRecord = HoodieAvroUtils.convertToSpecificRecord(specificRecordBaseClass, genericRecord);
+ assertEquals(recordClass, specificRecord.getClass());
+ GenericRecord copied = (GenericRecord) GenericData.get().deepCopy(schema, specificRecord);
+ assertEquals(genericRecord, copied);
+ }
+ }
+
+ /**
+ * Provide all the Avro model classes to test
+ */
+ static Stream<Arguments> provideAvroModelClasses() {
+ return Stream.of(
+ Arguments.of(HoodieRollbackPartitionMetadata.class),
+ Arguments.of(HoodieSavepointPartitionMetadata.class),
+ Arguments.of(HoodieWriteStat.class),
+ Arguments.of(HoodieCleanPartitionMetadata.class),
+ Arguments.of(HoodieCleanFileInfo.class),
+ Arguments.of(HoodieActionInstant.class),
+ Arguments.of(HoodieCompactionStrategy.class),
+ Arguments.of(HoodieCompactionOperation.class),
+ Arguments.of(HoodieFSPermission.class),
+ Arguments.of(HoodiePath.class),
+ Arguments.of(HoodieFileStatus.class),
+ Arguments.of(HoodieBootstrapFilePartitionInfo.class),
+ Arguments.of(HoodieCompactionPlan.class),
+ Arguments.of(HoodieCleanerPlan.class),
+ Arguments.of(HoodieCleanMetadata.class),
+ Arguments.of(HoodieReplaceCommitMetadata.class),
+ Arguments.of(HoodieSavepointMetadata.class),
+ Arguments.of(HoodieMergeArchiveFilePlan.class),
+ Arguments.of(HoodieRollbackMetadata.class),
+ Arguments.of(HoodieBootstrapPartitionMetadata.class),
+ Arguments.of(HoodieBootstrapIndexInfo.class),
+ Arguments.of(HoodieIndexPlan.class),
+ Arguments.of(HoodieRequestedReplaceMetadata.class),
+ Arguments.of(HoodieRestoreMetadata.class),
+ Arguments.of(HoodieRestorePlan.class),
+ Arguments.of(HoodieRollbackPlan.class),
+ Arguments.of(HoodieDeleteRecordList.class),
+ Arguments.of(HoodieCommitMetadata.class)
+ );
+ }
}
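End to end, the read pattern this patch standardizes on looks like the following hedged sketch: the write side keeps SpecificDatumWriter, the read side decodes generically and then converts. Identifiers come from the HoodieDeleteBlock hunk above; the generated all-args HoodieDeleteRecordList constructor is an assumption:

    // Write with SpecificDatumWriter, as before this patch; may throw IOException.
    HoodieDeleteRecordList written = new HoodieDeleteRecordList(new ArrayList<>());
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(baos, null);
    new SpecificDatumWriter<>(HoodieDeleteRecordList.class).write(written, encoder);
    encoder.flush();
    byte[] data = baos.toByteArray();
    // Read back as GenericRecord, then rebuild the typed record.
    DatumReader<GenericRecord> reader = new GenericDatumReader<>(HoodieDeleteRecordList.SCHEMA$);
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
    HoodieDeleteRecordList read =
        HoodieAvroUtils.convertToSpecificRecord(HoodieDeleteRecordList.class, reader.read(null, decoder));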