This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit d2ba868c5147e975094ca01fd1d5dcb6d1e64a18
Author: voonhous <[email protected]>
AuthorDate: Wed Apr 29 12:19:17 2026 +0800

    fix(schema): Handle BLOB and VARIANT in Hive-reader 
rewriteRecordWithNewSchema (#18580)
    
    Fixes issue: #18578
    
    HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchemaInternal switches
    on newSchema.getType() and only named RECORD/ENUM/ARRAY/MAP/UNION. BLOB
    (#18108) and VARIANT (#17833) are Hudi logical types physically stored as
    Avro records but exposed as distinct HoodieSchemaTypes, so a new schema
    typed BLOB/VARIANT fell through to rewritePrimaryType and threw
    "cannot support rewrite value for schema type".
    
    This reproduces on the Hive read path whenever Hive projects from its
    HMS-derived struct shape (record name = column name, type field = plain
    STRING) onto Hudi's canonical BLOB schema (record "blob", type = ENUM
    blob_storage_type, logicalType "blob") - the exact signature seen in
    ITTestCustomTypeHiveSync#testBlobTypeWithHiveSyncSQL. VECTOR was fine by
    accident because it maps to Avro FIXED.
    
    Add case BLOB and case VARIANT fallthrough to the existing RECORD body.
    Inner field layouts are fixed by BlobLogicalType.validate /
    VariantLogicalType.validate, so field-by-name iteration is correct. The
    existing ENUM case at line 137 already handles the STRING -> ENUM
    conversion for the BLOB "type" field.
    
    Tests pin the fix without Spark / Hive / Testcontainers - they call
    HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema directly with
    synthetic schemas that mirror the E2E failure signature, for both BLOB
    and VARIANT.
---
 .../hudi/common/schema/HoodieSchemaTestUtils.java  | 29 ++++++++
 .../utils/HoodieArrayWritableSchemaUtils.java      |  3 +
 .../utils/TestHoodieArrayWritableSchemaUtils.java  | 80 ++++++++++++++++++++++
 3 files changed, 112 insertions(+)

diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
index 387dfca41865..faba63d66690 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java
@@ -67,4 +67,33 @@ public class HoodieSchemaTestUtils {
   public static HoodieSchema createNullableRecord(String name, 
HoodieSchemaField... fields) {
     return HoodieSchema.createNullable(HoodieSchema.createRecord(name, null, 
null, false, Arrays.asList(fields)));
   }
+
+  /**
+   * Mirrors the canonical BLOB field layout ({@link HoodieSchema.Blob}) but 
without the blob
+   * logicalType attached and with the {@code type} field as plain STRING 
rather than ENUM.
+   * Represents what the pre-fix SQL INSERT path committed when the BLOB 
StructField metadata
+   * was stripped by Spark's TableOutputResolver Cast.
+   */
+  public static HoodieSchema createPlainBlobRecord(String recordName) {
+    HoodieSchema reference = HoodieSchema.createRecord("reference", null, 
null, false, Arrays.asList(
+        HoodieSchemaField.of("external_path", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+        HoodieSchemaField.of("offset", 
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, 
null),
+        HoodieSchemaField.of("length", 
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, 
null),
+        HoodieSchemaField.of("managed", 
HoodieSchema.create(HoodieSchemaType.BOOLEAN), null, null)));
+    return HoodieSchema.createRecord(recordName, null, null, false, 
Arrays.asList(
+        HoodieSchemaField.of("type", 
HoodieSchema.create(HoodieSchemaType.STRING), null, null),
+        HoodieSchemaField.of("data", 
HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.BYTES)), null, 
JsonProperties.NULL_VALUE),
+        HoodieSchemaField.of("reference", 
HoodieSchema.createNullable(reference), null, JsonProperties.NULL_VALUE)));
+  }
+
+  /**
+   * Canonical VARIANT field layout without the variant logicalType attached. 
Represents a plain
+   * record that would fail to rewrite into a canonical VARIANT schema on the 
Hive read path
+   * before the BLOB/VARIANT dispatch fix.
+   */
+  public static HoodieSchema createPlainVariantRecord(String recordName) {
+    return HoodieSchema.createRecord(recordName, null, null, false, 
Arrays.asList(
+        HoodieSchemaField.of("metadata", 
HoodieSchema.create(HoodieSchemaType.BYTES), null, null),
+        HoodieSchemaField.of("value", 
HoodieSchema.create(HoodieSchemaType.BYTES), null, null)));
+  }
 }
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
index 7e9c336846d8..a468de4f52a5 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java
@@ -80,6 +80,9 @@ public class HoodieArrayWritableSchemaUtils {
 
   private static Writable rewriteRecordWithNewSchemaInternal(Writable 
writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> 
renameCols, Deque<String> fieldNames) {
     switch (newSchema.getType()) {
+      // BLOB/VARIANT are physically records; share the RECORD field-by-name 
rewrite.
+      case BLOB:
+      case VARIANT:
       case RECORD:
         if (!(writable instanceof ArrayWritable)) {
           throw new SchemaCompatibilityException(String.format("Cannot rewrite 
%s as a record", writable.getClass().getName()));
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
index b6220a0caccd..16188b520074 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java
@@ -22,6 +22,7 @@ package org.apache.hudi.hadoop.utils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaTestUtils;
 import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -47,6 +48,7 @@ import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
@@ -383,6 +385,84 @@ public class TestHoodieArrayWritableSchemaUtils {
     throw new IllegalArgumentException("Unsupported decimal object: " + 
value.getClass() + " -> " + value);
   }
 
+  @Test
+  void testRewriteBlobToBlobProjectionEquivalentShortCircuits() {
+    HoodieSchema blob = HoodieSchema.createBlob();
+    ArrayWritable reference = new ArrayWritable(Writable.class, new Writable[]{
+        new Text("blobs/path-1"),
+        new LongWritable(0L),
+        new LongWritable(11L),
+        new BooleanWritable(false)
+    });
+    ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
+        new Text("OUT_OF_LINE"),
+        NullWritable.get(),
+        reference
+    });
+    ArrayWritable result = 
HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+        record, blob, blob, Collections.emptyMap());
+    // Same HoodieSchema on both sides -> areSchemasProjectionEquivalent 
short-circuits.
+    assertSame(record, result);
+  }
+
+  @Test
+  void testRewritePlainBlobRecordToCanonicalBlobSchema() {
+    HoodieSchema oldSchema = 
HoodieSchemaTestUtils.createPlainBlobRecord("blob_data");
+    HoodieSchema newSchema = HoodieSchema.createBlob();
+    ArrayWritable reference = new ArrayWritable(Writable.class, new Writable[]{
+        new Text("blobs/path-1"),
+        new LongWritable(0L),
+        new LongWritable(11L),
+        new BooleanWritable(false)
+    });
+    ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
+        new Text("OUT_OF_LINE"),
+        NullWritable.get(),
+        reference
+    });
+
+    ArrayWritable rewritten = 
HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+        record, oldSchema, newSchema, Collections.emptyMap());
+
+    // Type field must have been converted from STRING (Text) to ENUM 
(BytesWritable).
+    assertInstanceOf(BytesWritable.class, rewritten.get()[0]);
+    assertEquals("OUT_OF_LINE", new String(((BytesWritable) 
rewritten.get()[0]).copyBytes()));
+    // Data stays null, reference record passes through unchanged field-wise.
+    assertEquals(NullWritable.get(), rewritten.get()[1]);
+    ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[2];
+    assertEquals(new Text("blobs/path-1"), rewrittenRef.get()[0]);
+    assertEquals(new LongWritable(0L), rewrittenRef.get()[1]);
+    assertEquals(new LongWritable(11L), rewrittenRef.get()[2]);
+    assertEquals(new BooleanWritable(false), rewrittenRef.get()[3]);
+  }
+
+  @Test
+  void testRewriteVariantToVariantProjectionEquivalentShortCircuits() {
+    HoodieSchema variant = HoodieSchema.createVariant();
+    ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{
+        new BytesWritable(new byte[]{1, 2, 3}),
+        new BytesWritable(new byte[]{4, 5, 6})
+    });
+    ArrayWritable result = 
HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+        record, variant, variant, Collections.emptyMap());
+    assertSame(record, result);
+  }
+
+  @Test
+  void testRewritePlainVariantRecordToCanonicalVariantSchema() {
+    HoodieSchema oldSchema = 
HoodieSchemaTestUtils.createPlainVariantRecord("variant_data");
+    HoodieSchema newSchema = HoodieSchema.createVariant();
+    BytesWritable metadata = new BytesWritable(new byte[]{1, 2, 3});
+    BytesWritable value = new BytesWritable(new byte[]{4, 5, 6});
+    ArrayWritable record = new ArrayWritable(Writable.class, new 
Writable[]{metadata, value});
+
+    ArrayWritable rewritten = 
HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema(
+        record, oldSchema, newSchema, Collections.emptyMap());
+
+    assertEquals(metadata, rewritten.get()[0]);
+    assertEquals(value, rewritten.get()[1]);
+  }
+
   private ObjectInspector getWritableOIForType(TypeInfo typeInfo) {
     switch (typeInfo.getCategory()) {
       case PRIMITIVE:

Reply via email to