This is an automated email from the ASF dual-hosted git repository.

zhouyuan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new faad82cdeb [CORE] Fix Delta 4.0 Spark 4.1 package build (#12078)
faad82cdeb is described below

commit faad82cdeb126b336891a1aeebf7158335971e31
Author: Mohammad Linjawi <[email protected]>
AuthorDate: Tue May 19 11:09:19 2026 +0300

    [CORE] Fix Delta 4.0 Spark 4.1 package build (#12078)
    
    * Fix Delta 4.0 Spark 4.1 package build
    
    * Use Delta 4.1 for Spark 4.1 profile
    
    * Align Delta tests with Spark 4.1 errors
    
    * Add Spark 4.1 Delta compile coverage
    
    * Trigger CI
    
    * Address Spark 4.1 Delta review comments
    
    * Move Spark 4.1 MemoryStream shim to Delta tests
    
    * Move MemoryStream compatibility to Spark 4.1 shim
    
    ---------
    
    Co-authored-by: Mohammad Linjawi <[email protected]>
---
 .github/workflows/velox_backend_x86.yml                |  2 +-
 .../spark/sql/delta/GlutenDeltaParquetFileFormat.scala |  6 +++---
 .../sql/delta/files/GlutenDeltaFileFormatWriter.scala  | 16 ++++++++--------
 .../sql/delta/DeltaInsertIntoTableSuiteShims.scala     | 17 ++++++++++++++---
 pom.xml                                                |  4 ++--
 .../spark/sql/execution/streaming/MemoryStream.scala   | 18 +++++++++++++-----
 6 files changed, 41 insertions(+), 22 deletions(-)

diff --git a/.github/workflows/velox_backend_x86.yml 
b/.github/workflows/velox_backend_x86.yml
index 1e4b6fb24a..eb4c60a56f 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -1432,7 +1432,7 @@ jobs:
           export PATH=$JAVA_HOME/bin:$PATH
           java -version
           $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 
-Pbackends-velox \
-          -Pspark-ut 
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
+          -Pspark-ut -Pdelta 
-DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
           
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.spark.tags.SlowHiveTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
       - name: Upload test report
         if: always()
diff --git 
a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
 
b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
index 6df44e779d..137956f5c2 100644
--- 
a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
+++ 
b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.delta
 
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{Logging, MDC => SparkMDC}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
@@ -62,7 +62,7 @@ case class GlutenDeltaParquetFileFormat(
                                    tablePath: Option[String] = None,
                                    isCDCRead: Boolean = false)
   extends GlutenParquetFileFormat
-    with LoggingShims {
+    with Logging {
   // Validate either we have all arguments for DV enabled read or none of them.
   if (hasTablePath) {
     SparkSession.getActiveSession.map { session =>
@@ -528,7 +528,7 @@ case class GlutenDeltaParquetFileFormat(
       case AlwaysTrue() => Some(AlwaysTrue())
       case AlwaysFalse() => Some(AlwaysFalse())
       case _ =>
-        logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, 
filter)}")
+        logError(log"Failed to translate filter 
${SparkMDC.of(DeltaLogKeys.FILTER, filter)}")
         None
     }
   }
diff --git 
a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
 
b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index f609a6130b..73b9a8fdba 100644
--- 
a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ 
b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -24,7 +24,7 @@ import 
org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.extension.columnar.transition.{Convention, 
Transitions}
 
 import org.apache.spark._
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{Logging, MDC => SparkMDC}
 import org.apache.spark.internal.io.{FileCommitProtocol, 
SparkHadoopWriterUtils}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.sql.SparkSession
@@ -63,7 +63,7 @@ import java.util.{Date, UUID}
  *  values to data files. Specifically L123-126, L132, and L140 where it adds 
option
  *  WRITE_PARTITION_COLUMNS
  */
-object GlutenDeltaFileFormatWriter extends LoggingShims {
+object GlutenDeltaFileFormatWriter extends Logging {
 
   /**
    * A variable used in tests to check whether the output ordering of the 
query matches the
@@ -343,20 +343,20 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
       val ret = f
       val commitMsgs = ret.map(_.commitMsg)
 
-      logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, 
description.uuid)}.")
+      logInfo(log"Start to commit write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, 
description.uuid)}.")
       val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, 
commitMsgs) }
-      logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} 
committed. " +
-        log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.")
+      logInfo(log"Write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, 
description.uuid)} committed. " +
+        log"Elapsed time: ${SparkMDC.of(DeltaLogKeys.DURATION, duration)} ms.")
 
       processStats(description.statsTrackers, ret.map(_.summary.stats), 
duration)
       logInfo(log"Finished processing stats for write job " +
-        log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+        log"${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
 
       // return a set of all the partition paths that were updated during this 
job
       ret.map(_.summary.updatedPartitions).reduceOption(_ ++ 
_).getOrElse(Set.empty)
     } catch {
       case cause: Throwable =>
-        logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, 
description.uuid)}", cause)
+        logError(log"Aborting job ${SparkMDC.of(DeltaLogKeys.JOB_ID, 
description.uuid)}", cause)
         committer.abortJob(job)
         throw cause
     }
@@ -490,7 +490,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
       })(catchBlock = {
         // If there is an error, abort the task
         dataWriter.abort()
-        logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.")
+        logError(log"Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, jobId)} aborted.")
       }, finallyBlock = {
         dataWriter.close()
       })
diff --git 
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
 
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
index a9ae449213..85cde8a79b 100644
--- 
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
+++ 
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
@@ -17,8 +17,19 @@
 package org.apache.spark.sql.delta
 
 object DeltaInsertIntoTableSuiteShims {
-  val INSERT_INTO_TMP_VIEW_ERROR_MSG = "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+  private val isSpark41 = org.apache.spark.SPARK_VERSION.startsWith("4.1")
 
-  // Spark 4.0.1 reports non-constant defaults with NOT_CONSTANT.
-  val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = 
"INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+  val INSERT_INTO_TMP_VIEW_ERROR_MSG =
+    if (isSpark41) {
+      "[TABLE_OR_VIEW_NOT_FOUND]"
+    } else {
+      "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+    }
+
+  val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG =
+    if (isSpark41) {
+      "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION"
+    } else {
+      "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+    }
 }
diff --git a/pom.xml b/pom.xml
index 2be98cafe1..bb061c7434 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1351,8 +1351,8 @@
         
<sparkshim.artifactId>spark-sql-columnar-shims-spark41</sparkshim.artifactId>
         <spark.version>4.1.1</spark.version>
         <iceberg.version>1.10.0</iceberg.version>
-        <delta.package.name>delta-spark</delta.package.name>
-        <delta.version>4.0.0</delta.version>
+        <delta.package.name>delta-spark_4.1</delta.package.name>
+        <delta.version>4.1.0</delta.version>
         <delta.binary.version>40</delta.binary.version>
         <hudi.version>1.1.0</hudi.version>
         <fasterxml.version>2.18.2</fasterxml.version>
diff --git 
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
 
b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala
similarity index 57%
copy from 
backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
copy to 
shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala
index a9ae449213..4a675a26d9 100644
--- 
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
+++ 
b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala
@@ -14,11 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.spark.sql.delta
+package org.apache.spark.sql.execution.streaming
 
-object DeltaInsertIntoTableSuiteShims {
-  val INSERT_INTO_TMP_VIEW_ERROR_MSG = "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+import org.apache.spark.sql.{Encoder, SQLContext}
+import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => 
RuntimeMemoryStream}
 
-  // Spark 4.0.1 reports non-constant defaults with NOT_CONSTANT.
-  val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = 
"INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+object MemoryStream {
+  def apply[A: Encoder](implicit sqlContext: SQLContext): 
RuntimeMemoryStream[A] = {
+    RuntimeMemoryStream[A](implicitly[Encoder[A]], sqlContext.sparkSession)
+  }
+
+  def apply[A: Encoder](
+      numPartitions: Int)(
+      implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
+    RuntimeMemoryStream[A](numPartitions)(implicitly[Encoder[A]], 
sqlContext.sparkSession)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to