This is an automated email from the ASF dual-hosted git repository.

zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 4fa7bdc712 [VL] Delta: Offload Delta OPTIMIZE compaction command 
transactions (#12024)
4fa7bdc712 is described below

commit 4fa7bdc712a19b6d5c6c9a811c4280bd35980ad1
Author: Mohammad Linjawi <[email protected]>
AuthorDate: Fri May 8 17:48:18 2026 +0300

    [VL] Delta: Offload Delta OPTIMIZE compaction command transactions (#12024)
---
 .../datasources/v2/DeltaWriteOperators.scala       |  19 +-
 .../datasources/v2/OffloadDeltaCommand.scala       |  22 +-
 .../spark/sql/delta/DeltaNativeWriteSuite.scala    | 287 +++++++++++++++++++++
 .../spark/sql/delta/DeltaNativeWriteSuite.scala    | 221 +++++++++++++++-
 .../gluten/utils/velox/VeloxTestSettings.scala     |   3 +
 5 files changed, 546 insertions(+), 6 deletions(-)

diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
index 832cc084bb..ebd9e7a3e4 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.delta.{GlutenOptimisticTransaction, 
OptimisticTransaction, TransactionExecutionObserver}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.command.{LeafRunnableCommand, 
RunnableCommand}
 import org.apache.spark.sql.execution.metric.SQLMetric
 
 case class GlutenDeltaLeafV2CommandExec(delegate: LeafV2CommandExec) extends 
LeafV2CommandExec {
@@ -59,6 +59,23 @@ case class GlutenDeltaLeafRunnableCommand(delegate: 
LeafRunnableCommand)
   override def nodeName: String = "GlutenDelta " + delegate.nodeName
 }
 
+case class GlutenDeltaRunnableCommand(delegate: RunnableCommand) extends 
LeafRunnableCommand {
+  override lazy val metrics: Map[String, SQLMetric] = delegate.metrics
+
+  override def output: Seq[Attribute] = {
+    delegate.output
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    TransactionExecutionObserver.withObserver(
+      DeltaV2WriteOperators.UseColumnarDeltaTransactionLog) {
+      delegate.run(sparkSession)
+    }
+  }
+
+  override def nodeName: String = "GlutenDelta " + delegate.nodeName
+}
+
 object DeltaV2WriteOperators {
   object UseColumnarDeltaTransactionLog extends TransactionExecutionObserver {
     override def startingTransaction(f: => OptimisticTransaction): 
OptimisticTransaction = {
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
index 4f3e96585c..f4b74e4dbe 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
@@ -20,13 +20,14 @@ import org.apache.gluten.config.VeloxDeltaConfig
 import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
 
 import org.apache.spark.sql.delta.catalog.DeltaCatalog
-import org.apache.spark.sql.delta.commands.{DeleteCommand, UpdateCommand}
+import org.apache.spark.sql.delta.commands.{DeleteCommand, DeltaCommand, 
OptimizeTableCommand, UpdateCommand}
+import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
 import org.apache.spark.sql.delta.sources.DeltaDataSource
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
 import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
 
-case class OffloadDeltaCommand() extends OffloadSingleNode {
+case class OffloadDeltaCommand() extends OffloadSingleNode with DeltaCommand {
   override def offload(plan: SparkPlan): SparkPlan = {
     if (!VeloxDeltaConfig.get.enableNativeWrite) {
       return plan
@@ -36,6 +37,8 @@ case class OffloadDeltaCommand() extends OffloadSingleNode {
         ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(uc))
       case ExecutedCommandExec(dc: DeleteCommand) =>
         ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(dc))
+      case ExecutedCommandExec(optimize: OptimizeTableCommand) if 
shouldOffloadOptimize(optimize) =>
+        ExecutedCommandExec(GlutenDeltaRunnableCommand(optimize))
       case ExecutedCommandExec(s @ SaveIntoDataSourceCommand(_, _: 
DeltaDataSource, _, _)) =>
         ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(s))
       case ctas: AtomicCreateTableAsSelectExec if 
ctas.catalog.isInstanceOf[DeltaCatalog] =>
@@ -45,4 +48,19 @@ case class OffloadDeltaCommand() extends OffloadSingleNode {
       case other => other
     }
   }
+
+  // Currently only plain OPTIMIZE bin-packing is supported for command 
offload. OPTIMIZE
+  // variants with layout-specific semantics, such as ZORDER, REORG, OPTIMIZE 
FULL, or
+  // liquid clustering, continue to use Delta's original command path.
+  private def shouldOffloadOptimize(optimize: OptimizeTableCommand): Boolean = 
{
+    optimize.zOrderBy.isEmpty &&
+    optimize.optimizeContext.reorg.isEmpty &&
+    !optimize.optimizeContext.isFull &&
+    !isClusteredOptimize(optimize)
+  }
+
+  private def isClusteredOptimize(optimize: OptimizeTableCommand): Boolean = {
+    val snapshot = getDeltaTable(optimize.child, "OPTIMIZE").update()
+    ClusteredTableUtils.isSupported(snapshot.protocol)
+  }
 }
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
new file mode 100644
index 0000000000..2620571186
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.VeloxDeltaConfig
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import 
org.apache.spark.sql.execution.datasources.v2.{GlutenDeltaLeafRunnableCommand, 
GlutenDeltaLeafV2CommandExec, GlutenDeltaRunnableCommand}
+import org.apache.spark.sql.internal.SQLConf
+
+class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  private lazy val isMac = sys.props
+    .get("os.name")
+    .exists(_.toLowerCase(java.util.Locale.ROOT).contains("mac"))
+
+  private def withNativeWriteOffloadConf(f: => Unit): Unit = {
+    val confs = Seq(
+      SQLConf.ANSI_ENABLED.key -> "false",
+      SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
+      GlutenConfig.GLUTEN_ANSI_FALLBACK_ENABLED.key -> "false",
+      DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false"
+    ) ++
+      (if (isMac) {
+         Seq(GlutenConfig.NATIVE_VALIDATION_ENABLED.key -> "false")
+       } else {
+         Seq.empty
+       })
+
+    withSQLConf(confs: _*) {
+      f
+    }
+  }
+
+  private def hasGlutenDeltaWriteCommand(plan: SparkPlan): Boolean = {
+    val nativeClassMatch = plan
+      .collectFirst {
+        case ExecutedCommandExec(_: GlutenDeltaLeafRunnableCommand) => true
+        case ExecutedCommandExec(_: GlutenDeltaRunnableCommand) => true
+        case _: GlutenDeltaLeafV2CommandExec => true
+      }
+      .getOrElse(false)
+
+    val nativeNodeMatch = plan
+      .collectFirst {
+        case p if p.nodeName.startsWith("Execute GlutenDelta ") => true
+        case p if p.nodeName.startsWith("GlutenDelta ") => true
+      }
+      .getOrElse(false)
+
+    val nativeTreeMatch = plan.treeString.contains("GlutenDelta ")
+
+    nativeClassMatch || nativeNodeMatch || nativeTreeMatch
+  }
+
+  private def assertContainsNativeWriteCommand(plan: SparkPlan, context: 
String): Unit = {
+    assert(
+      hasGlutenDeltaWriteCommand(plan),
+      s"Expected native delta write command for $context, but got 
plan:\n${plan.treeString}"
+    )
+  }
+
+  private def assertNoNativeWriteCommand(plan: SparkPlan, context: String): 
Unit = {
+    assert(
+      !hasGlutenDeltaWriteCommand(plan),
+      s"Expected no native delta write command for $context, but got 
plan:\n${plan.treeString}"
+    )
+  }
+
+  private def files(deltaLog: DeltaLog): Set[AddFile] = {
+    deltaLog.update().allFiles.collect().toSet
+  }
+
+  private def collectOptimizeMetrics(df: DataFrame): OptimizeMetrics = {
+    val metrics = df.select("metrics.*").as[OptimizeMetrics].collect()
+    assert(metrics.length == 1, s"Expected one OPTIMIZE result row, got 
${metrics.length}")
+    metrics.head
+  }
+
+  private def assertOptimizeCommit(deltaLog: DeltaLog, context: String): Unit 
= {
+    val latestCommit = deltaLog.history.getHistory(Some(1)).head
+    assert(
+      latestCommit.operation == "OPTIMIZE",
+      s"Expected latest Delta operation for $context to be OPTIMIZE, got " +
+        latestCommit.operation)
+  }
+
+  private def assertCompactionMetrics(
+      metrics: OptimizeMetrics,
+      beforeFileCount: Int,
+      afterFileCount: Int,
+      context: String,
+      expectedPartitionsOptimized: Option[Long] = None): Unit = {
+    assert(metrics.numFilesRemoved > 0, s"Expected files removed for $context")
+    assert(metrics.numFilesAdded > 0, s"Expected files added for $context")
+    assert(
+      afterFileCount < beforeFileCount,
+      s"Expected fewer active files after $context, before=$beforeFileCount 
after=$afterFileCount")
+    assert(
+      metrics.numFilesRemoved > metrics.numFilesAdded,
+      s"Expected $context to compact to fewer files, 
removed=${metrics.numFilesRemoved} " +
+        s"added=${metrics.numFilesAdded}"
+    )
+    assert(
+      metrics.filesRemoved.totalFiles == metrics.numFilesRemoved,
+      s"Removed file metrics did not match numFilesRemoved for $context")
+    assert(
+      metrics.filesAdded.totalFiles == metrics.numFilesAdded,
+      s"Added file metrics did not match numFilesAdded for $context")
+    assert(metrics.filesRemoved.totalSize > 0, s"Expected removed file size 
metrics for $context")
+    assert(metrics.filesAdded.totalSize > 0, s"Expected added file size 
metrics for $context")
+    assert(metrics.numBatches > 0, s"Expected at least one optimize batch for 
$context")
+    expectedPartitionsOptimized.foreach {
+      expected =>
+        assert(
+          metrics.partitionsOptimized == expected,
+          s"Expected $expected optimized partitions for $context, got " +
+            metrics.partitionsOptimized)
+    }
+  }
+
+  test("native delta optimize command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 32, 1, 
4).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(32, 64, 1, 
4).toDF("id").write.format("delta").mode("append").save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+          
assertContainsNativeWriteCommand(optimizeDf.queryExecution.executedPlan, 
"OPTIMIZE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, 
"path OPTIMIZE")
+          assertOptimizeCommit(deltaLog, "path OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+          assert(result.collect().map(_.getLong(0)).toSet == (0L until 
64L).toSet)
+      }
+    }
+  }
+
+  test("native delta optimize table command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTable("delta_native_optimize_table") {
+        spark
+          .range(0, 32, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("overwrite")
+          .saveAsTable("delta_native_optimize_table")
+        spark
+          .range(32, 64, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("append")
+          .saveAsTable("delta_native_optimize_table")
+
+        val deltaLog = DeltaLog.forTable(spark, 
TableIdentifier("delta_native_optimize_table"))
+        val beforeFiles = files(deltaLog)
+
+        val optimizeDf = sql("OPTIMIZE delta_native_optimize_table")
+        
assertContainsNativeWriteCommand(optimizeDf.queryExecution.executedPlan, 
"OPTIMIZE table")
+        val metrics = collectOptimizeMetrics(optimizeDf)
+
+        val afterFiles = files(deltaLog)
+        assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, 
"table OPTIMIZE")
+        assertOptimizeCommit(deltaLog, "table OPTIMIZE")
+        val result = spark.read.table("delta_native_optimize_table")
+        assert(result.collect().map(_.getLong(0)).toSet == (0L until 
64L).toSet)
+      }
+    }
+  }
+
+  test("native delta optimize partition predicate command should be 
offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark
+            .range(0, 20, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+          spark
+            .range(20, 40, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+          val beforePart0Paths = beforeFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val beforePart1Count = 
beforeFiles.count(_.partitionValues.get("part").contains("1"))
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path` WHERE part = 1")
+          assertContainsNativeWriteCommand(
+            optimizeDf.queryExecution.executedPlan,
+            "OPTIMIZE WHERE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          val afterPart0Paths = afterFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val afterPart1Count = 
afterFiles.count(_.partitionValues.get("part").contains("1"))
+          assert(
+            beforePart0Paths.subsetOf(afterPart0Paths),
+            "OPTIMIZE WHERE part = 1 should not remove files from part = 0")
+          assert(
+            afterPart1Count < beforePart1Count,
+            s"Expected fewer active files in part = 1, 
before=$beforePart1Count " +
+              s"after=$afterPart1Count")
+          assertCompactionMetrics(
+            metrics,
+            beforeFiles.size,
+            afterFiles.size,
+            "partition predicate OPTIMIZE",
+            expectedPartitionsOptimized = Some(1L))
+          assertOptimizeCommit(deltaLog, "partition predicate OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+          assert(result.select("id").collect().map(_.getLong(0)).toSet == (0L 
until 40L).toSet)
+          assert(result.where("part = 0").count() == 20)
+          assert(result.where("part = 1").count() == 20)
+      }
+    }
+  }
+
+  test("delta optimize command should not be offloaded when native write is 
disabled") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 10, 1, 
2).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(10, 20, 1, 
2).toDF("id").write.format("delta").mode("append").save(path)
+
+          withSQLConf(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key -> "false") {
+            val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+            assertNoNativeWriteCommand(
+              optimizeDf.queryExecution.executedPlan,
+              "OPTIMIZE with native write disabled")
+            optimizeDf.collect()
+          }
+
+          val result = spark.read.format("delta").load(path)
+          assert(result.collect().map(_.getLong(0)).toSet == (0L until 
20L).toSet)
+      }
+    }
+  }
+}
diff --git 
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
 
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
index 0b5423ab22..bca0a66d1a 100644
--- 
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
+++ 
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
@@ -19,17 +19,20 @@ package org.apache.spark.sql.delta
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.config.VeloxDeltaConfig
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
 import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import 
org.apache.spark.sql.execution.datasources.v2.{GlutenDeltaLeafRunnableCommand, 
GlutenDeltaLeafV2CommandExec}
+import 
org.apache.spark.sql.execution.datasources.v2.{GlutenDeltaLeafRunnableCommand, 
GlutenDeltaLeafV2CommandExec, GlutenDeltaRunnableCommand}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.QueryExecutionListener
 
-import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.{CopyOnWriteArrayList, TimeUnit}
 
 import scala.jdk.CollectionConverters._
 
@@ -86,6 +89,7 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
     val nativeClassMatch = plan
       .collectFirst {
         case ExecutedCommandExec(_: GlutenDeltaLeafRunnableCommand) => true
+        case ExecutedCommandExec(_: GlutenDeltaRunnableCommand) => true
         case _: GlutenDeltaLeafV2CommandExec => true
       }
       .getOrElse(false)
@@ -115,6 +119,21 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
     spark.listenerManager.register(listener)
     try {
       action
+      val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10)
+      var lastSize = -1
+      var stableSince = System.nanoTime()
+      while (System.nanoTime() < deadline) {
+        val size = plans.size()
+        val now = System.nanoTime()
+        if (size != lastSize) {
+          lastSize = size
+          stableSince = now
+        }
+        if (size > 0 && now - stableSince >= 
TimeUnit.MILLISECONDS.toNanos(500)) {
+          return plans.asScala.toSeq
+        }
+        Thread.sleep(50)
+      }
     } finally {
       spark.listenerManager.unregister(listener)
     }
@@ -137,6 +156,58 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
     )
   }
 
+  private def files(deltaLog: DeltaLog): Set[AddFile] = {
+    deltaLog.update().allFiles.collect().toSet
+  }
+
+  private def collectOptimizeMetrics(df: DataFrame): OptimizeMetrics = {
+    val metrics = df.select("metrics.*").as[OptimizeMetrics].collect()
+    assert(metrics.length == 1, s"Expected one OPTIMIZE result row, got 
${metrics.length}")
+    metrics.head
+  }
+
+  private def assertOptimizeCommit(deltaLog: DeltaLog, context: String): Unit 
= {
+    val latestCommit = deltaLog.history.getHistory(Some(1)).head
+    assert(
+      latestCommit.operation == "OPTIMIZE",
+      s"Expected latest Delta operation for $context to be OPTIMIZE, got " +
+        latestCommit.operation)
+  }
+
+  private def assertCompactionMetrics(
+      metrics: OptimizeMetrics,
+      beforeFileCount: Int,
+      afterFileCount: Int,
+      context: String,
+      expectedPartitionsOptimized: Option[Long] = None): Unit = {
+    assert(metrics.numFilesRemoved > 0, s"Expected files removed for $context")
+    assert(metrics.numFilesAdded > 0, s"Expected files added for $context")
+    assert(
+      afterFileCount < beforeFileCount,
+      s"Expected fewer active files after $context, before=$beforeFileCount 
after=$afterFileCount")
+    assert(
+      metrics.numFilesRemoved > metrics.numFilesAdded,
+      s"Expected $context to compact to fewer files, 
removed=${metrics.numFilesRemoved} " +
+        s"added=${metrics.numFilesAdded}"
+    )
+    assert(
+      metrics.filesRemoved.totalFiles == metrics.numFilesRemoved,
+      s"Removed file metrics did not match numFilesRemoved for $context")
+    assert(
+      metrics.filesAdded.totalFiles == metrics.numFilesAdded,
+      s"Added file metrics did not match numFilesAdded for $context")
+    assert(metrics.filesRemoved.totalSize > 0, s"Expected removed file size 
metrics for $context")
+    assert(metrics.filesAdded.totalSize > 0, s"Expected added file size 
metrics for $context")
+    assert(metrics.numBatches > 0, s"Expected at least one optimize batch for 
$context")
+    expectedPartitionsOptimized.foreach {
+      expected =>
+        assert(
+          metrics.partitionsOptimized == expected,
+          s"Expected $expected optimized partitions for $context, got " +
+            metrics.partitionsOptimized)
+    }
+  }
+
   test("native delta delete command should be offloaded") {
     withNativeWriteOffloadConf {
       withTempDir {
@@ -275,6 +346,150 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
     }
   }
 
+  test("native delta optimize command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 32, 1, 
4).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(32, 64, 1, 
4).toDF("id").write.format("delta").mode("append").save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+          
assertContainsNativeWriteCommand(Seq(optimizeDf.queryExecution.executedPlan), 
"OPTIMIZE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, 
"path OPTIMIZE")
+          assertOptimizeCommit(deltaLog, "path OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+          assert(result.collect().map(_.getLong(0)).toSet == (0L until 
64L).toSet)
+      }
+    }
+  }
+
+  test("native delta optimize table command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTable("delta_native_optimize_table") {
+        spark
+          .range(0, 32, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("overwrite")
+          .saveAsTable("delta_native_optimize_table")
+        spark
+          .range(32, 64, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("append")
+          .saveAsTable("delta_native_optimize_table")
+
+        val deltaLog = DeltaLog.forTable(spark, 
TableIdentifier("delta_native_optimize_table"))
+        val beforeFiles = files(deltaLog)
+
+        val optimizeDf = sql("OPTIMIZE delta_native_optimize_table")
+        assertContainsNativeWriteCommand(
+          Seq(optimizeDf.queryExecution.executedPlan),
+          "OPTIMIZE table")
+        val metrics = collectOptimizeMetrics(optimizeDf)
+
+        val afterFiles = files(deltaLog)
+        assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, 
"table OPTIMIZE")
+        assertOptimizeCommit(deltaLog, "table OPTIMIZE")
+        val result = spark.read.table("delta_native_optimize_table")
+        assert(result.collect().map(_.getLong(0)).toSet == (0L until 
64L).toSet)
+      }
+    }
+  }
+
+  test("native delta optimize partition predicate command should be 
offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark
+            .range(0, 20, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+          spark
+            .range(20, 40, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+          val beforePart0Paths = beforeFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val beforePart1Count = 
beforeFiles.count(_.partitionValues.get("part").contains("1"))
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path` WHERE part = 1")
+          assertContainsNativeWriteCommand(
+            Seq(optimizeDf.queryExecution.executedPlan),
+            "OPTIMIZE WHERE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          val afterPart0Paths = afterFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val afterPart1Count = 
afterFiles.count(_.partitionValues.get("part").contains("1"))
+          assert(
+            beforePart0Paths.subsetOf(afterPart0Paths),
+            "OPTIMIZE WHERE part = 1 should not remove files from part = 0")
+          assert(
+            afterPart1Count < beforePart1Count,
+            s"Expected fewer active files in part = 1, 
before=$beforePart1Count " +
+              s"after=$afterPart1Count")
+          assertCompactionMetrics(
+            metrics,
+            beforeFiles.size,
+            afterFiles.size,
+            "partition predicate OPTIMIZE",
+            expectedPartitionsOptimized = Some(1L))
+          assertOptimizeCommit(deltaLog, "partition predicate OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+          assert(result.select("id").collect().map(_.getLong(0)).toSet == (0L 
until 40L).toSet)
+          assert(result.where("part = 0").count() == 20)
+          assert(result.where("part = 1").count() == 20)
+      }
+    }
+  }
+
+  test("delta optimize command should not be offloaded when native write is 
disabled") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 10, 1, 
2).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(10, 20, 1, 
2).toDF("id").write.format("delta").mode("append").save(path)
+
+          withSQLConf(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key -> "false") {
+            val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+            assertNoNativeWriteCommand(
+              Seq(optimizeDf.queryExecution.executedPlan),
+              "OPTIMIZE with native write disabled")
+            optimizeDf.collect()
+          }
+
+          val result = spark.read.format("delta").load(path)
+          assert(result.collect().map(_.getLong(0)).toSet == (0L until 
20L).toSet)
+      }
+    }
+  }
+
   test("delta save command should not be offloaded when native write is 
disabled") {
     withNativeWriteOffloadConf {
       withTempDir {
diff --git 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 29a1bfd685..71b12e340e 100644
--- 
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -703,6 +703,9 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-37779: ColumnarToRowExec should be canonicalizable after 
being (de)serialized")
   enableSuite[GlutenSparkPlannerSuite]
   enableSuite[GlutenSparkScriptTransformationSuite]
+    // Flaky in CI containers for Spark 4.1: intermittently fails with
+    // `/tmp/test-resource*.py: Permission denied` and can crash JVM.
+    .exclude("SPARK-33934: Add SparkFile's root dir to env property PATH")
   enableSuite[GlutenSparkSqlParserSuite]
   enableSuite[GlutenUnsafeFixedWidthAggregationMapSuite]
   enableSuite[GlutenUnsafeKVExternalSorterSuite]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to