This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a0712c9d9117 fix: (HUDI-9119) Fix Avro SpecificRecord generation process (#13990)
a0712c9d9117 is described below

commit a0712c9d9117c0881c4ad51d354bc09dc85be212
Author: Lin Liu <[email protected]>
AuthorDate: Sun Sep 28 21:32:39 2025 -0700

    fix: (HUDI-9119) Fix Avro SpecificRecord generation process (#13990)
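
    The Avro timeline and log-block readers now decode serialized metadata
    into a GenericRecord with GenericDatumReader and then convert the result
    to the generated SpecificRecord class through a new
    HoodieAvroUtils#convertToSpecificRecord helper, rather than reading
    directly with SpecificDatumReader.
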
    Co-authored-by: Timothy Brown <[email protected]>
---
 .../hudi/utils/TestMetadataConversionUtils.java    |   4 +-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java |  52 ++++++
 .../common/table/log/block/HoodieDeleteBlock.java  |   8 +-
 .../table/timeline/TimelineMetadataUtils.java      |  17 +-
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  | 182 +++++++++++++++++++++
 5 files changed, 252 insertions(+), 11 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
index fb7790ef3304..01af9d3fc3ff 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/TestMetadataConversionUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utils;
 
 import org.apache.hudi.avro.model.HoodieActionInstant;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
@@ -401,8 +402,9 @@ public class TestMetadataConversionUtils extends HoodieCommonTestHarness {
   }
 
   private void createCleanMetadata(String instantTime) throws IOException {
+    HoodieCleanFileInfo fileInfo = new HoodieCleanFileInfo("file1", false);
     HoodieCleanerPlan cleanerPlan = new HoodieCleanerPlan(new HoodieActionInstant("", "", ""),
-        "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, new HashMap<>(), new ArrayList<>(), Collections.EMPTY_MAP);
+        "", "", new HashMap<>(), CleanPlanV2MigrationHandler.VERSION, Collections.singletonMap("key", Collections.singletonList(fileInfo)), new ArrayList<>(), Collections.EMPTY_MAP);
     HoodieCleanStat cleanStats = new HoodieCleanStat(
         HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS,
         HoodieTestUtils.DEFAULT_PARTITION_PATHS[new Random().nextInt(HoodieTestUtils.DEFAULT_PARTITION_PATHS.length)],
diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 5c73673736a1..61792e15f7b9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -56,6 +56,7 @@ import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.io.JsonEncoder;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificRecordBase;
 
 import javax.annotation.Nullable;
@@ -1700,4 +1701,55 @@ public class HoodieAvroUtils {
     }
     return defaultValue;
   }
+
+  public static <T extends SpecificRecordBase> T convertToSpecificRecord(Class<T> clazz, GenericRecord genericRecord) {
+    SpecificData specificData = new SpecificData(clazz.getClassLoader());
+    return convertToSpecificRecord(clazz, genericRecord, specificData);
+  }
+
+  private static <T extends SpecificRecordBase> T convertToSpecificRecord(Class<T> clazz, GenericRecord genericRecord, SpecificData specificData) {
+    try {
+      if (genericRecord == null) {
+        return null;
+      }
+      T specificRecord = clazz.newInstance();
+      Schema schema = SpecificData.getForClass(clazz).getSchema(clazz);
+      for (Field field : schema.getFields()) {
+        Object value = genericRecord.get(field.pos());
+        if (value == null) {
+          specificRecord.put(field.pos(), null);
+          continue;
+        }
+        value = convertFieldToSpecificRecordValue(field.schema(), value, specificData);
+        specificRecord.put(field.pos(), value);
+      }
+      return specificRecord;
+    } catch (InstantiationException | IllegalAccessException e) {
+      throw new HoodieException("Failed to convert to SpecificRecord " + clazz.getName(), e);
+    }
+  }
+
+  private static Object convertFieldToSpecificRecordValue(Schema fieldSchema, Object value, SpecificData specificData) {
+    Schema resolvedFieldSchema = getActualSchemaFromUnion(fieldSchema, value);
+    switch (resolvedFieldSchema.getType()) {
+      case RECORD:
+        value = convertToSpecificRecord(specificData.getClass(resolvedFieldSchema), (GenericRecord) value, specificData);
+        break;
+      case ARRAY:
+        value = ((List<?>) value).stream().map(element -> convertFieldToSpecificRecordValue(resolvedFieldSchema.getElementType(), element, specificData)).collect(Collectors.toList());
+        break;
+      case MAP:
+        value = ((Map<?, ?>) value).entrySet().stream().collect(Collectors.toMap(
+            Map.Entry::getKey,
+            entry -> {
+              Object mapValue = entry.getValue();
+              return convertFieldToSpecificRecordValue(resolvedFieldSchema.getValueType(), mapValue, specificData);
+            }
+        ));
+        break;
+      default:
+        // no conversion required
+    }
+    return value;
+  }
 }
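
The new helper instantiates the target class and walks its schema, recursively
converting nested records, arrays, and maps. A minimal sketch of the intended
call pattern (readCleanFileInfo() is a hypothetical source of a GenericRecord
decoded with HoodieCleanFileInfo.SCHEMA$):

    GenericRecord generic = readCleanFileInfo(); // hypothetical helper
    HoodieCleanFileInfo specific =
        HoodieAvroUtils.convertToSpecificRecord(HoodieCleanFileInfo.class, generic);
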
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index 8f648e8efc64..54e7f301e6a8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.log.block;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieDeleteRecord;
 import org.apache.hudi.avro.model.HoodieDeleteRecordList;
 import org.apache.hudi.common.fs.SizeAwareDataInputStream;
@@ -31,13 +32,14 @@ import org.apache.hudi.io.SeekableDataInputStream;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.util.Lazy;
 
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.specific.SpecificDatumReader;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,9 +165,9 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
     } else if (version == 2) {
       return SerializationUtils.deserialize(data);
     } else {
-      DatumReader<HoodieDeleteRecordList> reader = new SpecificDatumReader<>(HoodieDeleteRecordList.class);
+      DatumReader<GenericRecord> reader = new GenericDatumReader<>(HoodieDeleteRecordList.SCHEMA$);
       BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
-      List<HoodieDeleteRecord> deleteRecordList = reader.read(null, decoder)
+      List<HoodieDeleteRecord> deleteRecordList = HoodieAvroUtils.convertToSpecificRecord(HoodieDeleteRecordList.class, reader.read(null, decoder))
           .getDeleteRecordList();
       return deleteRecordList.stream()
           .map(record -> DeleteRecord.create(
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
index ddcc8820fcf8..ff1939c2a38a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/TimelineMetadataUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.common.table.timeline;
 
+import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -35,9 +36,11 @@ import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.file.FileReader;
 import org.apache.avro.file.SeekableByteArrayInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DatumWriter;
-import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.specific.SpecificData;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.specific.SpecificRecordBase;
 
@@ -123,18 +126,18 @@ public class TimelineMetadataUtils {
 
   public static <T extends SpecificRecordBase> T deserializeAvroMetadataLegacy(byte[] bytes, Class<T> clazz)
       throws IOException {
-    DatumReader<T> reader = new SpecificDatumReader<>(clazz);
-    FileReader<T> fileReader = DataFileReader.openReader(new SeekableByteArrayInput(bytes), reader);
+    DatumReader<GenericRecord> reader = new GenericDatumReader<>(SpecificData.getForClass(clazz).getSchema(clazz));
+    FileReader<GenericRecord> fileReader = DataFileReader.openReader(new SeekableByteArrayInput(bytes), reader);
     ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
-    return fileReader.next();
+    return HoodieAvroUtils.convertToSpecificRecord(clazz, fileReader.next());
   }
 
   public static <T extends SpecificRecordBase> T deserializeAvroMetadata(InputStream inputStream, Class<T> clazz)
       throws IOException {
-    DatumReader<T> reader = new SpecificDatumReader<>(clazz);
-    try (DataFileStream<T> fileReader = new DataFileStream<>(inputStream, reader)) {
+    DatumReader<GenericRecord> reader = new GenericDatumReader<>(SpecificData.getForClass(clazz).getSchema(clazz));
+    try (DataFileStream<GenericRecord> fileReader = new DataFileStream<>(inputStream, reader)) {
       ValidationUtils.checkArgument(fileReader.hasNext(), "Could not deserialize metadata of type " + clazz);
-      return fileReader.next();
+      return HoodieAvroUtils.convertToSpecificRecord(clazz, fileReader.next());
     }
   }
 }
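
Both deserialize variants now share one shape: read a GenericRecord against the
schema from SpecificData.getForClass(clazz).getSchema(clazz), then convert it
to the requested type. A minimal usage sketch (loadCleanMetadataBytes() is a
hypothetical source of Avro-serialized HoodieCleanMetadata; IOException
handling omitted):

    byte[] bytes = loadCleanMetadataBytes(); // hypothetical helper
    HoodieCleanMetadata metadata = TimelineMetadataUtils.deserializeAvroMetadata(
        new ByteArrayInputStream(bytes), HoodieCleanMetadata.class);
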
diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index 4d5c3b84aa25..2371c5b2caed 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -24,6 +24,34 @@ import org.apache.hudi.avro.model.DateWrapper;
 import org.apache.hudi.avro.model.DecimalWrapper;
 import org.apache.hudi.avro.model.DoubleWrapper;
 import org.apache.hudi.avro.model.FloatWrapper;
+import org.apache.hudi.avro.model.HoodieActionInstant;
+import org.apache.hudi.avro.model.HoodieBootstrapFilePartitionInfo;
+import org.apache.hudi.avro.model.HoodieBootstrapIndexInfo;
+import org.apache.hudi.avro.model.HoodieBootstrapPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieCleanFileInfo;
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieCommitMetadata;
+import org.apache.hudi.avro.model.HoodieCompactionOperation;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.avro.model.HoodieCompactionStrategy;
+import org.apache.hudi.avro.model.HoodieDeleteRecordList;
+import org.apache.hudi.avro.model.HoodieFSPermission;
+import org.apache.hudi.avro.model.HoodieFileStatus;
+import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.avro.model.HoodieMergeArchiveFilePlan;
+import org.apache.hudi.avro.model.HoodiePath;
+import org.apache.hudi.avro.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
+import org.apache.hudi.avro.model.HoodieRestoreMetadata;
+import org.apache.hudi.avro.model.HoodieRestorePlan;
+import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieRollbackPlan;
+import org.apache.hudi.avro.model.HoodieSavepointMetadata;
+import org.apache.hudi.avro.model.HoodieSavepointPartitionMetadata;
+import org.apache.hudi.avro.model.HoodieWriteStat;
 import org.apache.hudi.avro.model.IntWrapper;
 import org.apache.hudi.avro.model.LocalDateWrapper;
 import org.apache.hudi.avro.model.LongWrapper;
@@ -53,6 +81,9 @@ import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.BinaryEncoder;
 import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.specific.SpecificData;
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.avro.util.Utf8;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -1231,4 +1262,155 @@ public class TestHoodieAvroUtils {
     boolean result = HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, readerSchema);
     assertEquals(expected, result);
   }
+
+  /**
+   * Utility class for generating random GenericRecord instances.
+   */
+  private static class AvroTestUtils {
+    private static final Random RANDOM = new Random(42);
+
+    /**
+     * Generate a list of random GenericRecord instances
+     */
+    public static List<GenericRecord> generateRandomRecords(Schema schema, int count) {
+      List<GenericRecord> records = new ArrayList<>();
+      for (int i = 0; i < count; i++) {
+        records.add(generateRandomRecord(schema));
+      }
+      return records;
+    }
+
+    /**
+     * Generate a random GenericRecord for the given schema
+     */
+    public static GenericRecord generateRandomRecord(Schema schema) {
+      GenericRecord record = new GenericData.Record(schema);
+      for (Schema.Field field : schema.getFields()) {
+        Object value = generateRandomValue(field.schema(), field.defaultVal());
+        record.put(field.pos(), value);
+      }
+      return record;
+    }
+
+    /**
+     * Generate a random value for the given schema type
+     */
+    private static Object generateRandomValue(Schema schema, Object defaultValue) {
+      // CASE 1: Handle default value
+      if (defaultValue != null
+          && !(defaultValue instanceof JsonProperties.Null)
+          && RANDOM.nextBoolean()) {
+        return defaultValue;
+      }
+      // Handle Union type.
+      Schema actualSchema = schema;
+      try {
+        actualSchema = resolveNullableSchema(schema);
+      } catch (Exception e) {
+        // If we can't resolve the schema, just use the original
+        // No-op.
+      }
+      // CASE 2: Handle different types
+      switch (actualSchema.getType()) {
+        case NULL:
+          return null;
+        case BOOLEAN:
+          return RANDOM.nextBoolean();
+        case INT:
+          return RANDOM.nextInt(1000);
+        case LONG:
+          return RANDOM.nextLong() % 1000000L;
+        case FLOAT:
+          return RANDOM.nextFloat() * 100f;
+        case DOUBLE:
+          return RANDOM.nextDouble() * 1000.0;
+        case STRING:
+          return "test_string_" + RANDOM.nextInt(1000);
+        case BYTES:
+          byte[] bytes = new byte[RANDOM.nextInt(10) + 1];
+          RANDOM.nextBytes(bytes);
+          return ByteBuffer.wrap(bytes);
+        case RECORD:
+          return generateRandomRecord(actualSchema);
+        case ENUM:
+          List<String> symbols = actualSchema.getEnumSymbols();
+          return new GenericData.EnumSymbol(actualSchema, symbols.get(RANDOM.nextInt(symbols.size())));
+        case ARRAY:
+          List<Object> array = new ArrayList<>();
+          int arraySize = RANDOM.nextInt(3) + 1;
+          for (int i = 0; i < arraySize; i++) {
+            array.add(generateRandomValue(actualSchema.getElementType(), null));
+          }
+          return array;
+        case MAP:
+          Map<String, Object> map = new HashMap<>();
+          int mapSize = RANDOM.nextInt(3) + 1;
+          for (int i = 0; i < mapSize; i++) {
+            map.put("key_" + i, generateRandomValue(actualSchema.getValueType(), null));
+          }
+          return map;
+        case FIXED:
+          byte[] fixedBytes = new byte[actualSchema.getFixedSize()];
+          RANDOM.nextBytes(fixedBytes);
+          return new GenericData.Fixed(actualSchema, fixedBytes);
+        default:
+          return null;
+      }
+    }
+  }
+
+  /**
+   * Test convertToSpecificRecord with multiple random records for each type
+   */
+  @ParameterizedTest
+  @MethodSource("provideAvroModelClasses")
+  void testConvertToSpecificRecordMultipleRecords(Class<? extends SpecificRecord> recordClass) {
+    Schema schema = SpecificData.get().getSchema(recordClass);
+    List<GenericRecord> genericRecords = AvroTestUtils.generateRandomRecords(schema, 3);
+    for (GenericRecord genericRecord : genericRecords) {
+      Class<? extends SpecificRecordBase> specificRecordBaseClass
+          = (Class<? extends SpecificRecordBase>) recordClass;
+      SpecificRecord specificRecord =
+          HoodieAvroUtils.convertToSpecificRecord(specificRecordBaseClass, genericRecord);
+      assertEquals(recordClass, specificRecord.getClass());
+      GenericRecord copied = (GenericRecord) GenericData.get().deepCopy(schema, specificRecord);
+      assertEquals(genericRecord, copied);
+    }
+  }
+
+  /**
+   * Provide all the Avro model classes to test
+   */
+  static Stream<Arguments> provideAvroModelClasses() {
+    return Stream.of(
+        Arguments.of(HoodieRollbackPartitionMetadata.class),
+        Arguments.of(HoodieSavepointPartitionMetadata.class),
+        Arguments.of(HoodieWriteStat.class),
+        Arguments.of(HoodieCleanPartitionMetadata.class),
+        Arguments.of(HoodieCleanFileInfo.class),
+        Arguments.of(HoodieActionInstant.class),
+        Arguments.of(HoodieCompactionStrategy.class),
+        Arguments.of(HoodieCompactionOperation.class),
+        Arguments.of(HoodieFSPermission.class),
+        Arguments.of(HoodiePath.class),
+        Arguments.of(HoodieFileStatus.class),
+        Arguments.of(HoodieBootstrapFilePartitionInfo.class),
+        Arguments.of(HoodieCompactionPlan.class),
+        Arguments.of(HoodieCleanerPlan.class),
+        Arguments.of(HoodieCleanMetadata.class),
+        Arguments.of(HoodieReplaceCommitMetadata.class),
+        Arguments.of(HoodieSavepointMetadata.class),
+        Arguments.of(HoodieMergeArchiveFilePlan.class),
+        Arguments.of(HoodieRollbackMetadata.class),
+        Arguments.of(HoodieBootstrapPartitionMetadata.class),
+        Arguments.of(HoodieBootstrapIndexInfo.class),
+        Arguments.of(HoodieIndexPlan.class),
+        Arguments.of(HoodieRequestedReplaceMetadata.class),
+        Arguments.of(HoodieRestoreMetadata.class),
+        Arguments.of(HoodieRestorePlan.class),
+        Arguments.of(HoodieRollbackPlan.class),
+        Arguments.of(HoodieDeleteRecordList.class),
+        Arguments.of(HoodieCommitMetadata.class)
+    );
+  }
 }
