This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit bb001f243b957adc6c544c59c2de50eb03a67e32
Author: Y Ethan Guo <[email protected]>
AuthorDate: Sat May 2 05:52:48 2026 -0700

    fix: Honor SparkSession overrides for rebase mode and timezone in compaction tasks (#18675)
    
    * Honor SparkSession overrides for rebase mode and timezone in compaction tasks
    
    When MOR compaction runs outside a Spark SQL execution context (e.g. a
    standalone CompactTask runner), `SQLConf.get` on the executor task thread
    returns a fresh fallback `SQLConf` with default values, not the user's
    SparkSession overrides. As a result,
    `Spark{3_3,3_4,3_5,4_0}Adapter.getDateTimeRebaseMode()` resolved to
    `EXCEPTION` even when the user had set
    `spark.sql.parquet.datetimeRebaseModeInWrite=LEGACY`, producing
    `SparkUpgradeException [INCONSISTENT_BEHAVIOR_CROSS_VERSION.WRITE_ANCIENT_DATETIME]`
    during compaction of MOR tables containing pre-1900 timestamps. The same
    gap affected `HoodieRowParquetWriteSupport.init()`'s `sessionLocalTimeZone` read.
    
    Adapter and WriteSupport now resolve the value in this order:
      1. SQLConf override (so `spark.conf.set(...)` on the SparkSession takes
         effect on the driver and inside SQL execution contexts).
      2. SparkConf via SparkEnv.get.conf (broadcast to every executor at
         startup, so user-set keys are honored on executor tasks running
         outside a SQL execution context).
      3. The ConfigEntry's own default (or SQLConf.sessionLocalTimeZone for
         the timezone helper).
    
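    Adds TestSparkAdapterRebaseModePropagation (3 methods) covering rebase
    mode and timezone propagation into vanilla parallelize().map() task
    closures. The rebase-mode and timezone tests fail without the fix; the
    third test only checks the precondition that SparkConf is readable via
    SparkEnv.get.conf on executor task threads.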
    
    * Apply fix to Spark4_1Adapter; use flatMap+Option to avoid null inside Option
    
    * Use SQLConf.getConf(entry, null) instead of getConfString(key, null)
    
    * Make Spark4_1 consistent with 3.x/4.0; add default-behavior test; trim scaladocs
    
    * Add unit test for resolveSessionLocalTimeZone in hudi-spark-client
    
    * Add SQLConf-override test method to lift coverage on new code
    
    * Drop redundant public modifiers from JUnit 5 test class and methods
    
    * Read expected default from SQLConf so test works on Spark 3.x and 4.1
    
    * Document why init() coverage lives in hudi-spark integration tests
    
    * Inline single-use Parquet metadata keys; keep only timeZone constant
    
    * Add SparkConf-branch test for resolveSessionLocalTimeZone
---
 .../storage/row/HoodieRowParquetWriteSupport.java  |  42 +++++-
 .../row/TestHoodieRowParquetWriteSupport.java      | 120 ++++++++++++++++
 .../TestSparkAdapterRebaseModeDefault.scala        |  97 +++++++++++++
 .../TestSparkAdapterRebaseModePropagation.scala    | 157 +++++++++++++++++++++
 .../apache/spark/sql/adapter/Spark3_3Adapter.scala |   9 +-
 .../apache/spark/sql/adapter/Spark3_4Adapter.scala |   9 +-
 .../apache/spark/sql/adapter/Spark3_5Adapter.scala |  17 ++-
 .../apache/spark/sql/adapter/Spark4_0Adapter.scala |   9 +-
 .../apache/spark/sql/adapter/Spark4_1Adapter.scala |  11 +-
 9 files changed, 465 insertions(+), 6 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 646fb2330833..262d4acc8ac4 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -57,6 +57,7 @@ import org.apache.spark.sql.catalyst.util.MapData;
 import org.apache.spark.sql.execution.datasources.DataSourceUtils;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
+import org.apache.spark.SparkEnv;
 import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.DataType;
@@ -116,6 +117,9 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
   private static final String MAP_KEY_NAME = "key";
   private static final String MAP_VALUE_NAME = "value";
 
+  private static final String SESSION_LOCAL_TIME_ZONE_KEY = 
"spark.sql.session.timeZone";
+  private static final String PARQUET_METADATA_TIME_ZONE_KEY = 
"org.apache.spark.timeZone";
+
   @Getter
   private final Configuration hadoopConf;
   private final Option<HoodieBloomFilterWriteSupport<UTF8String>> 
bloomFilterWriteSupportOpt;
@@ -312,13 +316,49 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
     return writers;
   }
 
+  /**
+   * Resolves the session-local timezone string. {@code SQLConf.get()} on a 
Spark
+   * executor task thread that is NOT inside a SQL execution context returns a
+   * fresh fallback {@code SQLConf} with default values — meaning the user's
+   * {@code spark.sql.session.timeZone} override on the SparkSession is 
invisible.
+   * This method falls back to {@code SparkEnv.get().conf()} (SparkConf is
+   * broadcast to every executor) so the override is honored in 
compaction-style
+   * code paths that dispatch via vanilla {@code parallelize().map()}.
+   *
+   * <p>Visible-for-testing: unit tests covering the 
SQLConf-not-propagated-to-executor
+   * behavior invoke this method directly from inside a Spark task closure 
without
+   * reflection.
+   */
+  public static String resolveSessionLocalTimeZone() {
+    // Resolution order:
+    //   1. SQLConf override (so `spark.conf.set("spark.sql.session.timeZone",
+    //      ...)` on the SparkSession takes effect on the driver and inside
+    //      Spark SQL execution contexts).
+    //   2. SparkConf (SparkEnv.get.conf) — broadcast to every executor at
+    //      startup, so the override is honored on executor tasks outside a SQL
+    //      execution context.
+    //   3. SQLConf.get.sessionLocalTimeZone — the JVM-default fallback.
+    String fromSqlConf = 
SQLConf.get().getConfString(SESSION_LOCAL_TIME_ZONE_KEY, null);
+    if (fromSqlConf != null) {
+      return fromSqlConf;
+    }
+    SparkEnv env = SparkEnv.get();
+    if (env != null) {
+      String fromSparkConf = env.conf().get(SESSION_LOCAL_TIME_ZONE_KEY, null);
+      if (fromSparkConf != null) {
+        return fromSparkConf;
+      }
+    }
+    return SQLConf.get().sessionLocalTimeZone();
+  }
+
   @Override
   public WriteContext init(Configuration configuration) {
     Map<String, String> metadata = new HashMap<>();
     metadata.put("org.apache.spark.version", 
VersionUtils.shortVersion(HoodieSparkUtils.getSparkVersion()));
     if 
(SparkAdapterSupport$.MODULE$.sparkAdapter().isLegacyBehaviorPolicy(datetimeRebaseMode))
 {
       metadata.put("org.apache.spark.legacyDateTime", "");
-      metadata.put("org.apache.spark.timeZone", 
SQLConf.get().sessionLocalTimeZone());
+      metadata.put(PARQUET_METADATA_TIME_ZONE_KEY, 
resolveSessionLocalTimeZone());
     }
     String vectorMeta = HoodieSchema.buildVectorColumnsMetadataValue(schema);
     if (!vectorMeta.isEmpty()) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowParquetWriteSupport.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowParquetWriteSupport.java
new file mode 100644
index 000000000000..0c39e99bdf89
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowParquetWriteSupport.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage.row;
+
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.TimeZone;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+/**
+ * Coverage for {@link 
HoodieRowParquetWriteSupport#resolveSessionLocalTimeZone()}.
+ * The full {@code init()} method is exercised end-to-end by integration tests 
in
+ * {@code hudi-spark-datasource/hudi-spark} (e.g. {@code 
TestHoodieInternalRowParquetWriter},
+ * {@code TestSparkAdapterRebaseModePropagation}). Constructing the write 
support
+ * directly from this module is not feasible because it requires a Spark
+ * adapter (e.g. {@code Spark3_5Adapter}) that is not on the test classpath
+ * here — the adapters live in sibling modules that depend on this one.
+ */
+class TestHoodieRowParquetWriteSupport extends HoodieClientTestBase {
+
+  private static final String SESSION_LOCAL_TIME_ZONE_KEY = 
"spark.sql.session.timeZone";
+
+  @Test
+  void testResolveSessionLocalTimeZoneWithoutOverride() {
+    String expected = TimeZone.getDefault().getID();
+
+    // Driver thread.
+    assertEquals(expected, 
HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone(),
+        "driver-side helper did not return the JVM default timezone");
+
+    // Executor task threads via vanilla parallelize().map() — outside any
+    // Spark SQL execution context — exercise the SparkEnv-fallback branch.
+    List<String> seen = jsc.parallelize(Arrays.asList(1, 2, 3, 4), 4)
+        .map(i -> HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone())
+        .collect();
+    for (int i = 0; i < seen.size(); i++) {
+      assertEquals(expected, seen.get(i),
+          "executor task #" + i + " resolved sessionLocalTimeZone to '" + 
seen.get(i)
+              + "' with no overrides; expected the JVM default ('" + expected 
+ "')");
+    }
+  }
+
+  @Test
+  void testResolveSessionLocalTimeZoneWithSqlConfOverride() {
+    // Pick a non-JVM-default zone so we can distinguish "fix worked" from
+    // "fell back to JVM default".
+    String jvmDefault = TimeZone.getDefault().getID();
+    String customTz = "Asia/Tokyo".equals(jvmDefault) ? "Pacific/Auckland" : 
"Asia/Tokyo";
+
+    sqlContext.sparkSession().conf().set(SESSION_LOCAL_TIME_ZONE_KEY, 
customTz);
+    try {
+      // Driver SQLConf carries the override; helper must return it via the 
first branch.
+      assertEquals(customTz, 
HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone(),
+          "helper did not return the SQLConf override on the driver");
+    } finally {
+      sqlContext.sparkSession().conf().unset(SESSION_LOCAL_TIME_ZONE_KEY);
+    }
+  }
+
+  /**
+   * SparkConf branch: when the override lives in SparkConf (broadcast to every
+   * executor) but is absent from the current thread's SQLConf, the helper
+   * must return it via {@code SparkEnv.get.conf}. Mirrors a compaction task
+   * thread that is outside any SQL execution context — exactly the
+   * production scenario the fix targets.
+   */
+  @Test
+  void testResolveSessionLocalTimeZoneWithSparkConfOverride() {
+    String jvmDefault = TimeZone.getDefault().getID();
+    String customTz = "Asia/Tokyo".equals(jvmDefault) ? "Pacific/Auckland" : 
"Asia/Tokyo";
+    assertNotEquals(customTz, jvmDefault,
+        "test setup is fragile if customTz matches the JVM default");
+
+    // Inject the key into SparkConf and remove it from the SparkSession's
+    // SQLConf so only the SparkEnv branch can satisfy the lookup.
+    jsc.sc().conf().set(SESSION_LOCAL_TIME_ZONE_KEY, customTz);
+    sqlContext.sparkSession().conf().unset(SESSION_LOCAL_TIME_ZONE_KEY);
+    try {
+      // Driver: SQLConf branch returns null → SparkEnv branch returns 
customTz.
+      assertEquals(customTz, 
HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone(),
+          "driver helper should fall back to SparkEnv when SQLConf is unset");
+
+      // Executor task threads: SQLConf.get returns the fallback default (no
+      // override), so the SparkEnv branch is the only one that can satisfy
+      // the lookup. SparkEnv is shared by driver and executors in local mode.
+      List<String> seen = jsc.parallelize(Arrays.asList(1, 2, 3, 4), 4)
+          .map(i -> HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone())
+          .collect();
+      for (int i = 0; i < seen.size(); i++) {
+        assertEquals(customTz, seen.get(i),
+            "executor task #" + i + " should resolve to SparkConf override '"
+                + customTz + "', got '" + seen.get(i) + "'");
+      }
+    } finally {
+      jsc.sc().conf().remove(SESSION_LOCAL_TIME_ZONE_KEY);
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkAdapterRebaseModeDefault.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkAdapterRebaseModeDefault.scala
new file mode 100644
index 000000000000..6d4260ef778b
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkAdapterRebaseModeDefault.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.JFunction
+
+import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.apache.spark.sql.internal.SQLConf
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Assertions.assertEquals
+
+import java.util.function.Consumer
+
+/**
+ * Regression: with no override on SparkSession or SparkConf, the resolution
+ * chain falls through to the documented defaults (driver and executor).
+ */
+class TestSparkAdapterRebaseModeDefault extends HoodieSparkClientTestBase {
+
+  var spark: SparkSession = _
+
+  override def getSparkSessionExtensionsInjector: 
HOption[Consumer[SparkSessionExtensions]] =
+    toJavaOption(Some(JFunction.toJavaConsumer((rcv: SparkSessionExtensions) =>
+      new HoodieSparkSessionExtension().apply(rcv))))
+
+  @BeforeEach override def setUp(): Unit = {
+    // Deliberately set no rebase or timezone overrides in extraConf; the
+    // session is created with whatever defaults Spark ships with.
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+  }
+
+  @AfterEach override def tearDown(): Unit = {
+    cleanupSparkContexts()
+    cleanupFileSystem()
+  }
+
+  /**
+   * Adapter falls through to the ConfigEntry default. The default differs by
+   * Spark version (EXCEPTION on 3.x, CORRECTED on 4.1+), so we read it
+   * dynamically from SQLConf.
+   */
+  @Test
+  def testRebaseModeDefaultsToConfigEntryDefault(): Unit = {
+    val expected = 
SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE).toString
+
+    assertEquals(expected,
+      SparkAdapterSupport.sparkAdapter.getDateTimeRebaseMode().toString,
+      "driver-side adapter did not return the ConfigEntry default")
+
+    val seenOnExecutor: Array[String] = spark.sparkContext.parallelize(1 to 4, 
4).map { _ =>
+      SparkAdapterSupport.sparkAdapter.getDateTimeRebaseMode().toString
+    }.collect()
+
+    seenOnExecutor.zipWithIndex.foreach { case (mode, i) =>
+      assertEquals(expected, mode,
+        s"executor task #$i resolved rebase mode to '$mode' with no overrides 
set; " +
+          s"expected the ConfigEntry default ('$expected').")
+    }
+  }
+
+  /** Timezone helper falls through to SQLConf's session-local timezone (JVM 
default). */
+  @Test
+  def testSessionLocalTimeZoneDefaultsToJvmDefault(): Unit = {
+    val expected = java.util.TimeZone.getDefault.getID
+
+    assertEquals(expected,
+      HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone(),
+      "driver-side helper did not return the JVM default timezone")
+
+    val seen: Array[String] = spark.sparkContext.parallelize(1 to 4, 4).map { 
_ =>
+      HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone()
+    }.collect()
+
+    seen.zipWithIndex.foreach { case (tz, i) =>
+      assertEquals(expected, tz,
+        s"executor task #$i resolved sessionLocalTimeZone to '$tz' with no 
overrides; " +
+          s"expected the JVM default ('$expected').")
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkAdapterRebaseModePropagation.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkAdapterRebaseModePropagation.scala
new file mode 100644
index 000000000000..0093b20fc2fd
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestSparkAdapterRebaseModePropagation.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package org.apache.hudi.functional
+
+import org.apache.hudi.HoodieConversionUtils.toJavaOption
+import org.apache.hudi.SparkAdapterSupport
+import org.apache.hudi.common.util.{Option => HOption}
+import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport
+import org.apache.hudi.testutils.HoodieSparkClientTestBase
+import org.apache.hudi.util.JFunction
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
+import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, 
assertTrue}
+
+import java.util.function.Consumer
+
+/**
+ * User-set rebase mode and session timezone must reach Spark executor tasks
+ * dispatched outside a SQL execution context. The adapter and timezone-helper tests
+ * fail without the SparkEnv.get.conf fallback; the SparkEnv test checks the precondition.
+ */
+class TestSparkAdapterRebaseModePropagation extends HoodieSparkClientTestBase {
+
+  var spark: SparkSession = _
+
+  override def getSparkSessionExtensionsInjector: 
HOption[Consumer[SparkSessionExtensions]] =
+    toJavaOption(Some(JFunction.toJavaConsumer((rcv: SparkSessionExtensions) =>
+      new HoodieSparkSessionExtension().apply(rcv))))
+
+  // Use a non-default timezone so we can prove that the user override is what
+  // the executor sees, not the JVM default that any fresh SQLConf would 
return.
+  // We pick whichever of two unrelated zones isn't the JVM default, so the 
test
+  // works on any developer's machine (or CI box).
+  private val customTimeZone: String = {
+    val jvmDefault = java.util.TimeZone.getDefault.getID
+    if (jvmDefault == "Asia/Tokyo") "Pacific/Auckland" else "Asia/Tokyo"
+  }
+
+  @BeforeEach override def setUp(): Unit = {
+    // Customer-style config: LEGACY rebase + a non-default session timezone in
+    // SparkConf (broadcast to all executors) AND propagated into the driver
+    // SparkSession's SQLConf at startup.
+    extraConf.put("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
+    extraConf.put("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY")
+    extraConf.put("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY")
+    extraConf.put("spark.sql.parquet.int96RebaseModeInRead", "LEGACY")
+    extraConf.put("spark.sql.session.timeZone", customTimeZone)
+
+    initPath()
+    initSparkContexts()
+    spark = sqlContext.sparkSession
+  }
+
+  @AfterEach override def tearDown(): Unit = {
+    cleanupSparkContexts()
+    cleanupFileSystem()
+  }
+
+  @Test
+  def testRebaseModeReachesExecutorTask(): Unit = {
+    // Sanity check #1: SparkConf carries LEGACY (so SparkEnv.get.conf
+    // on the executor will see it after the fix).
+    assertEquals(
+      "LEGACY",
+      
spark.sparkContext.getConf.get("spark.sql.parquet.datetimeRebaseModeInWrite"),
+      "SparkConf does not carry LEGACY — extraConf setup failed"
+    )
+
+    // Sanity check #2: Driver session's SQLConf has LEGACY.
+    assertEquals(
+      "LEGACY",
+      spark.conf.get("spark.sql.parquet.datetimeRebaseModeInWrite"),
+      "driver SparkSession SQLConf does not see LEGACY"
+    )
+
+    // Sanity check #3: The Hudi adapter, called from the driver, returns 
LEGACY.
+    val driverMode = 
SparkAdapterSupport.sparkAdapter.getDateTimeRebaseMode().toString
+    assertEquals(
+      "LEGACY", driverMode,
+      s"adapter on driver returned $driverMode — expected LEGACY"
+    )
+
+    // Now the actual bug check: ask the adapter on EXECUTOR task threads.
+    // We use raw RDD.map (NOT Dataset.map) so we are NOT inside a Spark SQL
+    // execution context — Spark will not auto-propagate SQLConf for us.
+    // This mirrors HoodieCompactor.compact:146's 
`context.parallelize(...).map(...)`.
+    val seenOnExecutor: Array[String] = spark.sparkContext.parallelize(1 to 4, 
4).map { _ =>
+      SparkAdapterSupport.sparkAdapter.getDateTimeRebaseMode().toString
+    }.collect()
+
+    seenOnExecutor.zipWithIndex.foreach { case (mode, i) =>
+      assertEquals(
+        "LEGACY", mode,
+        s"executor task #$i resolved rebase mode to '$mode' despite the driver 
" +
+          "session being LEGACY and SparkConf carrying LEGACY. This is the " +
+          "SQLConf-not-propagated-to-executor bug. " +
+          "Fix: have the adapter's getDateTimeRebaseMode() fall back to " +
+          "SparkEnv.get.conf when SQLConf.get does not have the override."
+      )
+    }
+  }
+
+  /** SparkConf must be reachable on executor task threads via 
SparkEnv.get.conf. */
+  @Test
+  def testSparkConfIsVisibleOnExecutorViaSparkEnv(): Unit = {
+    val key = "spark.sql.parquet.datetimeRebaseModeInWrite"
+    val seen: Array[String] = spark.sparkContext.parallelize(1 to 4, 4).map { 
_ =>
+      Option(SparkEnv.get).map(_.conf.get(key, "<absent>")).getOrElse("<no 
SparkEnv>")
+    }.collect()
+    seen.zipWithIndex.foreach { case (v, i) =>
+      assertEquals(
+        "LEGACY", v,
+        s"task #$i could not read $key from SparkEnv.get.conf; got '$v'. " +
+          "If this fails, the adapter SparkEnv fallback path is unreachable on 
this Spark version."
+      )
+    }
+  }
+
+  /** Same shape as the rebase-mode test, for 
`HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone`. */
+  @Test
+  def testSessionLocalTimeZoneReachesExecutorTask(): Unit = {
+    // Sanity: customTimeZone is set on the driver session.
+    assertEquals(customTimeZone, spark.conf.get("spark.sql.session.timeZone"))
+    assertNotEquals(customTimeZone, java.util.TimeZone.getDefault.getID,
+      "test setup is fragile: customTimeZone matches the JVM default, so we " +
+        "cannot distinguish 'fix worked' from 'fell back to JVM default'")
+
+    // Sanity: helper returns the override when called on the driver thread.
+    assertEquals(customTimeZone, 
HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone())
+
+    // Bug check: helper returns the override when called from an executor 
task,
+    // not the JVM default.
+    val seen: Array[String] = spark.sparkContext.parallelize(1 to 4, 4).map { 
_ =>
+      HoodieRowParquetWriteSupport.resolveSessionLocalTimeZone()
+    }.collect()
+    seen.zipWithIndex.foreach { case (tz, i) =>
+      assertEquals(
+        customTimeZone, tz,
+        s"executor task #$i resolved sessionLocalTimeZone to '$tz', not the " +
+          s"user override '$customTimeZone'. Same SQLConf-not-propagated bug " 
+
+          "as the rebase mode read. Fix: use SparkEnv.get.conf as a fallback."
+      )
+    }
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
index 06af2d316371..a4cb5b70e72e 100644
--- 
a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_3Adapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.storage.StorageConfiguration
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.schema.MessageType
+import org.apache.spark.SparkEnv
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
@@ -178,7 +179,13 @@ class Spark3_3Adapter extends BaseSpark3Adapter {
 
 
   override def getDateTimeRebaseMode(): LegacyBehaviorPolicy.Value = {
-    
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE))
+    // See Spark3_5Adapter.getDateTimeRebaseMode for the rationale.
+    val fromSqlConf = 
Option(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE, null))
+    val fromSparkConf = Option(SparkEnv.get)
+      .flatMap(env => 
Option(env.conf.get(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, null)))
+    LegacyBehaviorPolicy.withName(
+      fromSqlConf.orElse(fromSparkConf)
+        .getOrElse(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE)))
   }
 
   override def isLegacyBehaviorPolicy(value: Object): Boolean = {
diff --git 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
index 21a2294e134d..923c8ac91959 100644
--- 
a/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.4.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_4Adapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.storage.StorageConfiguration
 import org.apache.avro.Schema
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.schema.MessageType
+import org.apache.spark.SparkEnv
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
@@ -178,7 +179,13 @@ class Spark3_4Adapter extends BaseSpark3Adapter {
   }
 
   override def getDateTimeRebaseMode(): LegacyBehaviorPolicy.Value = {
-    
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE))
+    // See Spark3_5Adapter.getDateTimeRebaseMode for the rationale.
+    val fromSqlConf = 
Option(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE, null))
+    val fromSparkConf = Option(SparkEnv.get)
+      .flatMap(env => 
Option(env.conf.get(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, null)))
+    LegacyBehaviorPolicy.withName(
+      fromSqlConf.orElse(fromSparkConf)
+        .getOrElse(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE)))
   }
 
   override def isLegacyBehaviorPolicy(value: Object): Boolean = {
diff --git 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
index ab5d18024e42..a2dfe3e7be60 100644
--- 
a/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark3.5.x/src/main/scala/org/apache/spark/sql/adapter/Spark3_5Adapter.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.storage.StorageConfiguration
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.schema.MessageType
+import org.apache.spark.SparkEnv
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
@@ -193,7 +194,21 @@ class Spark3_5Adapter extends BaseSpark3Adapter {
   }
 
   override def getDateTimeRebaseMode(): LegacyBehaviorPolicy.Value = {
-    
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE))
+    // Resolution order:
+    //   1. SQLConf override (so `spark.conf.set(...)` on the SparkSession 
takes
+    //      effect on the driver and inside Spark SQL execution contexts).
+    //   2. SparkConf (SparkEnv.get.conf) — broadcast to every executor at
+    //      startup, so user-set `spark.sql.parquet.datetimeRebaseModeInWrite`
+    //      is honored on executor tasks outside a SQL execution context (e.g.
+    //      compaction dispatched via vanilla
+    //      `JavaSparkContext.parallelize(...).map(...)`).
+    //   3. The ConfigEntry's own default.
+    val fromSqlConf = 
Option(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE, null))
+    val fromSparkConf = Option(SparkEnv.get)
+      .flatMap(env => 
Option(env.conf.get(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, null)))
+    LegacyBehaviorPolicy.withName(
+      fromSqlConf.orElse(fromSparkConf)
+        .getOrElse(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE)))
   }
 
   override def isLegacyBehaviorPolicy(value: Object): Boolean = {
diff --git 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
index abaf24834868..a4eca808af79 100644
--- 
a/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.0.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_0Adapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkEnv
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
@@ -223,7 +224,13 @@ class Spark4_0Adapter extends BaseSpark4Adapter {
   }
 
   override def getDateTimeRebaseMode(): LegacyBehaviorPolicy.Value = {
-    
LegacyBehaviorPolicy.withName(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE))
+    // See Spark3_5Adapter.getDateTimeRebaseMode for the rationale.
+    val fromSqlConf = 
Option(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE, null))
+    val fromSparkConf = Option(SparkEnv.get)
+      .flatMap(env => 
Option(env.conf.get(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, null)))
+    LegacyBehaviorPolicy.withName(
+      fromSqlConf.orElse(fromSparkConf)
+        .getOrElse(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE)))
   }
 
   override def isLegacyBehaviorPolicy(value: Object): Boolean = {
diff --git 
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
 
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
index 8c1612fce070..b910e79747da 100644
--- 
a/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
+++ 
b/hudi-spark-datasource/hudi-spark4.1.x/src/main/scala/org/apache/spark/sql/adapter/Spark4_1Adapter.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.cdc.HoodieCDCFileSplit
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.spark.SparkEnv
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.avro._
@@ -226,7 +227,15 @@ class Spark4_1Adapter extends BaseSpark4Adapter {
   }
 
   override def getDateTimeRebaseMode(): LegacyBehaviorPolicy.Value = {
-    SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE)
+    // See Spark3_5Adapter.getDateTimeRebaseMode for the rationale. In Spark 
4.1
+    // the ConfigEntry returns LegacyBehaviorPolicy.Value directly, so the
+    // SparkConf string is parsed via withName before the orElse chain.
+    val fromSqlConf = 
Option(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE, null))
+    val fromSparkConf = Option(SparkEnv.get)
+      .flatMap(env => 
Option(env.conf.get(SQLConf.PARQUET_REBASE_MODE_IN_WRITE.key, null)))
+      .map(LegacyBehaviorPolicy.withName)
+    fromSqlConf.orElse(fromSparkConf)
+      .getOrElse(SQLConf.get.getConf(SQLConf.PARQUET_REBASE_MODE_IN_WRITE))
   }
 
   override def isLegacyBehaviorPolicy(value: Object): Boolean = {

Reply via email to