This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 4a0a7f2797d62294a9b4650cc77f2a8d625b5b0d Author: Y Ethan Guo <[email protected]> AuthorDate: Tue May 19 17:20:41 2026 -0700 fix(spark): handle Avro 1.12 logical type values in Spark 4.1 read path (#18773) --- .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 145 +++++++++++++++++++-- .../apache/hudi/avro/HoodieAvroWrapperUtils.java | 26 +++- .../org/apache/hudi/avro/TestHoodieAvroUtils.java | 144 ++++++++++++++++++++ .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala | 4 - .../sql/hudi/dml/others/TestMergeIntoTable.scala | 4 +- .../apache/spark/sql/avro/AvroDeserializer.scala | 22 ++++ .../hudi/TestSpark4_1AvroLogicalTypeBytes.scala | 127 ++++++++++++++++++ 7 files changed, 453 insertions(+), 19 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java index 654eb1660376..7c55f441d200 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java @@ -36,6 +36,7 @@ import org.apache.avro.AvroRuntimeException; import org.apache.avro.Conversions; import org.apache.avro.Conversions.DecimalConversion; import org.apache.avro.JsonProperties; +import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.LogicalTypes.Decimal; import org.apache.avro.Schema; @@ -70,8 +71,11 @@ import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.sql.Date; import java.sql.Timestamp; +import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.ZoneOffset; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.Arrays; @@ -671,20 +675,42 @@ public class HoodieAvroUtils { * <p> * Decimal Data Type is converted to actual decimal value instead of bytes/fixed which is how it is * represented/stored in parquet. 
+ * <p> + * <b>Avro version compatibility:</b> Avro 1.12.1 (pulled in by the Spark 4.1 profile) flipped the + * default of {@code GenericData}'s {@code fastReaderEnabled} flag from {@code false} to + * {@code true} (the {@code org.apache.avro.fastread} system property now defaults to + * {@code "true"}). With the fast reader enabled, {@code GenericDatumReader} applies the + * registered {@code Conversion}s during decode and materializes {@link LocalDate} for date, + * {@link Instant} for timestamp-millis / timestamp-micros, and {@link LocalDateTime} for + * local-timestamp-millis / local-timestamp-micros. Avro 1.12.0 (Spark 4.0) and Avro 1.11.x + * (Spark 3.5 and earlier) left the fast reader off by default, so those profiles exposed the raw + * {@link Integer} / {@link Long}. This method normalizes both forms to the same stable + * representation, so callers (precombine/ordering comparison, partition-path and record-key + * derivation, etc.) behave identically across Avro versions and across writer/reader Spark + * version combinations. The on-disk byte format is unaffected by this normalization: Avro's wire + * encoding for these logical types is fixed by spec to int / long and is identical across Avro + * versions; only the in-memory Java type returned by the reader changed. * * @param fieldSchema avro field schema * @param fieldValue avro field value * @return field value either converted (for certain data types) or as it is. 
*/ public static Object convertValueForAvroLogicalTypes(Schema fieldSchema, Object fieldValue, boolean consistentLogicalTimestampEnabled) { - if (fieldSchema.getLogicalType() == LogicalTypes.date()) { - return LocalDate.ofEpochDay(Long.parseLong(fieldValue.toString())); - } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMillis() && consistentLogicalTimestampEnabled) { - return new Timestamp(Long.parseLong(fieldValue.toString())); - } else if (fieldSchema.getLogicalType() == LogicalTypes.timestampMicros() && consistentLogicalTimestampEnabled) { - return new Timestamp(Long.parseLong(fieldValue.toString()) / 1000); - } else if (fieldSchema.getLogicalType() instanceof LogicalTypes.Decimal) { - Decimal dc = (Decimal) fieldSchema.getLogicalType(); + LogicalType logicalType = fieldSchema.getLogicalType(); + if (logicalType == LogicalTypes.date()) { + return LocalDate.ofEpochDay(extractEpochDay(fieldValue)); + } else if (logicalType == LogicalTypes.timestampMillis()) { + long millis = extractEpochMillis(fieldValue); + return consistentLogicalTimestampEnabled ? new Timestamp(millis) : millis; + } else if (logicalType == LogicalTypes.timestampMicros()) { + long micros = extractEpochMicros(fieldValue); + return consistentLogicalTimestampEnabled ? 
new Timestamp(micros / 1000) : micros; + } else if (logicalType == LogicalTypes.localTimestampMillis()) { + return extractLocalEpochMillis(fieldValue); + } else if (logicalType == LogicalTypes.localTimestampMicros()) { + return extractLocalEpochMicros(fieldValue); + } else if (logicalType instanceof LogicalTypes.Decimal) { + Decimal dc = (Decimal) logicalType; DecimalConversion decimalConversion = new DecimalConversion(); if (fieldSchema.getType() == Schema.Type.FIXED) { return decimalConversion.fromFixed((GenericFixed) fieldValue, fieldSchema, @@ -700,6 +726,101 @@ public class HoodieAvroUtils { return fieldValue; } + // The extract* helpers below accept either the Avro primitive form (Integer / Long, returned by + // Avro 1.12.0 / 1.11.x) or the java.time form (returned by Avro 1.12.1 with its default + // fastReaderEnabled=true), and return the canonical primitive for the same underlying bytes. + // See the javadoc on convertValueForAvroLogicalTypes for context. + + private static long extractEpochDay(Object fieldValue) { + if (fieldValue instanceof LocalDate) { + return ((LocalDate) fieldValue).toEpochDay(); + } + if (fieldValue instanceof Number) { + return ((Number) fieldValue).longValue(); + } + return Long.parseLong(fieldValue.toString()); + } + + private static long extractEpochMillis(Object fieldValue) { + if (fieldValue instanceof Instant) { + return ((Instant) fieldValue).toEpochMilli(); + } + if (fieldValue instanceof Number) { + return ((Number) fieldValue).longValue(); + } + return Long.parseLong(fieldValue.toString()); + } + + private static long extractEpochMicros(Object fieldValue) { + if (fieldValue instanceof Instant) { + Instant instant = (Instant) fieldValue; + return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1_000_000L), instant.getNano() / 1000L); + } + if (fieldValue instanceof Number) { + return ((Number) fieldValue).longValue(); + } + return Long.parseLong(fieldValue.toString()); + } + + private static long 
extractLocalEpochMillis(Object fieldValue) { + if (fieldValue instanceof LocalDateTime) { + return ((LocalDateTime) fieldValue).toInstant(ZoneOffset.UTC).toEpochMilli(); + } + if (fieldValue instanceof Number) { + return ((Number) fieldValue).longValue(); + } + return Long.parseLong(fieldValue.toString()); + } + + private static long extractLocalEpochMicros(Object fieldValue) { + if (fieldValue instanceof LocalDateTime) { + Instant instant = ((LocalDateTime) fieldValue).toInstant(ZoneOffset.UTC); + return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1_000_000L), instant.getNano() / 1000L); + } + if (fieldValue instanceof Number) { + return ((Number) fieldValue).longValue(); + } + return Long.parseLong(fieldValue.toString()); + } + + /** + * If {@code schema} carries a date / timestamp logical type and {@code value} is in the + * java.time form ({@link LocalDate} / {@link Instant} / {@link LocalDateTime}), normalize it + * to the Avro primitive form ({@link Integer} for date, {@link Long} for timestamp-millis, + * timestamp-micros, local-timestamp-millis, and local-timestamp-micros). Used at the entry of + * schema-evolution rewrite paths whose legacy code does unguarded {@code (Integer)} / + * {@code (Long)} casts on field values, which would otherwise fail under Avro 1.12.1 (Spark + * 4.1) where the {@code fastReaderEnabled} default is {@code true} and {@code GenericDatumReader} + * materializes java.time types instead of primitives. For non-logical-type schemas, primitive + * inputs, or null, this is a no-op. The on-disk byte format is unaffected - this is purely + * about in-memory Java type. + */ + private static Object normalizeAvroLogicalTypeToPrimitive(Object value, Schema schema) { + if (value == null || schema == null) { + return value; + } + LogicalType lt = schema.getLogicalType(); + if (lt == null) { + return value; + } + if (lt == LogicalTypes.date()) { + return value instanceof LocalDate ? 
(int) ((LocalDate) value).toEpochDay() : value; + } + if (lt == LogicalTypes.timestampMillis()) { + return value instanceof Instant ? extractEpochMillis(value) : value; + } + if (lt == LogicalTypes.timestampMicros()) { + return value instanceof Instant ? extractEpochMicros(value) : value; + } + if (lt == LogicalTypes.localTimestampMillis()) { + return value instanceof LocalDateTime ? extractLocalEpochMillis(value) : value; + } + if (lt == LogicalTypes.localTimestampMicros()) { + return value instanceof LocalDateTime ? extractLocalEpochMicros(value) : value; + } + return value; + } + /** * Gets record column values into object array. * @@ -931,6 +1052,14 @@ public class HoodieAvroUtils { } public static Object rewritePrimaryType(Object oldValue, Schema oldSchema, Schema newSchema) { + // Normalize any java.time form (LocalDate / Instant / LocalDateTime) to the Avro primitive + // form (Integer / Long) before doing any numeric / string conversion. The legacy branches + // below explicitly cast to Integer / Long; under Avro 1.12.1 (Spark 4.1 profile) the + // fastReaderEnabled default flipped to true and GenericDatumReader materializes java.time + // types for date / timestamp logical fields, which would break those casts. See + // convertValueForAvroLogicalTypes for the broader context — this is a read-side normalization + // only and does not affect on-disk byte format. 
+ oldValue = normalizeAvroLogicalTypeToPrimitive(oldValue, oldSchema); if (oldSchema.getType() == newSchema.getType()) { switch (oldSchema.getType()) { case NULL: diff --git a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java index 91590a0210d5..d0eaec5f05c9 100644 --- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroWrapperUtils.java @@ -206,14 +206,18 @@ public class HoodieAvroWrapperUtils { return null; } else if (DateWrapper.class.getSimpleName().equals(wrapperClassName)) { ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord); - return Date.valueOf(LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0))); + // Avro 1.12.1 (Spark 4.1 profile) defaults fastReaderEnabled=true, so GenericDatumReader returns + // java.time.LocalDate for date logical types; Avro 1.12.0 (Spark 4.0) and earlier return Integer. + // Accept both — see HoodieAvroUtils#convertValueForAvroLogicalTypes for the broader context. 
+ return Date.valueOf(LocalDate.ofEpochDay(toEpochDay(((GenericRecord) avroValueWrapper).get(0)))); } else if (LocalDateWrapper.class.getSimpleName().equals(wrapperClassName)) { ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord); - return LocalDate.ofEpochDay((Integer) ((GenericRecord) avroValueWrapper).get(0)); + return LocalDate.ofEpochDay(toEpochDay(((GenericRecord) avroValueWrapper).get(0))); } else if (TimestampMicrosWrapper.class.getSimpleName().equals(wrapperClassName)) { ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord); - Instant instant = microsToInstant((Long) ((GenericRecord) avroValueWrapper).get(0)); - return Timestamp.from(instant); + // Avro 1.12.1 (Spark 4.1 profile) defaults fastReaderEnabled=true, so timestamp-micros decodes + // to java.time.Instant; Avro 1.12.0 (Spark 4.0) and earlier return Long. + return Timestamp.from(toInstantFromMicros(((GenericRecord) avroValueWrapper).get(0))); } else if (DecimalWrapper.class.getSimpleName().equals(wrapperClassName)) { Schema valueSchema = DecimalWrapper.SCHEMA$.getField("value").schema(); ValidationUtils.checkArgument(avroValueWrapper instanceof GenericRecord); @@ -340,4 +344,18 @@ public class HoodieAvroWrapperUtils { GenericRecord genRec = (GenericRecord) val; return (Comparable<?>) genRec.get("value"); } + + private static int toEpochDay(Object dateFieldValue) { + if (dateFieldValue instanceof LocalDate) { + return Math.toIntExact(((LocalDate) dateFieldValue).toEpochDay()); + } + return ((Number) dateFieldValue).intValue(); + } + + private static Instant toInstantFromMicros(Object timestampMicrosFieldValue) { + if (timestampMicrosFieldValue instanceof Instant) { + return (Instant) timestampMicrosFieldValue; + } + return microsToInstant(((Number) timestampMicrosFieldValue).longValue()); + } } diff --git a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java index 
e24501ec7754..0028b322b99e 100644 --- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java @@ -99,7 +99,10 @@ import java.math.RoundingMode; import java.nio.ByteBuffer; import java.sql.Date; import java.sql.Timestamp; +import java.time.Instant; import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.time.temporal.ChronoUnit; import java.util.ArrayDeque; import java.util.ArrayList; @@ -499,6 +502,147 @@ public class TestHoodieAvroUtils { assertEquals(0, buffer.position()); } + // Cross-Avro-version test fixtures: the same date / timestamp value expressed in both the Avro + // primitive form (Integer / Long) — what GenericDatumReader returns under Avro 1.12.0 / 1.11.x — + // and the java.time form (LocalDate / Instant / LocalDateTime) — what it returns under Avro + // 1.12.1 with its default fastReaderEnabled=true. The contract under test is that both forms + // produce identical downstream behavior in every Hudi read-side path that touches a logical-typed + // field. Sharing these fixtures across tests pins the same exact value through every path. 
+ private static final long FIXTURE_EPOCH_MICROS = 1716163200_000000L + 123456L; // 2024-05-20T00:00:00.123456Z + private static final long FIXTURE_EPOCH_MILLIS = 1716163200_000L + 123L; // 2024-05-20T00:00:00.123Z + private static final int FIXTURE_EPOCH_DAY = (int) LocalDate.of(2024, 5, 20).toEpochDay(); + private static final Instant FIXTURE_MILLIS_INSTANT = Instant.ofEpochMilli(FIXTURE_EPOCH_MILLIS); + private static final Instant FIXTURE_MICROS_INSTANT = Instant.ofEpochSecond( + FIXTURE_EPOCH_MICROS / 1_000_000L, (FIXTURE_EPOCH_MICROS % 1_000_000L) * 1000L); + private static final LocalDate FIXTURE_LOCAL_DATE = LocalDate.ofEpochDay(FIXTURE_EPOCH_DAY); + private static final LocalDateTime FIXTURE_LOCAL_DT_MILLIS = + LocalDateTime.ofInstant(FIXTURE_MILLIS_INSTANT, ZoneOffset.UTC); + private static final LocalDateTime FIXTURE_LOCAL_DT_MICROS = + LocalDateTime.ofInstant(FIXTURE_MICROS_INSTANT, ZoneOffset.UTC); + + private static final Schema DATE_SCHEMA = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT)); + private static final Schema TS_MILLIS_SCHEMA = LogicalTypes.timestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); + private static final Schema TS_MICROS_SCHEMA = LogicalTypes.timestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + private static final Schema LOCAL_TS_MILLIS_SCHEMA = LogicalTypes.localTimestampMillis().addToSchema(Schema.create(Schema.Type.LONG)); + private static final Schema LOCAL_TS_MICROS_SCHEMA = LogicalTypes.localTimestampMicros().addToSchema(Schema.create(Schema.Type.LONG)); + + /** + * Cross-Avro-version invariant: {@link HoodieAvroUtils#convertValueForAvroLogicalTypes} must produce + * the same canonical Java value whether the GenericRecord field holds the Avro primitive form + * ({@code Long}/{@code Integer}, returned by Avro 1.12.0 and 1.11.x) or the java.time form + * ({@code Instant}/{@code LocalDate}/{@code LocalDateTime}, returned by Avro 1.12.1 with its + * default {@code 
fastReaderEnabled=true}). This is the contract that lets a Spark 4.1 reader + * compare ordering values against records written by any earlier Spark profile without divergence. + */ + @Test + public void testConvertValueForAvroLogicalTypesCrossAvroVersion() { + // date + assertEquals(HoodieAvroUtils.convertValueForAvroLogicalTypes(DATE_SCHEMA, FIXTURE_EPOCH_DAY, false), + HoodieAvroUtils.convertValueForAvroLogicalTypes(DATE_SCHEMA, FIXTURE_LOCAL_DATE, false)); + + // timestamp-millis, consistent=false → epoch-millis Long + assertEquals(FIXTURE_EPOCH_MILLIS, HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA, FIXTURE_EPOCH_MILLIS, false)); + assertEquals(FIXTURE_EPOCH_MILLIS, HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA, FIXTURE_MILLIS_INSTANT, false)); + + // timestamp-millis, consistent=true → java.sql.Timestamp + assertEquals(HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA, FIXTURE_EPOCH_MILLIS, true), + HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MILLIS_SCHEMA, FIXTURE_MILLIS_INSTANT, true)); + + // timestamp-micros, consistent=false → epoch-micros Long + assertEquals(FIXTURE_EPOCH_MICROS, HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA, FIXTURE_EPOCH_MICROS, false)); + assertEquals(FIXTURE_EPOCH_MICROS, HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA, FIXTURE_MICROS_INSTANT, false)); + + // timestamp-micros, consistent=true → java.sql.Timestamp (millis precision, matches Avro 1.11 behavior) + assertEquals(HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA, FIXTURE_EPOCH_MICROS, true), + HoodieAvroUtils.convertValueForAvroLogicalTypes(TS_MICROS_SCHEMA, FIXTURE_MICROS_INSTANT, true)); + + // local-timestamp-millis / local-timestamp-micros → Long + assertEquals(FIXTURE_EPOCH_MILLIS, HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MILLIS_SCHEMA, FIXTURE_EPOCH_MILLIS, false)); + assertEquals(FIXTURE_EPOCH_MILLIS, 
HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MILLIS_SCHEMA, FIXTURE_LOCAL_DT_MILLIS, false)); + assertEquals(FIXTURE_EPOCH_MICROS, HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MICROS_SCHEMA, FIXTURE_EPOCH_MICROS, false)); + assertEquals(FIXTURE_EPOCH_MICROS, HoodieAvroUtils.convertValueForAvroLogicalTypes(LOCAL_TS_MICROS_SCHEMA, FIXTURE_LOCAL_DT_MICROS, false)); + } + + /** + * Cross-Avro-version invariant for ordering-value extraction: a record whose timestamp/date field + * holds the java.time form (Avro 1.12.1 fast reader) must yield the same comparable ordering value + * as one holding the primitive form (Avro 1.12.0 / 1.11.x). Without this property, + * {@code DefaultHoodieRecordPayload.compareOrderingVal} throws ClassCastException when one side of + * the comparison was read via Avro 1.12.1 and the other built from a Long (which is what Hudi's + * Spark→Avro serializer always produces). + */ + @Test + public void testGetNestedFieldValOrderingInvariantAcrossAvroVersions() { + String schemaStr = "{\"type\":\"record\",\"name\":\"r\",\"fields\":[" + + "{\"name\":\"id\",\"type\":\"string\"}," + + "{\"name\":\"ts\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}}," + + "{\"name\":\"d\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}}]}"; + Schema schema = new Schema.Parser().parse(schemaStr); + + GenericRecord avro111Form = new GenericData.Record(schema); + avro111Form.put("id", "k"); + avro111Form.put("ts", FIXTURE_EPOCH_MICROS); + avro111Form.put("d", FIXTURE_EPOCH_DAY); + + GenericRecord avro112Form = new GenericData.Record(schema); + avro112Form.put("id", "k"); + avro112Form.put("ts", FIXTURE_MICROS_INSTANT); + avro112Form.put("d", FIXTURE_LOCAL_DATE); + + Object ts111 = HoodieAvroUtils.getNestedFieldVal(avro111Form, "ts", true, false); + Object ts112 = HoodieAvroUtils.getNestedFieldVal(avro112Form, "ts", true, false); + assertEquals(ts111, ts112); + // ordering compareTo must be symmetric and produce 0 — this is 
exactly what + // DefaultHoodieRecordPayload.compareOrderingVal relies on. + assertEquals(0, ((Comparable<Object>) ts111).compareTo(ts112)); + assertEquals(0, ((Comparable<Object>) ts112).compareTo(ts111)); + + Object d111 = HoodieAvroUtils.getNestedFieldVal(avro111Form, "d", true, false); + Object d112 = HoodieAvroUtils.getNestedFieldVal(avro112Form, "d", true, false); + assertEquals(d111, d112); + assertEquals(0, ((Comparable<Object>) d111).compareTo(d112)); + } + + /** + * Cross-Avro-version invariant for {@link HoodieAvroUtils#rewritePrimaryType}: schema-evolution + * paths that legacy-cast {@code (Integer) oldValue} / {@code (Long) oldValue} (e.g. ALTER COLUMN + * TYPE from date → string, or timestamp-millis → timestamp-micros) must accept the java.time + * form (Avro 1.12.1 fast reader) as well as the primitive form (Avro 1.12.0 / 1.11.x), since the + * on-disk byte format is identical and a Spark 4.1 reader can be evolving records written by any + * version. + */ + @Test + public void testRewritePrimaryTypeCrossAvroVersion() { + Schema stringSchema = Schema.create(Schema.Type.STRING); + Schema longSchema = Schema.create(Schema.Type.LONG); + Schema floatSchema = Schema.create(Schema.Type.FLOAT); + Schema doubleSchema = Schema.create(Schema.Type.DOUBLE); + + // For each (oldSchema, target newSchema), both the primitive and java.time inputs must yield the + // same rewritten value — same on-disk semantics, no divergence between Spark profiles. 
+ assertRewriteEquivalent(DATE_SCHEMA, stringSchema, FIXTURE_EPOCH_DAY, FIXTURE_LOCAL_DATE); + assertRewriteEquivalent(DATE_SCHEMA, longSchema, FIXTURE_EPOCH_DAY, FIXTURE_LOCAL_DATE); + assertRewriteEquivalent(DATE_SCHEMA, floatSchema, FIXTURE_EPOCH_DAY, FIXTURE_LOCAL_DATE); + assertRewriteEquivalent(DATE_SCHEMA, doubleSchema, FIXTURE_EPOCH_DAY, FIXTURE_LOCAL_DATE); + + assertRewriteEquivalent(TS_MILLIS_SCHEMA, stringSchema, FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT); + assertRewriteEquivalent(TS_MILLIS_SCHEMA, floatSchema, FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT); + assertRewriteEquivalent(TS_MILLIS_SCHEMA, doubleSchema, FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT); + + // In-place logical-type changes: the LONG → LONG branches in rewritePrimaryType explicitly cast + // to (Long) and would fail on Instant / LocalDateTime under Avro 1.12.1. + assertRewriteEquivalent(TS_MILLIS_SCHEMA, TS_MICROS_SCHEMA, FIXTURE_EPOCH_MILLIS, FIXTURE_MILLIS_INSTANT); + assertRewriteEquivalent(TS_MICROS_SCHEMA, TS_MILLIS_SCHEMA, FIXTURE_EPOCH_MICROS, FIXTURE_MICROS_INSTANT); + assertRewriteEquivalent(LOCAL_TS_MILLIS_SCHEMA, LOCAL_TS_MICROS_SCHEMA, FIXTURE_EPOCH_MILLIS, FIXTURE_LOCAL_DT_MILLIS); + assertRewriteEquivalent(LOCAL_TS_MICROS_SCHEMA, LOCAL_TS_MILLIS_SCHEMA, FIXTURE_EPOCH_MICROS, FIXTURE_LOCAL_DT_MICROS); + } + + private static void assertRewriteEquivalent(Schema oldSchema, Schema newSchema, + Object avro111Value, Object avro112Value) { + assertEquals(HoodieAvroUtils.rewritePrimaryType(avro111Value, oldSchema, newSchema), + HoodieAvroUtils.rewritePrimaryType(avro112Value, oldSchema, newSchema)); + } + @Test public void testReWriteAvroRecordWithNewSchema() { Schema nestedSchema = new Schema.Parser().parse(SCHEMA_WITH_NESTED_FIELD_STR); diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala index 
39c71016ceee..a03a098b2d9d 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala @@ -75,8 +75,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } test("Test alter column types") { - // TODO: Fix reading ordering field with logical type on Spark 4.1 (https://github.com/apache/hudi/issues/18606) - assume(!HoodieSparkUtils.gteqSpark4_1, "Disabled on Spark 4.1 - see HUDI-18606") withRecordType()(withTempDir { tmp => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName @@ -148,8 +146,6 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { } test("Test alter column types 2") { - // TODO: Fix reading ordering field with logical type on Spark 4.1 (https://github.com/apache/hudi/issues/18606) - assume(!HoodieSparkUtils.gteqSpark4_1, "Disabled on Spark 4.1 - see HUDI-18606") withTempDir { tmp => Seq("cow", "mor").foreach { tableType => val tableName = generateTableName diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala index b82b95af8662..16a7da260bcf 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/others/TestMergeIntoTable.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi.dml.others -import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieSparkUtils, ScalaAssertionSupport} +import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, ScalaAssertionSupport} import org.apache.hudi.DataSourceWriteOptions.SPARK_SQL_OPTIMIZED_WRITES import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig} 
import org.apache.hudi.common.table.timeline.HoodieTimeline @@ -1089,8 +1089,6 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo } test("Test Different Type of PreCombineField") { - // TODO: Fix reading ordering field with logical type on Spark 4.1 (https://github.com/apache/hudi/issues/18606) - assume(!HoodieSparkUtils.gteqSpark4_1, "Disabled on Spark 4.1 - see HUDI-18606") withTempDir { tmp => withSQLConf("hoodie.payload.combined.schema.validate" -> "true") { val typeAndValue = Seq( diff --git a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala index 9ae690f1512e..c3c75c94dd72 100644 --- a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala +++ b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala @@ -129,6 +129,26 @@ private[sql] class AvroDeserializer(rootAvroType: Schema, case (BOOLEAN, BooleanType) => (updater, ordinal, value) => updater.setBoolean(ordinal, value.asInstanceOf[Boolean]) + ////////////////////////////////////////////////////////////////////////////////////////////// + // BEGIN Hudi customization for Avro 1.12.1 (Spark 4.1) + // + // The Spark 4.1 profile pulls in Avro 1.12.1, which flipped the default of + // `GenericData`'s `fastReaderEnabled` flag from false to true (the + // `org.apache.avro.fastread` system property now defaults to "true"). With the fast reader + // enabled, `GenericDatumReader` applies the registered `Conversion`s and materializes + // `java.time.LocalDate` for date, `java.time.Instant` for timestamp-millis / + // timestamp-micros, and `java.time.LocalDateTime` for local-timestamp-millis / + // local-timestamp-micros. 
Avro 1.12.0 (Spark 4.0) and Avro 1.11.x (Spark 3.5 and earlier) + // left the fast reader off, so those profiles exposed the raw `Integer` / `Long`. The + // blanket `value.asInstanceOf[Long]` / `asInstanceOf[Int]` used by the upstream Spark 4.0 + // deserializer fails on the java.time forms. The fallbacks below accept either form and + // normalize to Catalyst's epoch-micros Long / epoch-day Int. + // + // This change is read-side only — the on-wire encoding for these logical types is fixed by + // the Avro spec (int / long) and is identical across Avro versions, so storage bytes are + // unaffected and writer/reader compatibility across Spark profiles is preserved. + ////////////////////////////////////////////////////////////////////////////////////////////// + case (INT, IntegerType) => (updater, ordinal, value) => value match { case localDate: java.time.LocalDate => @@ -196,6 +216,8 @@ private[sql] class AvroDeserializer(rootAvroType: Schema, case other => throw new IncompatibleSchemaException(errorPrefix + s"Avro logical type $other cannot be converted to SQL type ${TimestampNTZType.sql}.") } + // END Hudi customization for Avro 1.12.1 (Spark 4.1) + ////////////////////////////////////////////////////////////////////////////////////////////// // Before we upgrade Avro to 1.8 for logical type support, spark-avro converts Long to Date. // For backward compatibility, we still keep this conversion. diff --git a/hudi-spark-datasource/hudi-spark4.1.x/src/test/scala/org/apache/hudi/TestSpark4_1AvroLogicalTypeBytes.scala b/hudi-spark-datasource/hudi-spark4.1.x/src/test/scala/org/apache/hudi/TestSpark4_1AvroLogicalTypeBytes.scala new file mode 100644 index 000000000000..afb3492f0c2a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark4.1.x/src/test/scala/org/apache/hudi/TestSpark4_1AvroLogicalTypeBytes.scala @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi + +import org.apache.avro.Schema +import org.apache.avro.generic.{GenericDatumWriter, GenericRecord} +import org.apache.avro.io.EncoderFactory +import org.apache.spark.sql.avro.HoodieSpark4_1AvroSerializer +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.types.{DataTypes, StructField, StructType} +import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue} +import org.junit.jupiter.api.Test + +import java.io.ByteArrayOutputStream +import java.time.LocalDate + +/** + * Validates the storage-byte invariant under the Spark 4.1 profile, which pulls in Avro 1.12.1. + * + * The Avro on-wire encoding for `date`, `timestamp-millis`, and `timestamp-micros` logical types + * is fixed by spec to int / long, and is identical across all Avro versions Hudi supports + * (1.11.x, 1.12.0, 1.12.1). As long as Hudi's Spark→Avro path (`HoodieSpark4_1AvroSerializer` → + * `AvroSerializer`) emits raw `java.lang.Long` / `java.lang.Integer` into the `GenericRecord` — + * and never `java.time.Instant` / `java.time.LocalDate` — the bytes Hudi writes on Spark 4.1 are + * bit-identical to what it writes on Spark 3.5 / 4.0, preserving forward/backward compatibility + * for readers across profiles. 
+ * + * This test pins that invariant down: + * 1. The serializer outputs raw primitives in the GenericRecord (not java.time types). + * 2. `GenericDatumWriter` produces the expected canonical bytes for known values. + */ +class TestSpark4_1AvroLogicalTypeBytes { + + // 2024-05-20T00:00:00.123456Z + private val epochMicros: Long = 1716163200_000000L + 123456L + private val epochMillis: Long = 1716163200_000L + 123L + private val epochDay: Int = LocalDate.of(2024, 5, 20).toEpochDay.toInt + + private val avroSchema: Schema = new Schema.Parser().parse( + """{"type":"record","name":"r","fields":[ + |{"name":"d","type":{"type":"int","logicalType":"date"}}, + |{"name":"ts_millis","type":{"type":"long","logicalType":"timestamp-millis"}}, + |{"name":"ts_micros","type":{"type":"long","logicalType":"timestamp-micros"}} + |]}""".stripMargin) + + private val sparkSchema: StructType = StructType(Seq( + StructField("d", DataTypes.DateType, nullable = false), + StructField("ts_millis", DataTypes.TimestampType, nullable = false), + StructField("ts_micros", DataTypes.TimestampType, nullable = false) + )) + + // Catalyst representation: epoch-day (Int) for DateType, epoch-micros (Long) for TimestampType. + // ts_millis is also passed as micros in catalyst — AvroSerializer converts to millis on write. + private def serializeFixtureRow(): GenericRecord = { + val serializer = new HoodieSpark4_1AvroSerializer(sparkSchema, avroSchema, nullable = false) + serializer.serialize(InternalRow(epochDay, epochMillis * 1000L, epochMicros)).asInstanceOf[GenericRecord] + } + + @Test + def testSerializerEmitsPrimitivesNotJavaTime(): Unit = { + val record = serializeFixtureRow() + + // The on-disk byte stability for cross-Spark-version compatibility hinges on the GenericRecord + // holding the Avro primitive form (Integer / Long), not the java.time form. 
If the serializer + // ever started emitting Instants/LocalDates, Avro 1.12.1's GenericDatumWriter would route the + // value through a Conversion before encoding. Even though the resulting bytes would still be + // spec-compliant, that path is harder to reason about, so we pin the primitive contract here. + assertTrue(record.get("d").isInstanceOf[java.lang.Integer], + s"expected raw Integer for date logical type, got ${record.get("d").getClass}") + assertTrue(record.get("ts_millis").isInstanceOf[java.lang.Long], + s"expected raw Long for timestamp-millis logical type, got ${record.get("ts_millis").getClass}") + assertTrue(record.get("ts_micros").isInstanceOf[java.lang.Long], + s"expected raw Long for timestamp-micros logical type, got ${record.get("ts_micros").getClass}") + + assertEquals(epochDay, record.get("d")) + assertEquals(epochMillis, record.get("ts_millis")) + assertEquals(epochMicros, record.get("ts_micros")) + } + + @Test + def testEncodedBytesMatchAvroSpec(): Unit = { + val baos = new ByteArrayOutputStream() + val encoder = EncoderFactory.get().binaryEncoder(baos, null) + val writer = new GenericDatumWriter[GenericRecord](avroSchema) + writer.write(serializeFixtureRow(), encoder) + encoder.flush() + val bytes = baos.toByteArray + + // Compute the expected canonical bytes manually using Avro's zig-zag varlong encoding (spec). + // This computation is independent of the running Avro version and proves that whatever bytes + // Avro 1.12.1 writes here equal what Avro 1.12.0 / 1.11.x would write. 
+ val expected = new ByteArrayOutputStream() + writeZigZagLong(expected, epochDay.toLong) + writeZigZagLong(expected, epochMillis) + writeZigZagLong(expected, epochMicros) + assertEquals(expected.toByteArray.toSeq, bytes.toSeq, + "encoded bytes must match the spec-defined Avro encoding regardless of the Avro version on the classpath") + } + + // Avro's variable-length zig-zag long encoding, per the Avro spec — used here as an independent + // reference so the test does not depend on any Avro classes for the expected-bytes computation. + private def writeZigZagLong(out: ByteArrayOutputStream, value: Long): Unit = { + var n = (value << 1) ^ (value >> 63) + while ((n & ~0x7FL) != 0) { + out.write(((n & 0x7F) | 0x80).toInt) + n >>>= 7 + } + out.write(n.toInt) + } +}
