This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 4a0a7f2797d62294a9b4650cc77f2a8d625b5b0d
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue May 19 17:20:41 2026 -0700

    fix(spark): handle Avro 1.12 logical type values in Spark 4.1 read path 
(#18773)
---
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 145 +++++++++++++++++++--
 .../apache/hudi/avro/HoodieAvroWrapperUtils.java   |  26 +++-
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  | 144 ++++++++++++++++++++
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  |   4 -
 .../sql/hudi/dml/others/TestMergeIntoTable.scala   |   4 +-
 .../apache/spark/sql/avro/AvroDeserializer.scala   |  22 ++++
 .../hudi/TestSpark4_1AvroLogicalTypeBytes.scala    | 127 ++++++++++++++++++
 7 files changed, 453 insertions(+), 19 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 654eb1660376..7c55f441d200 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -36,6 +36,7 @@ import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Conversions;
 import org.apache.avro.Conversions.DecimalConversion;
 import org.apache.avro.JsonProperties;
+import org.apache.avro.LogicalType;
 import org.apache.avro.LogicalTypes;
 import org.apache.avro.LogicalTypes.Decimal;
 import org.apache.avro.Schema;
@@ -70,8 +71,11 @@ import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -671,20 +675,42 @@ public class HoodieAvroUtils {
    * <p>
    * Decimal Data Type is converted to actual decimal value instead of 
bytes/fixed which is how it is
    * represented/stored in parquet.
+   * <p>
+   * <b>Avro version compatibility:</b> Avro 1.12.1 (pulled in by the Spark 
4.1 profile) flipped the
+   * default of {@code GenericData}'s {@code fastReaderEnabled} flag from 
{@code false} to
+   * {@code true} (the {@code org.apache.avro.fastread} system property now 
defaults to
+   * {@code "true"}). With the fast reader enabled, {@code GenericDatumReader} 
applies the
+   * registered {@code Conversion}s during decode and materializes {@link 
LocalDate} for date,
+   * {@link Instant} for timestamp-millis / timestamp-micros, and {@link 
LocalDateTime} for
+   * local-timestamp-millis / local-timestamp-micros. Avro 1.12.0 (Spark 4.0) 
and Avro 1.11.x
+   * (Spark 3.5 and earlier) left the fast reader off by default, so those 
profiles exposed the raw
+   * {@link Integer} / {@link Long}. This method normalizes both forms to the 
same stable
+   * representation, so callers (precombine/ordering comparison, 
partition-path and record-key
+   * derivation, etc.) behave identically across Avro versions and across 
writer/reader Spark
+   * version combinations. The on-disk byte format is unaffected by this 
normalization: Avro's wire
+   * encoding for these logical types is fixed by spec to int / long and is 
identical across Avro
+   * versions; only the in-memory Java type returned by the reader changed.
    *
    * @param fieldSchema avro field schema
    * @param fieldValue  avro field value
    * @return field value either converted (for certain data types) or as it is.
    */
   public static Object convertValueForAvroLogicalTypes(Schema fieldSchema, 
Object fieldValue, boolean consistentLogicalTimestampEnabled) {
-    if (fieldSchema.getLogicalType() == LogicalTypes.date()) {
-      return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString()));
-    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMillis() 
&& consistentLogicalTimestampEnabled) {
-      return new Timestamp(Long.parseLong(fieldValue.toString()));
-    } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros() 
&& consistentLogicalTimestampEnabled) {
-      return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000);
-    } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) {
-      Decimal dc = (Decimal) fieldSchema.getLogicalType();
+    LogicalType logicalType = fieldSchema.getLogicalType();
+    if (logicalType == LogicalTypes.date()) {
+      return LocalDate.ofEpochDay(extractEpochDay(fieldValue));
+    } else if (logicalType == LogicalTypes.timestampMillis()) {
+      long millis = extractEpochMillis(fieldValue);
+      return consistentLogicalTimestampEnabled ? new Timestamp(millis) : 
millis;
+    } else if (logicalType == LogicalTypes.timestampMicros()) {
+      long micros = extractEpochMicros(fieldValue);
+      return consistentLogicalTimestampEnabled ? new Timestamp(micros / 1000) 
: micros;
+    } else if (logicalType == LogicalTypes.localTimestampMillis()) {
+      return extractLocalEpochMillis(fieldValue);
+    } else if (logicalType == LogicalTypes.localTimestampMicros()) {
+      return extractLocalEpochMicros(fieldValue);
+    } else if (logicalType instanceof LogicalTypes.Decimal) {
+      Decimal dc = (Decimal) logicalType;
       DecimalConversion decimalConversion = new DecimalConversion();
       if (fieldSchema.getType() == Schema.Type.FIXED) {
         return decimalConversion.fromFixed((GenericFixed) fieldValue, 
fieldSchema,
@@ -700,6 +726,101 @@ public class HoodieAvroUtils {
     return fieldValue;
   }
 
+  // The extract* helpers below accept either the Avro primitive form (Integer 
/ Long, returned by
+  // Avro 1.12.0 / 1.11.x) or the java.time form (returned by Avro 1.12.1 with 
its default
+  // fastReaderEnabled=true), and return the canonical primitive for the same 
underlying bytes.
+  // See the javadoc on convertValueForAvroLogicalTypes for context.
+
+  private static long extractEpochDay(Object fieldValue) {
+    if (fieldValue instanceof LocalDate) {
+      return ((LocalDate) fieldValue).toEpochDay();
+    }
+    if (fieldValue instanceof Number) {
+      return ((Number) fieldValue).longValue();
+    }
+    return Long.parseLong(fieldValue.toString());
+  }
+
+  private static long extractEpochMillis(Object fieldValue) {
+    if (fieldValue instanceof Instant) {
+      return ((Instant) fieldValue).toEpochMilli();
+    }
+    if (fieldValue instanceof Number) {
+      return ((Number) fieldValue).longValue();
+    }
+    return Long.parseLong(fieldValue.toString());
+  }
+
+  private static long extractEpochMicros(Object fieldValue) {
+    if (fieldValue instanceof Instant) {
+      Instant instant = (Instant) fieldValue;
+      return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 
1_000_000L), instant.getNano() / 1000L);
+    }
+    if (fieldValue instanceof Number) {
+      return ((Number) fieldValue).longValue();
+    }
+    return Long.parseLong(fieldValue.toString());
+  }
+
+  private static long extractLocalEpochMillis(Object fieldValue) {
+    if (fieldValue instanceof LocalDateTime) {
+      return ((LocalDateTime) 
fieldValue).toInstant(ZoneOffset.UTC).toEpochMilli();
+    }
+    if (fieldValue instanceof Number) {
+      return ((Number) fieldValue).longValue();
+    }
+    return Long.parseLong(fieldValue.toString());
+  }
+
+  private static long extractLocalEpochMicros(Object fieldValue) {
+    if (fieldValue instanceof LocalDateTime) {
+      Instant instant = ((LocalDateTime) fieldValue).toInstant(ZoneOffset.UTC);
+      return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 
1_000_000L), instant.getNano() / 1000L);
+    }
+    if (fieldValue instanceof Number) {
+      return ((Number) fieldValue).longValue();
+    }
+    return Long.parseLong(fieldValue.toString());
+  }
+
+  /**
+   * If {@code schema} carries a date / timestamp logical type and {@code 
value} is in the
+   * java.time form ({@link LocalDate} / {@link Instant} / {@link 
LocalDateTime}), normalize it
+   * to the Avro primitive form ({@link Integer} for date, {@link Long} for 
timestamp-millis,
+   * timestamp-micros, local-timestamp-millis, and local-timestamp-micros). 
Used at the entry of
+   * schema-evolution rewrite paths whose legacy code does unguarded {@code 
(Integer)} /
+   * {@code (Long)} casts on field values, which would otherwise fail under 
Avro 1.12.1 (Spark
+   * 4.1) where the {@code fastReaderEnabled} default is {@code true} and 
{@code GenericDatumReader}
+   * materializes java.time types instead of primitives. For non-logical-type 
schemas, primitive
+   * inputs, or null, this is a no-op. The on-disk byte format is unaffected - 
this is purely
+   * about in-memory Java type. A {@link LocalDate} whose epoch day overflows 
{@code int} throws {@link ArithmeticException} rather than being truncated.
+   */
+  private static Object normalizeAvroLogicalTypeToPrimitive(Object value, 
Schema schema) {
+    if (value == null || schema == null) {
+      return value;
+    }
+    LogicalType lt = schema.getLogicalType();
+    if (lt == null) {
+      return value;
+    }
+    if (lt == LogicalTypes.date()) {
+      return value instanceof LocalDate ? Math.toIntExact(((LocalDate) 
value).toEpochDay()) : value;
+    }
+    if (lt == LogicalTypes.timestampMillis()) {
+      return value instanceof Instant ? extractEpochMillis(value) : value;
+    }
+    if (lt == LogicalTypes.timestampMicros()) {
+      return value instanceof Instant ? extractEpochMicros(value) : value;
+    }
+    if (lt == LogicalTypes.localTimestampMillis()) {
+      return value instanceof LocalDateTime ? extractLocalEpochMillis(value) : 
value;
+    }
+    if (lt == LogicalTypes.localTimestampMicros()) {
+      return value instanceof LocalDateTime ? extractLocalEpochMicros(value) : 
value;
+    }
+    return value;
+  }
+
   /**
    * Gets record column values into object array.
    *
@@ -931,6 +1052,14 @@ public class HoodieAvroUtils {
   }
 
   public static Object rewritePrimaryType(Object oldValue, Schema oldSchema, 
Schema newSchema) {
+    // Normalize any java.time form (LocalDate / Instant / LocalDateTime) to 
the Avro primitive
+    // form (Integer / Long) before doing any numeric / string conversion. The 
legacy branches
+    // below explicitly cast to Integer / Long; under Avro 1.12.1 (Spark 4.1 
profile) the
+    // fastReaderEnabled default flipped to true and GenericDatumReader 
materializes java.time
+    // types for date / timestamp logical fields, which would break those 
casts. See
+    // convertValueForAvroLogicalTypes for the broader context — this is a 
read-side normalization
+    // only and does not affect on-disk byte format.
+    oldValue = normalizeAvroLogicalTypeToPrimitive(oldValue, oldSchema);
     if (oldSchema.getType() == newSchema.getType()) {
       switch (oldSchema.getType()) {
         case NULL:
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java
index 91590a0210d5..d0eaec5f05c9 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java
@@ -206,14 +206,18 @@ public class HoodieAvroWrapperUtils {
       return null;
     } else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) {
       ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
-      return Date.valueOf(LocalDate.ofEpochDay((Integer) ((GenericRecord) 
avroValueWrapper).get(0)));
+      // Avro 1.12.1 (Spark 4.1 profile) defaults fastReaderEnabled=true, so 
GenericDatumReader returns
+      // java.time.LocalDate for date logical types; Avro 1.12.0 (Spark 4.0) 
and earlier return Integer.
+      // Accept both — see HoodieAvroUtils#convertValueForAvroLogicalTypes for 
the broader context.
+      return Date.valueOf(LocalDate.ofEpochDay(toEpochDay(((GenericRecord) 
avroValueWrapper).get(0))));
     } else if 
(LocalDateWrapper.class.getSimpleName().equals(wrapperClassName)) {
       ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
-      return LocalDate.ofEpochDay((Integer) ((GenericRecord) 
avroValueWrapper).get(0));
+      return LocalDate.ofEpochDay(toEpochDay(((GenericRecord) 
avroValueWrapper).get(0)));
     } else if 
(TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) {
       ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
-      Instant instant = microsToInstant((Long) ((GenericRecord) 
avroValueWrapper).get(0));
-      return Timestamp.from(instant);
+      // Avro 1.12.1 (Spark 4.1 profile) defaults fastReaderEnabled=true, so 
timestamp-micros decodes
+      // to java.time.Instant; Avro 1.12.0 (Spark 4.0) and earlier return Long.
+      return Timestamp.from(toInstantFromMicros(((GenericRecord) 
avroValueWrapper).get(0)));
     } else if (DecimalWrapper.class.getSimpleName().equals(wrapperClassName)) {
       Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema();
       ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord);
@@ -340,4 +344,18 @@ public class HoodieAvroWrapperUtils {
     GenericRecord genRec = (GenericRecord) val;
     return (Comparable<?>) genRec.get("value");
   }
+
+  private static int toEpochDay(Object dateFieldValue) {
+    if (dateFieldValue instanceof LocalDate) {
+      return Math.toIntExact(((LocalDate) dateFieldValue).toEpochDay());
+    }
+    return ((Number) dateFieldValue).intValue();
+  }
+
+  private static Instant toInstantFromMicros(Object timestampMicrosFieldValue) 
{
+    if (timestampMicrosFieldValue instanceof Instant) {
+      return (Instant) timestampMicrosFieldValue;
+    }
+    return microsToInstant(((Number) timestampMicrosFieldValue).longValue());
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index e24501ec7754..0028b322b99e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -99,7 +99,10 @@ import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.sql.Date;
 import java.sql.Timestamp;
+import java.time.Instant;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
 import java.time.temporal.ChronoUnit;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -499,6 +502,147 @@ public class TestHoodieAvroUtils {
     assertEquals(0, buffer.position());
   }
 
+  // Cross-Avro-version test fixtures: the same date / timestamp value 
expressed in both the Avro
+  // primitive form (Integer / Long) — what GenericDatumReader returns under 
Avro 1.12.0 / 1.11.x —
+  // and the java.time form (LocalDate / Instant / LocalDateTime) — what it 
returns under Avro
+  // 1.12.1 with its default fastReaderEnabled=true. The contract under test 
is that both forms
+  // produce identical downstream behavior in every Hudi read-side path that 
touches a logical-typed
+  // field. Sharing these fixtures across tests pins the same exact value 
through every path.
+  private static final long FIXTURE_EPOCH_MICROS = 1716163200_000000L + 
123456L; // 2024-05-20T00:00:00.123456Z
+  private static final long FIXTURE_EPOCH_MILLIS = 1716163200_000L + 123L;     
  // 2024-05-20T00:00:00.123Z
+  private static final int FIXTURE_EPOCH_DAY = (int) LocalDate.of(2024, 5, 
20).toEpochDay();
+  private static final Instant FIXTURE_MILLIS_INSTANT = 
Instant.ofEpochMilli(FIXTURE_EPOCH_MILLIS);
+  private static final Instant FIXTURE_MICROS_INSTANT = Instant.ofEpochSecond(
+      FIXTURE_EPOCH_MICROS / 1_000_000L, (FIXTURE_EPOCH_MICROS % 1_000_000L) * 
1000L);
+  private static final LocalDate FIXTURE_LOCAL_DATE = 
LocalDate.ofEpochDay(FIXTURE_EPOCH_DAY);
+  private static final LocalDateTime FIXTURE_LOCAL_DT_MILLIS =
+      LocalDateTime.ofInstant(FIXTURE_MILLIS_INSTANT, ZoneOffset.UTC);
+  private static final LocalDateTime FIXTURE_LOCAL_DT_MICROS =
+      LocalDateTime.ofInstant(FIXTURE_MICROS_INSTANT, ZoneOffset.UTC);
+
+  private static final Schema DATE_SCHEMA = 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+  private static final Schema TS_MILLIS_SCHEMA = 
LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+  private static final Schema TS_MICROS_SCHEMA = 
LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+  private static final Schema LOCAL_TS_MILLIS_SCHEMA = 
LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG));
+  private static final Schema LOCAL_TS_MICROS_SCHEMA = 
LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG));
+
+  /**
+   * Cross-Avro-version invariant: {@link 
HoodieAvroUtils#convertValueForAvroLogicalTypes} must produce
+   * the same canonical Java value whether the GenericRecord field holds the 
Avro primitive form
+   * ({@code Long}/{@code Integer}, returned by Avro 1.12.0 and 1.11.x) or the 
java.time form
+   * ({@code Instant}/{@code LocalDate}/{@code LocalDateTime}, returned by 
Avro 1.12.1 with its
+   * default {@code fastReaderEnabled=true}). This is the contract that lets a 
Spark 4.1 reader
+   * compare ordering values against records written by any earlier Spark 
profile without divergence.
+   */
+  @Test
+  public void testConvertValueForAvroLogicalTypesCrossAvroVersion() {
+    // date
+    assertEquals(HoodieAvroUtils.convertValueForAvroLogicalTypes(DATE_SCHEMA, 
FIXTURE_EPOCH_DAY, false),
+        HoodieAvroUtils.convertValueForAvroLogicalTypes(DATE_SCHEMA, 
FIXTURE_LOCAL_DATE, false));
+
+    // timestamp-millis, consistent=false → epoch-millis Long
+    assertEquals(FIXTURE_EPOCH_MILLIS, 
HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA, 
FIXTURE_EPOCH_MILLIS, false));
+    assertEquals(FIXTURE_EPOCH_MILLIS, 
HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA, 
FIXTURE_MILLIS_INSTANT, false));
+
+    // timestamp-millis, consistent=true → java.sql.Timestamp
+    
assertEquals(HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA, 
FIXTURE_EPOCH_MILLIS, true),
+        HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA, 
FIXTURE_MILLIS_INSTANT, true));
+
+    // timestamp-micros, consistent=false → epoch-micros Long
+    assertEquals(FIXTURE_EPOCH_MICROS, 
HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA, 
FIXTURE_EPOCH_MICROS, false));
+    assertEquals(FIXTURE_EPOCH_MICROS, 
HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA, 
FIXTURE_MICROS_INSTANT, false));
+
+    // timestamp-micros, consistent=true → java.sql.Timestamp (millis 
precision, matches Avro 1.11 behavior)
+    
assertEquals(HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA, 
FIXTURE_EPOCH_MICROS, true),
+        HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA, 
FIXTURE_MICROS_INSTANT, true));
+
+    // local-timestamp-millis / local-timestamp-micros → Long
+    assertEquals(FIXTURE_EPOCH_MILLIS, 
HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MILLIS_SCHEMA, 
FIXTURE_EPOCH_MILLIS, false));
+    assertEquals(FIXTURE_EPOCH_MILLIS, 
HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MILLIS_SCHEMA, 
FIXTURE_LOCAL_DT_MILLIS, false));
+    assertEquals(FIXTURE_EPOCH_MICROS, 
HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MICROS_SCHEMA, 
FIXTURE_EPOCH_MICROS, false));
+    assertEquals(FIXTURE_EPOCH_MICROS, 
HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MICROS_SCHEMA, 
FIXTURE_LOCAL_DT_MICROS, false));
+  }
+
+  /**
+   * Cross-Avro-version invariant for ordering-value extraction: a record 
whose timestamp/date field
+   * holds the java.time form (Avro 1.12.1 fast reader) must yield the same 
comparable ordering value
+   * as one holding the primitive form (Avro 1.12.0 / 1.11.x). Without this 
property,
+   * {@code DefaultHoodieRecordPayload.compareOrderingVal} throws 
ClassCastException when one side of
+   * the comparison was read via Avro 1.12.1 and the other built from a Long 
(which is what Hudi's
+   * Spark→Avro serializer always produces).
+   */
+  @Test
+  public void testGetNestedFieldValOrderingInvariantAcrossAvroVersions() {
+    String schemaStr = "{\"type\":\"record\",\"name\":\"r\",\"fields\":["
+        + "{\"name\":\"id\",\"type\":\"string\"},"
+        + 
"{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}},"
+        + 
"{\"name\":\"d\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}]}";
+    Schema schema = new Schema.Parser().parse(schemaStr);
+
+    GenericRecord avro111Form = new GenericData.Record(schema);
+    avro111Form.put("id", "k");
+    avro111Form.put("ts", FIXTURE_EPOCH_MICROS);
+    avro111Form.put("d", FIXTURE_EPOCH_DAY);
+
+    GenericRecord avro112Form = new GenericData.Record(schema);
+    avro112Form.put("id", "k");
+    avro112Form.put("ts", FIXTURE_MICROS_INSTANT);
+    avro112Form.put("d", FIXTURE_LOCAL_DATE);
+
+    Object ts111 = HoodieAvroUtils.getNestedFieldVal(avro111Form, "ts", true, 
false);
+    Object ts112 = HoodieAvroUtils.getNestedFieldVal(avro112Form, "ts", true, 
false);
+    assertEquals(ts111, ts112);
+    // ordering compareTo must be symmetric and produce 0 — this is exactly 
what
+    // DefaultHoodieRecordPayload.compareOrderingVal relies on.
+    assertEquals(0, ((Comparable<Object>) ts111).compareTo(ts112));
+    assertEquals(0, ((Comparable<Object>) ts112).compareTo(ts111));
+
+    Object d111 = HoodieAvroUtils.getNestedFieldVal(avro111Form, "d", true, 
false);
+    Object d112 = HoodieAvroUtils.getNestedFieldVal(avro112Form, "d", true, 
false);
+    assertEquals(d111, d112);
+    assertEquals(0, ((Comparable<Object>) d111).compareTo(d112));
+  }
+
+  /**
+   * Cross-Avro-version invariant for {@link 
HoodieAvroUtils#rewritePrimaryType}: schema-evolution
+   * paths that legacy-cast {@code (Integer) oldValue} / {@code (Long) 
oldValue} (e.g. ALTER COLUMN
+   * TYPE from date → string, or timestamp-millis → timestamp-micros) must 
accept the java.time
+   * form (Avro 1.12.1 fast reader) as well as the primitive form (Avro 1.12.0 
/ 1.11.x), since the
+   * on-disk byte format is identical and a Spark 4.1 reader can be evolving 
records written by any
+   * version.
+   */
+  @Test
+  public void testRewritePrimaryTypeCrossAvroVersion() {
+    Schema stringSchema = Schema.create(Schema.Type.STRING);
+    Schema longSchema = Schema.create(Schema.Type.LONG);
+    Schema floatSchema = Schema.create(Schema.Type.FLOAT);
+    Schema doubleSchema = Schema.create(Schema.Type.DOUBLE);
+
+    // For each (oldSchema, target newSchema), both the primitive and 
java.time inputs must yield the
+    // same rewritten value — same on-disk semantics, no divergence between 
Spark profiles.
+    assertRewriteEquivalent(DATE_SCHEMA, stringSchema, FIXTURE_EPOCH_DAY, 
FIXTURE_LOCAL_DATE);
+    assertRewriteEquivalent(DATE_SCHEMA, longSchema, FIXTURE_EPOCH_DAY, 
FIXTURE_LOCAL_DATE);
+    assertRewriteEquivalent(DATE_SCHEMA, floatSchema, FIXTURE_EPOCH_DAY, 
FIXTURE_LOCAL_DATE);
+    assertRewriteEquivalent(DATE_SCHEMA, doubleSchema, FIXTURE_EPOCH_DAY, 
FIXTURE_LOCAL_DATE);
+
+    assertRewriteEquivalent(TS_MILLIS_SCHEMA, stringSchema, 
FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT);
+    assertRewriteEquivalent(TS_MILLIS_SCHEMA, floatSchema, 
FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT);
+    assertRewriteEquivalent(TS_MILLIS_SCHEMA, doubleSchema, 
FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT);
+
+    // In-place logical-type changes: the LONG → LONG branches in 
rewritePrimaryType explicitly cast
+    // to (Long) and would fail on Instant / LocalDateTime under Avro 1.12.1.
+    assertRewriteEquivalent(TS_MILLIS_SCHEMA, TS_MICROS_SCHEMA, 
FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT);
+    assertRewriteEquivalent(TS_MICROS_SCHEMA, TS_MILLIS_SCHEMA, 
FIXTURE_EPOCH_MICROS, FIXTURE_MICROS_INSTANT);
+    assertRewriteEquivalent(LOCAL_TS_MILLIS_SCHEMA, LOCAL_TS_MICROS_SCHEMA, 
FIXTURE_EPOCH_MILLIS, FIXTURE_LOCAL_DT_MILLIS);
+    assertRewriteEquivalent(LOCAL_TS_MICROS_SCHEMA, LOCAL_TS_MILLIS_SCHEMA, 
FIXTURE_EPOCH_MICROS, FIXTURE_LOCAL_DT_MICROS);
+  }
+
+  private static void assertRewriteEquivalent(Schema oldSchema, Schema 
newSchema,
+                                              Object avro111Value, Object 
avro112Value) {
+    assertEquals(HoodieAvroUtils.rewritePrimaryType(avro111Value, oldSchema, 
newSchema),
+        HoodieAvroUtils.rewritePrimaryType(avro112Value, oldSchema, 
newSchema));
+  }
+
   @Test
   public void testReWriteAvroRecordWithNewSchema() {
     Schema nestedSchema = new 
Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD_STR);
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 39c71016ceee..a03a098b2d9d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -75,8 +75,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   }
 
   test("Test alter column types") {
-    // TODO: Fix reading ordering field with logical type on Spark 4.1 
(https://github.com/apache/hudi/issues/18606)
-    assume(!HoodieSparkUtils.gteqSpark4_1, "Disabled on Spark 4.1 - see 
HUDI-18606")
     withRecordType()(withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
@@ -148,8 +146,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
   }
 
   test("Test alter column types 2") {
-    // TODO: Fix reading ordering field with logical type on Spark 4.1 
(https://github.com/apache/hudi/issues/18606)
-    assume(!HoodieSparkUtils.gteqSpark4_1, "Disabled on Spark 4.1 - see 
HUDI-18606")
     withTempDir { tmp =>
       Seq("cow", "mor").foreach { tableType =>
         val tableName = generateTableName
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
index b82b95af8662..16a7da260bcf 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala
@@ -19,7 +19,7 @@
 
 package org.apache.spark.sql.hudi.dml.others
 
-import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
HoodieSparkUtils, ScalaAssertionSupport}
+import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, 
ScalaAssertionSupport}
 import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES
 import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.table.timeline.HoodieTimeline
@@ -1089,8 +1089,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase 
with ScalaAssertionSuppo
   }
 
   test("Test Different Type of PreCombineField") {
-    // TODO: Fix reading ordering field with logical type on Spark 4.1 
(https://github.com/apache/hudi/issues/18606)
-    assume(!HoodieSparkUtils.gteqSpark4_1, "Disabled on Spark 4.1 - see 
HUDI-18606")
     withTempDir { tmp =>
       withSQLConf("hoodie.payload.combined.schema.validate" -> "true") {
         val typeAndValue = Seq(
diff --git 
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
 
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 9ae690f1512e..c3c75c94dd72 100644
--- 
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -129,6 +129,26 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
       case (BOOLEAN, BooleanType) => (updater, ordinal, value) =>
         updater.setBoolean(ordinal, value.asInstanceOf[Boolean])
 
+      
//////////////////////////////////////////////////////////////////////////////////////////////
+      // BEGIN Hudi customization for Avro 1.12.1 (Spark 4.1)
+      //
+      // The Spark 4.1 profile pulls in Avro 1.12.1, which flipped the default 
of
+      // `GenericData`'s `fastReaderEnabled` flag from false to true (the
+      // `org.apache.avro.fastread` system property now defaults to "true"). 
With the fast reader
+      // enabled, `GenericDatumReader` applies the registered `Conversion`s 
and materializes
+      // `java.time.LocalDate` for date, `java.time.Instant` for 
timestamp-millis /
+      // timestamp-micros, and `java.time.LocalDateTime` for 
local-timestamp-millis /
+      // local-timestamp-micros. Avro 1.12.0 (Spark 4.0) and Avro 1.11.x 
(Spark 3.5 and earlier)
+      // left the fast reader off, so those profiles exposed the raw `Integer` 
/ `Long`. The
+      // blanket `value.asInstanceOf[Long]` / `asInstanceOf[Int]` used by the 
upstream Spark 4.0
+      // deserializer fails on the java.time forms. The fallbacks below accept 
either form and
+      // normalize to Catalyst's epoch-micros Long / epoch-day Int.
+      //
+      // This change is read-side only — the on-wire encoding for these 
logical types is fixed by
+      // the Avro spec (int / long) and is identical across Avro versions, so 
storage bytes are
+      // unaffected and writer/reader compatibility across Spark profiles is 
preserved.
+      
//////////////////////////////////////////////////////////////////////////////////////////////
+
       case (INT, IntegerType) => (updater, ordinal, value) =>
         value match {
           case localDate: java.time.LocalDate =>
@@ -196,6 +216,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema,
         case other => throw new IncompatibleSchemaException(errorPrefix +
           s"Avro logical type $other cannot be converted to SQL type 
${TimestampNTZType.sql}.")
       }
+      // END Hudi customization for Avro 1.12.1 (Spark 4.1)
+      
//////////////////////////////////////////////////////////////////////////////////////////////
 
       // Before we upgrade Avro to 1.8 for logical type support, spark-avro 
converts Long to Date.
       // For backward compatibility, we still keep this conversion.
diff --git 
a/hudi-spark-datasource/hudi-spark4.1.x/src/test/scala/org/apache/hudi/TestSpark4_1AvroLogicalTypeBytes.scala
 
b/hudi-spark-datasource/hudi-spark4.1.x/src/test/scala/org/apache/hudi/TestSpark4_1AvroLogicalTypeBytes.scala
new file mode 100644
index 000000000000..afb3492f0c2a
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark4.1.x/src/test/scala/org/apache/hudi/TestSpark4_1AvroLogicalTypeBytes.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi
+
+import org.apache.avro.Schema
+import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
+import org.apache.avro.io.EncoderFactory
+import org.apache.spark.sql.avro.HoodieSpark4_1AvroSerializer
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
+import org.junit.jupiter.api.Test
+
+import java.io.ByteArrayOutputStream
+import java.time.LocalDate
+
+/**
+ * Validates the storage-byte invariant under the Spark 4.1 profile, which pulls in Avro 1.12.1.
+ *
+ * The Avro on-wire encoding for `date`, `timestamp-millis`, and `timestamp-micros` logical types
+ * is fixed by spec to int / long, and is identical across all Avro versions Hudi supports
+ * (1.11.x, 1.12.0, 1.12.1). As long as Hudi's Spark→Avro path (`HoodieSpark4_1AvroSerializer` →
+ * `AvroSerializer`) emits raw `java.lang.Long` / `java.lang.Integer` into the `GenericRecord` —
+ * and never `java.time.Instant` / `java.time.LocalDate` — the bytes Hudi writes on Spark 4.1 are
+ * bit-identical to what it writes on Spark 3.5 / 4.0, preserving forward/backward compatibility
+ * for readers across profiles.
+ *
+ * This test pins that invariant down:
+ *   1. The serializer outputs raw primitives in the GenericRecord (not java.time types).
+ *   2. `GenericDatumWriter` produces the expected canonical bytes for known values.
+ */
+class TestSpark4_1AvroLogicalTypeBytes {
+
+  // 2024-05-20T00:00:00.123456Z
+  private val epochMicros: Long = 1716163200_000000L + 123456L
+  private val epochMillis: Long = 1716163200_000L + 123L
+  private val epochDay: Int = LocalDate.of(2024, 5, 20).toEpochDay.toInt
+
+  private val avroSchema: Schema = new Schema.Parser().parse(
+    """{"type":"record","name":"r","fields":[
+      |{"name":"d","type":{"type":"int","logicalType":"date"}},
+      |{"name":"ts_millis","type":{"type":"long","logicalType":"timestamp-millis"}},
+      |{"name":"ts_micros","type":{"type":"long","logicalType":"timestamp-micros"}}
+      |]}""".stripMargin)
+
+  private val sparkSchema: StructType = StructType(Seq(
+    StructField("d", DataTypes.DateType, nullable = false),
+    StructField("ts_millis", DataTypes.TimestampType, nullable = false),
+    StructField("ts_micros", DataTypes.TimestampType, nullable = false)
+  ))
+
+  // Catalyst representation: epoch-day (Int) for DateType, epoch-micros (Long) for TimestampType.
+  // ts_millis is also passed as micros in catalyst — AvroSerializer converts to millis on write.
+  private def serializeFixtureRow(): GenericRecord = {
+    val serializer = new HoodieSpark4_1AvroSerializer(sparkSchema, avroSchema, nullable = false)
+    serializer.serialize(InternalRow(epochDay, epochMillis * 1000L, epochMicros)).asInstanceOf[GenericRecord]
+  }
+
+  @Test
+  def testSerializerEmitsPrimitivesNotJavaTime(): Unit = {
+    val record = serializeFixtureRow()
+
+    // The on-disk byte stability for cross-Spark-version compatibility hinges on the GenericRecord
+    // holding the Avro primitive form (Integer / Long), not the java.time form. If the serializer
+    // ever started emitting Instants/LocalDates, Avro 1.12.1's GenericDatumWriter would route the
+    // value through a Conversion before encoding. Even though the resulting bytes would still be
+    // spec-compliant, that path is harder to reason about, so we pin the primitive contract here.
+    assertTrue(record.get("d").isInstanceOf[java.lang.Integer],
+      s"expected raw Integer for date logical type, got ${record.get("d").getClass}")
+    assertTrue(record.get("ts_millis").isInstanceOf[java.lang.Long],
+      s"expected raw Long for timestamp-millis logical type, got ${record.get("ts_millis").getClass}")
+    assertTrue(record.get("ts_micros").isInstanceOf[java.lang.Long],
+      s"expected raw Long for timestamp-micros logical type, got ${record.get("ts_micros").getClass}")
+
+    assertEquals(epochDay, record.get("d"))
+    assertEquals(epochMillis, record.get("ts_millis"))
+    assertEquals(epochMicros, record.get("ts_micros"))
+  }
+
+  @Test
+  def testEncodedBytesMatchAvroSpec(): Unit = {
+    val baos = new ByteArrayOutputStream()
+    val encoder = EncoderFactory.get().binaryEncoder(baos, null)
+    val writer = new GenericDatumWriter[GenericRecord](avroSchema)
+    writer.write(serializeFixtureRow(), encoder)
+    encoder.flush()
+    val bytes = baos.toByteArray
+
+    // Compute the expected canonical bytes manually using Avro's zig-zag varlong encoding (spec).
+    // This computation is independent of the running Avro version and proves that whatever bytes
+    // Avro 1.12.1 writes here equal what Avro 1.12.0 / 1.11.x would write.
+    val expected = new ByteArrayOutputStream()
+    writeZigZagLong(expected, epochDay.toLong)
+    writeZigZagLong(expected, epochMillis)
+    writeZigZagLong(expected, epochMicros)
+    assertEquals(expected.toByteArray.toSeq, bytes.toSeq,
+      "encoded bytes must match the spec-defined Avro encoding regardless of the Avro version on the classpath")
+  }
+
+  // Avro's variable-length zig-zag long encoding, per the Avro spec — used here as an independent
+  // reference so the test does not depend on any Avro classes for the expected-bytes computation.
+  private def writeZigZagLong(out: ByteArrayOutputStream, value: Long): Unit = {
+    var n = (value << 1) ^ (value >> 63)
+    while ((n & ~0x7FL) != 0) {
+      out.write(((n & 0x7F) | 0x80).toInt)
+      n >>>= 7
+    }
+    out.write(n.toInt)
+  }
+}


Reply via email to