This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit d2ba868c5147e975094ca01fd1d5dcb6d1e64a18 Author: voonhous <[email protected]> AuthorDate: Wed Apr 29 12:19:17 2026 +0800 fix(schema): Handle BLOB and VARIANT in Hive-reader rewriteRecordWithNewSchema (#18580) Fixes issue: #18578 HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchemaInternal switches on newSchema.getType() and only had cases for RECORD, ENUM, ARRAY, MAP, and UNION. BLOB (#18108) and VARIANT (#17833) are Hudi logical types that are physically stored as Avro records but exposed as distinct HoodieSchemaTypes, so a new schema of type BLOB or VARIANT fell through to rewritePrimaryType, which threw "cannot support rewrite value for schema type". The failure reproduces on the Hive read path whenever Hive projects from its HMS-derived struct shape (record name = column name, type field = plain STRING) onto Hudi's canonical BLOB schema (record "blob", type = ENUM blob_storage_type, logicalType "blob"), which is the exact signature seen in ITTestCustomTypeHiveSync#testBlobTypeWithHiveSyncSQL. VECTOR was unaffected only by coincidence, because it maps to Avro FIXED. Add case BLOB and case VARIANT labels that fall through to the existing RECORD body. Inner field layouts are fixed by BlobLogicalType.validate / VariantLogicalType.validate, so field-by-name iteration is correct. The existing ENUM case at line 137 already handles the STRING -> ENUM conversion for the BLOB "type" field. The tests pin the fix without Spark, Hive, or Testcontainers: they call HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema directly, using synthetic schemas that mirror the E2E failure signature for both BLOB and VARIANT.
--- .../hudi/common/schema/HoodieSchemaTestUtils.java | 29 ++++++++ .../utils/HoodieArrayWritableSchemaUtils.java | 3 + .../utils/TestHoodieArrayWritableSchemaUtils.java | 80 ++++++++++++++++++++++ 3 files changed, 112 insertions(+) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java index 387dfca41865..faba63d66690 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/schema/HoodieSchemaTestUtils.java @@ -67,4 +67,33 @@ public class HoodieSchemaTestUtils { public static HoodieSchema createNullableRecord(String name, HoodieSchemaField... fields) { return HoodieSchema.createNullable(HoodieSchema.createRecord(name, null, null, false, Arrays.asList(fields))); } + + /** + * Mirrors the canonical BLOB field layout ({@link HoodieSchema.Blob}) but without the blob + * logicalType attached and with the {@code type} field as plain STRING rather than ENUM. + * Represents what the pre-fix SQL INSERT path committed when the BLOB StructField metadata + * was stripped by Spark's TableOutputResolver Cast.
+ */ + public static HoodieSchema createPlainBlobRecord(String recordName) { + HoodieSchema reference = HoodieSchema.createRecord("reference", null, null, false, Arrays.asList( + HoodieSchemaField.of("external_path", HoodieSchema.create(HoodieSchemaType.STRING), null, null), + HoodieSchemaField.of("offset", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, null), + HoodieSchemaField.of("length", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.LONG)), null, null), + HoodieSchemaField.of("managed", HoodieSchema.create(HoodieSchemaType.BOOLEAN), null, null))); + return HoodieSchema.createRecord(recordName, null, null, false, Arrays.asList( + HoodieSchemaField.of("type", HoodieSchema.create(HoodieSchemaType.STRING), null, null), + HoodieSchemaField.of("data", HoodieSchema.createNullable(HoodieSchema.create(HoodieSchemaType.BYTES)), null, JsonProperties.NULL_VALUE), + HoodieSchemaField.of("reference", HoodieSchema.createNullable(reference), null, JsonProperties.NULL_VALUE))); + } + + /** + * Canonical VARIANT field layout without the variant logicalType attached. Represents a plain + * record that would fail to rewrite into a canonical VARIANT schema on the Hive read path + * before the BLOB/VARIANT dispatch fix.
+ */ + public static HoodieSchema createPlainVariantRecord(String recordName) { + return HoodieSchema.createRecord(recordName, null, null, false, Arrays.asList( + HoodieSchemaField.of("metadata", HoodieSchema.create(HoodieSchemaType.BYTES), null, null), + HoodieSchemaField.of("value", HoodieSchema.create(HoodieSchemaType.BYTES), null, null))); + } } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java index 7e9c336846d8..a468de4f52a5 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/utils/HoodieArrayWritableSchemaUtils.java @@ -80,6 +80,9 @@ public class HoodieArrayWritableSchemaUtils { private static Writable rewriteRecordWithNewSchemaInternal(Writable writable, HoodieSchema oldSchema, HoodieSchema newSchema, Map<String, String> renameCols, Deque<String> fieldNames) { switch (newSchema.getType()) { + // BLOB/VARIANT are physically records; share the RECORD field-by-name rewrite.
+ case BLOB: + case VARIANT: case RECORD: if (!(writable instanceof ArrayWritable)) { throw new SchemaCompatibilityException(String.format("Cannot rewrite %s as a record", writable.getClass().getName())); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java index b6220a0caccd..16188b520074 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/utils/TestHoodieArrayWritableSchemaUtils.java @@ -22,6 +22,7 @@ package org.apache.hudi.hadoop.utils; import org.apache.hudi.avro.HoodieAvroUtils; import org.apache.hudi.common.engine.RecordContext; import org.apache.hudi.common.schema.HoodieSchema; +import org.apache.hudi.common.schema.HoodieSchemaTestUtils; import org.apache.hudi.common.schema.HoodieSchemaType; import org.apache.hudi.common.schema.HoodieSchemaUtils; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; @@ -47,6 +48,7 @@ import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -383,6 +385,84 @@ public class TestHoodieArrayWritableSchemaUtils { throw new IllegalArgumentException("Unsupported decimal object: " + value.getClass() + " -> " + value); } + @Test + void testRewriteBlobToBlobProjectionEquivalentShortCircuits() { + HoodieSchema blob = HoodieSchema.createBlob(); + ArrayWritable reference = new ArrayWritable(Writable.class, new Writable[]{ + new Text("blobs/path-1"), + new LongWritable(0L), + new LongWritable(11L), + new BooleanWritable(false) + }); + ArrayWritable record = new
ArrayWritable(Writable.class, new Writable[]{ + new Text("OUT_OF_LINE"), + NullWritable.get(), + reference + }); + ArrayWritable result = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema( + record, blob, blob, Collections.emptyMap()); + // Same HoodieSchema on both sides -> areSchemasProjectionEquivalent short-circuits. + assertSame(record, result); + } + + @Test + void testRewritePlainBlobRecordToCanonicalBlobSchema() { + HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainBlobRecord("blob_data"); + HoodieSchema newSchema = HoodieSchema.createBlob(); + ArrayWritable reference = new ArrayWritable(Writable.class, new Writable[]{ + new Text("blobs/path-1"), + new LongWritable(0L), + new LongWritable(11L), + new BooleanWritable(false) + }); + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new Text("OUT_OF_LINE"), + NullWritable.get(), + reference + }); + + ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema( + record, oldSchema, newSchema, Collections.emptyMap()); + + // Type field must have been converted from STRING (Text) to ENUM (BytesWritable); decode with an explicit charset, not the platform default. + assertInstanceOf(BytesWritable.class, rewritten.get()[0]); + assertEquals("OUT_OF_LINE", new String(((BytesWritable) rewritten.get()[0]).copyBytes(), java.nio.charset.StandardCharsets.UTF_8)); + // Data stays null, reference record passes through unchanged field-wise.
+ assertEquals(NullWritable.get(), rewritten.get()[1]); + ArrayWritable rewrittenRef = (ArrayWritable) rewritten.get()[2]; + assertEquals(new Text("blobs/path-1"), rewrittenRef.get()[0]); + assertEquals(new LongWritable(0L), rewrittenRef.get()[1]); + assertEquals(new LongWritable(11L), rewrittenRef.get()[2]); + assertEquals(new BooleanWritable(false), rewrittenRef.get()[3]); + } + + @Test + void testRewriteVariantToVariantProjectionEquivalentShortCircuits() { + HoodieSchema variant = HoodieSchema.createVariant(); + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{ + new BytesWritable(new byte[]{1, 2, 3}), + new BytesWritable(new byte[]{4, 5, 6}) + }); + ArrayWritable result = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema( + record, variant, variant, Collections.emptyMap()); + assertSame(record, result); + } + + @Test + void testRewritePlainVariantRecordToCanonicalVariantSchema() { + HoodieSchema oldSchema = HoodieSchemaTestUtils.createPlainVariantRecord("variant_data"); + HoodieSchema newSchema = HoodieSchema.createVariant(); + BytesWritable metadata = new BytesWritable(new byte[]{1, 2, 3}); + BytesWritable value = new BytesWritable(new byte[]{4, 5, 6}); + ArrayWritable record = new ArrayWritable(Writable.class, new Writable[]{metadata, value}); + + ArrayWritable rewritten = HoodieArrayWritableSchemaUtils.rewriteRecordWithNewSchema( + record, oldSchema, newSchema, Collections.emptyMap()); + + assertEquals(metadata, rewritten.get()[0]); + assertEquals(value, rewritten.get()[1]); + } + private ObjectInspector getWritableOIForType(TypeInfo typeInfo) { switch (typeInfo.getCategory()) { case PRIMITIVE:
