This is an automated email from the ASF dual-hosted git repository.
zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 4fa7bdc712 [VL] Delta: Offload Delta OPTIMIZE compaction command transactions (#12024)
4fa7bdc712 is described below
commit 4fa7bdc712a19b6d5c6c9a811c4280bd35980ad1
Author: Mohammad Linjawi <[email protected]>
AuthorDate: Fri May 8 17:48:18 2026 +0300
[VL] Delta: Offload Delta OPTIMIZE compaction command transactions (#12024)
---
.../datasources/v2/DeltaWriteOperators.scala | 19 +-
.../datasources/v2/OffloadDeltaCommand.scala | 22 +-
.../spark/sql/delta/DeltaNativeWriteSuite.scala | 287 +++++++++++++++++++++
.../spark/sql/delta/DeltaNativeWriteSuite.scala | 221 +++++++++++++++-
.../gluten/utils/velox/VeloxTestSettings.scala | 3 +
5 files changed, 546 insertions(+), 6 deletions(-)
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
index 832cc084bb..ebd9e7a3e4 100644
---
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/DeltaWriteOperators.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.delta.{GlutenOptimisticTransaction,
OptimisticTransaction, TransactionExecutionObserver}
-import org.apache.spark.sql.execution.command.LeafRunnableCommand
+import org.apache.spark.sql.execution.command.{LeafRunnableCommand,
RunnableCommand}
import org.apache.spark.sql.execution.metric.SQLMetric
case class GlutenDeltaLeafV2CommandExec(delegate: LeafV2CommandExec) extends
LeafV2CommandExec {
@@ -59,6 +59,23 @@ case class GlutenDeltaLeafRunnableCommand(delegate:
LeafRunnableCommand)
override def nodeName: String = "GlutenDelta " + delegate.nodeName
}
+/**
+ * Wraps a Delta [[RunnableCommand]] so that it runs with
+ * [[DeltaV2WriteOperators.UseColumnarDeltaTransactionLog]] installed as the
+ * [[TransactionExecutionObserver]].
+ *
+ * Unlike [[GlutenDeltaLeafRunnableCommand]], which only accepts a [[LeafRunnableCommand]], the
+ * delegate here may be any [[RunnableCommand]]. The wrapper itself is a leaf node, so children of
+ * `delegate`, if any, are not children of this node. Output and metrics are forwarded from the
+ * delegate unchanged.
+ */
+case class GlutenDeltaRunnableCommand(delegate: RunnableCommand) extends
LeafRunnableCommand {
+ override lazy val metrics: Map[String, SQLMetric] = delegate.metrics
+
+ override def output: Seq[Attribute] = {
+ delegate.output
+ }
+
+ // Run the delegate with the columnar transaction-log observer in scope; transactions it
+ // starts are presumably routed through UseColumnarDeltaTransactionLog.startingTransaction.
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ TransactionExecutionObserver.withObserver(
+ DeltaV2WriteOperators.UseColumnarDeltaTransactionLog) {
+ delegate.run(sparkSession)
+ }
+ }
+
+ // The "GlutenDelta " prefix marks offloaded commands in plan output.
+ override def nodeName: String = "GlutenDelta " + delegate.nodeName
+}
+
object DeltaV2WriteOperators {
object UseColumnarDeltaTransactionLog extends TransactionExecutionObserver {
override def startingTransaction(f: => OptimisticTransaction):
OptimisticTransaction = {
diff --git
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
index 4f3e96585c..f4b74e4dbe 100644
---
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
+++
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/execution/datasources/v2/OffloadDeltaCommand.scala
@@ -20,13 +20,14 @@ import org.apache.gluten.config.VeloxDeltaConfig
import org.apache.gluten.extension.columnar.offload.OffloadSingleNode
import org.apache.spark.sql.delta.catalog.DeltaCatalog
-import org.apache.spark.sql.delta.commands.{DeleteCommand, UpdateCommand}
+import org.apache.spark.sql.delta.commands.{DeleteCommand, DeltaCommand,
OptimizeTableCommand, UpdateCommand}
+import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
import org.apache.spark.sql.delta.sources.DeltaDataSource
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand
-case class OffloadDeltaCommand() extends OffloadSingleNode {
+case class OffloadDeltaCommand() extends OffloadSingleNode with DeltaCommand {
override def offload(plan: SparkPlan): SparkPlan = {
if (!VeloxDeltaConfig.get.enableNativeWrite) {
return plan
@@ -36,6 +37,8 @@ case class OffloadDeltaCommand() extends OffloadSingleNode {
ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(uc))
case ExecutedCommandExec(dc: DeleteCommand) =>
ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(dc))
+ case ExecutedCommandExec(optimize: OptimizeTableCommand) if
shouldOffloadOptimize(optimize) =>
+ ExecutedCommandExec(GlutenDeltaRunnableCommand(optimize))
case ExecutedCommandExec(s @ SaveIntoDataSourceCommand(_, _:
DeltaDataSource, _, _)) =>
ExecutedCommandExec(GlutenDeltaLeafRunnableCommand(s))
case ctas: AtomicCreateTableAsSelectExec if
ctas.catalog.isInstanceOf[DeltaCatalog] =>
@@ -45,4 +48,19 @@ case class OffloadDeltaCommand() extends OffloadSingleNode {
case other => other
}
}
+
+  // Currently only plain OPTIMIZE bin-packing is supported for command offload. OPTIMIZE
+  // variants with layout-specific semantics, such as ZORDER, REORG, OPTIMIZE FULL, or
+  // liquid clustering, continue to use Delta's original command path.
+  // The cheap, purely syntactic checks run first so that the snapshot lookup performed by
+  // `isClusteredOptimize` only happens for otherwise-eligible commands.
+  private def shouldOffloadOptimize(optimize: OptimizeTableCommand): Boolean = {
+    optimize.zOrderBy.isEmpty &&
+    optimize.optimizeContext.reorg.isEmpty &&
+    !optimize.optimizeContext.isFull &&
+    !isClusteredOptimize(optimize)
+  }
+
+  /**
+   * Returns true when the target table's protocol supports clustering, and also when the
+   * table's snapshot cannot be resolved at planning time.
+   *
+   * This runs inside a planner offload rule. Without the guard, any non-fatal failure while
+   * resolving the table would abort physical planning from within Gluten. Treating such a
+   * failure as "do not offload" leaves the original command in place, so Delta's own code
+   * path validates the command and reports its own error.
+   *
+   * NOTE(review): the snapshot is refreshed here and presumably again by the command itself
+   * when it runs; confirm the extra log refresh is acceptable.
+   */
+  private def isClusteredOptimize(optimize: OptimizeTableCommand): Boolean = {
+    scala.util
+      .Try {
+        val snapshot = getDeltaTable(optimize.child, "OPTIMIZE").update()
+        ClusteredTableUtils.isSupported(snapshot.protocol)
+      }
+      // Try only captures NonFatal throwables; fatal errors still propagate.
+      .getOrElse(true)
+  }
}
diff --git
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
new file mode 100644
index 0000000000..2620571186
--- /dev/null
+++
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta
+
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.config.VeloxDeltaConfig
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import
org.apache.spark.sql.execution.datasources.v2.{GlutenDeltaLeafRunnableCommand,
GlutenDeltaLeafV2CommandExec, GlutenDeltaRunnableCommand}
+import org.apache.spark.sql.internal.SQLConf
+
+/**
+ * Tests that Delta commands (here: OPTIMIZE compaction) are offloaded to Gluten's native
+ * Delta write path when native write is enabled, and are left untouched when it is disabled.
+ */
+class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
+
+  import testImplicits._
+
+  private lazy val isMac = sys.props
+    .get("os.name")
+    .exists(_.toLowerCase(java.util.Locale.ROOT).contains("mac"))
+
+  /**
+   * Runs `f` with the session configuration these tests rely on: ANSI disabled, UTC session
+   * time zone, Gluten ANSI fallback disabled and Delta stats collection disabled. Native
+   * validation is additionally disabled when running on macOS.
+   */
+  private def withNativeWriteOffloadConf(f: => Unit): Unit = {
+    val confs = Seq(
+      SQLConf.ANSI_ENABLED.key -> "false",
+      SQLConf.SESSION_LOCAL_TIMEZONE.key -> "UTC",
+      GlutenConfig.GLUTEN_ANSI_FALLBACK_ENABLED.key -> "false",
+      DeltaSQLConf.DELTA_COLLECT_STATS.key -> "false"
+    ) ++
+      (if (isMac) {
+         Seq(GlutenConfig.NATIVE_VALIDATION_ENABLED.key -> "false")
+       } else {
+         Seq.empty
+       })
+
+    withSQLConf(confs: _*) {
+      f
+    }
+  }
+
+  /**
+   * Returns true when `plan` contains one of Gluten's Delta command wrappers. Three checks are
+   * combined: a match on the wrapper classes, a match on their node names, and a substring
+   * match on the rendered tree string.
+   *
+   * NOTE(review): the tree-string check presumably covers wrappers nested where `collectFirst`
+   * does not descend (e.g. inner children of a command-result node) — confirm.
+   */
+  private def hasGlutenDeltaWriteCommand(plan: SparkPlan): Boolean = {
+    val nativeClassMatch = plan
+      .collectFirst {
+        case ExecutedCommandExec(_: GlutenDeltaLeafRunnableCommand) => true
+        case ExecutedCommandExec(_: GlutenDeltaRunnableCommand) => true
+        case _: GlutenDeltaLeafV2CommandExec => true
+      }
+      .getOrElse(false)
+
+    val nativeNodeMatch = plan
+      .collectFirst {
+        case p if p.nodeName.startsWith("Execute GlutenDelta ") => true
+        case p if p.nodeName.startsWith("GlutenDelta ") => true
+      }
+      .getOrElse(false)
+
+    val nativeTreeMatch = plan.treeString.contains("GlutenDelta ")
+
+    nativeClassMatch || nativeNodeMatch || nativeTreeMatch
+  }
+
+  private def assertContainsNativeWriteCommand(plan: SparkPlan, context: String): Unit = {
+    assert(
+      hasGlutenDeltaWriteCommand(plan),
+      s"Expected native delta write command for $context, but got plan:\n${plan.treeString}"
+    )
+  }
+
+  private def assertNoNativeWriteCommand(plan: SparkPlan, context: String): Unit = {
+    assert(
+      !hasGlutenDeltaWriteCommand(plan),
+      s"Expected no native delta write command for $context, but got plan:\n${plan.treeString}"
+    )
+  }
+
+  /** Returns the active data files of the latest snapshot of `deltaLog`. */
+  private def files(deltaLog: DeltaLog): Set[AddFile] = {
+    deltaLog.update().allFiles.collect().toSet
+  }
+
+  /** Extracts the single `metrics` struct from an OPTIMIZE result DataFrame. */
+  private def collectOptimizeMetrics(df: DataFrame): OptimizeMetrics = {
+    val metrics = df.select("metrics.*").as[OptimizeMetrics].collect()
+    assert(metrics.length == 1, s"Expected one OPTIMIZE result row, got ${metrics.length}")
+    metrics.head
+  }
+
+  /**
+   * Asserts that the `id` column of `df` holds exactly the values in `expected`, each exactly
+   * once. Sorted sequences are compared rather than sets so that duplicated rows (e.g. from a
+   * faulty compaction rewrite) are detected as well as missing ones.
+   */
+  private def assertIdsExactly(df: DataFrame, expected: Seq[Long], context: String): Unit = {
+    val actual = df.select("id").collect().map(_.getLong(0)).sorted.toSeq
+    assert(
+      actual == expected,
+      s"Unexpected ids after $context: expected ${expected.size} distinct rows, " +
+        s"got ${actual.size} rows")
+  }
+
+  /** Asserts that the most recent commit in `deltaLog`'s history is an OPTIMIZE operation. */
+  private def assertOptimizeCommit(deltaLog: DeltaLog, context: String): Unit = {
+    val latestCommit = deltaLog.history.getHistory(Some(1)).head
+    assert(
+      latestCommit.operation == "OPTIMIZE",
+      s"Expected latest Delta operation for $context to be OPTIMIZE, got " +
+        latestCommit.operation)
+  }
+
+  /**
+   * Asserts that `metrics` describe a real compaction: files were removed and added, fewer
+   * files were added than removed, the active file count dropped, and the per-category file
+   * metrics are consistent. Optionally checks the number of optimized partitions.
+   */
+  private def assertCompactionMetrics(
+      metrics: OptimizeMetrics,
+      beforeFileCount: Int,
+      afterFileCount: Int,
+      context: String,
+      expectedPartitionsOptimized: Option[Long] = None): Unit = {
+    assert(metrics.numFilesRemoved > 0, s"Expected files removed for $context")
+    assert(metrics.numFilesAdded > 0, s"Expected files added for $context")
+    assert(
+      afterFileCount < beforeFileCount,
+      s"Expected fewer active files after $context, before=$beforeFileCount after=$afterFileCount")
+    assert(
+      metrics.numFilesRemoved > metrics.numFilesAdded,
+      s"Expected $context to compact to fewer files, removed=${metrics.numFilesRemoved} " +
+        s"added=${metrics.numFilesAdded}"
+    )
+    assert(
+      metrics.filesRemoved.totalFiles == metrics.numFilesRemoved,
+      s"Removed file metrics did not match numFilesRemoved for $context")
+    assert(
+      metrics.filesAdded.totalFiles == metrics.numFilesAdded,
+      s"Added file metrics did not match numFilesAdded for $context")
+    assert(metrics.filesRemoved.totalSize > 0, s"Expected removed file size metrics for $context")
+    assert(metrics.filesAdded.totalSize > 0, s"Expected added file size metrics for $context")
+    assert(metrics.numBatches > 0, s"Expected at least one optimize batch for $context")
+    expectedPartitionsOptimized.foreach {
+      expected =>
+        assert(
+          metrics.partitionsOptimized == expected,
+          s"Expected $expected optimized partitions for $context, got " +
+            metrics.partitionsOptimized)
+    }
+  }
+
+  test("native delta optimize command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 32, 1, 4).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(32, 64, 1, 4).toDF("id").write.format("delta").mode("append").save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+          assertContainsNativeWriteCommand(optimizeDf.queryExecution.executedPlan, "OPTIMIZE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "path OPTIMIZE")
+          assertOptimizeCommit(deltaLog, "path OPTIMIZE")
+          assertIdsExactly(spark.read.format("delta").load(path), 0L until 64L, "path OPTIMIZE")
+      }
+    }
+  }
+
+  test("native delta optimize table command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTable("delta_native_optimize_table") {
+        spark
+          .range(0, 32, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("overwrite")
+          .saveAsTable("delta_native_optimize_table")
+        spark
+          .range(32, 64, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("append")
+          .saveAsTable("delta_native_optimize_table")
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier("delta_native_optimize_table"))
+        val beforeFiles = files(deltaLog)
+
+        val optimizeDf = sql("OPTIMIZE delta_native_optimize_table")
+        assertContainsNativeWriteCommand(optimizeDf.queryExecution.executedPlan, "OPTIMIZE table")
+        val metrics = collectOptimizeMetrics(optimizeDf)
+
+        val afterFiles = files(deltaLog)
+        assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "table OPTIMIZE")
+        assertOptimizeCommit(deltaLog, "table OPTIMIZE")
+        assertIdsExactly(
+          spark.read.table("delta_native_optimize_table"),
+          0L until 64L,
+          "table OPTIMIZE")
+      }
+    }
+  }
+
+  test("native delta optimize partition predicate command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark
+            .range(0, 20, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+          spark
+            .range(20, 40, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+          val beforePart0Paths = beforeFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val beforePart1Count = beforeFiles.count(_.partitionValues.get("part").contains("1"))
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path` WHERE part = 1")
+          assertContainsNativeWriteCommand(
+            optimizeDf.queryExecution.executedPlan,
+            "OPTIMIZE WHERE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          val afterPart0Paths = afterFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val afterPart1Count = afterFiles.count(_.partitionValues.get("part").contains("1"))
+          // Files outside the predicate must be left untouched by the compaction.
+          assert(
+            beforePart0Paths.subsetOf(afterPart0Paths),
+            "OPTIMIZE WHERE part = 1 should not remove files from part = 0")
+          assert(
+            afterPart1Count < beforePart1Count,
+            s"Expected fewer active files in part = 1, before=$beforePart1Count " +
+              s"after=$afterPart1Count")
+          assertCompactionMetrics(
+            metrics,
+            beforeFiles.size,
+            afterFiles.size,
+            "partition predicate OPTIMIZE",
+            expectedPartitionsOptimized = Some(1L))
+          assertOptimizeCommit(deltaLog, "partition predicate OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+          assertIdsExactly(result, 0L until 40L, "partition predicate OPTIMIZE")
+          assert(result.where("part = 0").count() == 20)
+          assert(result.where("part = 1").count() == 20)
+      }
+    }
+  }
+
+  test("delta optimize command should not be offloaded when native write is disabled") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 10, 1, 2).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(10, 20, 1, 2).toDF("id").write.format("delta").mode("append").save(path)
+
+          withSQLConf(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key -> "false") {
+            val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+            assertNoNativeWriteCommand(
+              optimizeDf.queryExecution.executedPlan,
+              "OPTIMIZE with native write disabled")
+            optimizeDf.collect()
+          }
+
+          assertIdsExactly(
+            spark.read.format("delta").load(path),
+            0L until 20L,
+            "OPTIMIZE with native write disabled")
+      }
+    }
+  }
+}
diff --git
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
index 0b5423ab22..bca0a66d1a 100644
---
a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
+++
b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaNativeWriteSuite.scala
@@ -19,17 +19,20 @@ package org.apache.spark.sql.delta
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.config.VeloxDeltaConfig
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.ExecutedCommandExec
-import
org.apache.spark.sql.execution.datasources.v2.{GlutenDeltaLeafRunnableCommand,
GlutenDeltaLeafV2CommandExec}
+import
org.apache.spark.sql.execution.datasources.v2.{GlutenDeltaLeafRunnableCommand,
GlutenDeltaLeafV2CommandExec, GlutenDeltaRunnableCommand}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.QueryExecutionListener
-import java.util.concurrent.CopyOnWriteArrayList
+import java.util.concurrent.{CopyOnWriteArrayList, TimeUnit}
import scala.jdk.CollectionConverters._
@@ -86,6 +89,7 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
val nativeClassMatch = plan
.collectFirst {
case ExecutedCommandExec(_: GlutenDeltaLeafRunnableCommand) => true
+ case ExecutedCommandExec(_: GlutenDeltaRunnableCommand) => true
case _: GlutenDeltaLeafV2CommandExec => true
}
.getOrElse(false)
@@ -115,6 +119,21 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
spark.listenerManager.register(listener)
try {
action
+ val deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10)
+ var lastSize = -1
+ var stableSince = System.nanoTime()
+ while (System.nanoTime() < deadline) {
+ val size = plans.size()
+ val now = System.nanoTime()
+ if (size != lastSize) {
+ lastSize = size
+ stableSince = now
+ }
+ if (size > 0 && now - stableSince >=
TimeUnit.MILLISECONDS.toNanos(500)) {
+ return plans.asScala.toSeq
+ }
+ Thread.sleep(50)
+ }
} finally {
spark.listenerManager.unregister(listener)
}
@@ -137,6 +156,58 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
)
}
+ /** Returns the active data files of the latest snapshot of `deltaLog`. */
+ private def files(deltaLog: DeltaLog): Set[AddFile] = {
+ deltaLog.update().allFiles.collect().toSet
+ }
+
+ /** Extracts the single `metrics` struct from an OPTIMIZE result DataFrame. */
+ private def collectOptimizeMetrics(df: DataFrame): OptimizeMetrics = {
+ val metrics = df.select("metrics.*").as[OptimizeMetrics].collect()
+ assert(metrics.length == 1, s"Expected one OPTIMIZE result row, got
${metrics.length}")
+ metrics.head
+ }
+
+ /** Asserts that the most recent commit in `deltaLog`'s history is an OPTIMIZE operation. */
+ private def assertOptimizeCommit(deltaLog: DeltaLog, context: String): Unit
= {
+ val latestCommit = deltaLog.history.getHistory(Some(1)).head
+ assert(
+ latestCommit.operation == "OPTIMIZE",
+ s"Expected latest Delta operation for $context to be OPTIMIZE, got " +
+ latestCommit.operation)
+ }
+
+ /**
+  * Asserts that `metrics` describe a real compaction: files were removed and added, fewer
+  * files were added than removed, the active file count dropped, and the per-category file
+  * metrics agree with the totals. When `expectedPartitionsOptimized` is set, also checks the
+  * number of optimized partitions.
+  */
+ private def assertCompactionMetrics(
+ metrics: OptimizeMetrics,
+ beforeFileCount: Int,
+ afterFileCount: Int,
+ context: String,
+ expectedPartitionsOptimized: Option[Long] = None): Unit = {
+ assert(metrics.numFilesRemoved > 0, s"Expected files removed for $context")
+ assert(metrics.numFilesAdded > 0, s"Expected files added for $context")
+ assert(
+ afterFileCount < beforeFileCount,
+ s"Expected fewer active files after $context, before=$beforeFileCount
after=$afterFileCount")
+ assert(
+ metrics.numFilesRemoved > metrics.numFilesAdded,
+ s"Expected $context to compact to fewer files,
removed=${metrics.numFilesRemoved} " +
+ s"added=${metrics.numFilesAdded}"
+ )
+ assert(
+ metrics.filesRemoved.totalFiles == metrics.numFilesRemoved,
+ s"Removed file metrics did not match numFilesRemoved for $context")
+ assert(
+ metrics.filesAdded.totalFiles == metrics.numFilesAdded,
+ s"Added file metrics did not match numFilesAdded for $context")
+ assert(metrics.filesRemoved.totalSize > 0, s"Expected removed file size
metrics for $context")
+ assert(metrics.filesAdded.totalSize > 0, s"Expected added file size
metrics for $context")
+ assert(metrics.numBatches > 0, s"Expected at least one optimize batch for
$context")
+ expectedPartitionsOptimized.foreach {
+ expected =>
+ assert(
+ metrics.partitionsOptimized == expected,
+ s"Expected $expected optimized partitions for $context, got " +
+ metrics.partitionsOptimized)
+ }
+ }
+
test("native delta delete command should be offloaded") {
withNativeWriteOffloadConf {
withTempDir {
@@ -275,6 +346,150 @@ class DeltaNativeWriteSuite extends DeltaSQLCommandTest {
}
}
+  // Path-based OPTIMIZE must be offloaded, must compact, and must preserve every row exactly once.
+  test("native delta optimize command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 32, 1, 4).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(32, 64, 1, 4).toDF("id").write.format("delta").mode("append").save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+          assertContainsNativeWriteCommand(Seq(optimizeDf.queryExecution.executedPlan), "OPTIMIZE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "path OPTIMIZE")
+          assertOptimizeCommit(deltaLog, "path OPTIMIZE")
+
+          // Compare sorted ids rather than sets so duplicated rows are detected too.
+          val ids = spark.read.format("delta").load(path).collect().map(_.getLong(0)).sorted.toSeq
+          assert(ids == (0L until 64L), s"Expected ids 0 until 64 exactly once, got ${ids.size} rows")
+      }
+    }
+  }
+
+  // Catalog-table OPTIMIZE must be offloaded, must compact, and must preserve every row once.
+  test("native delta optimize table command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTable("delta_native_optimize_table") {
+        spark
+          .range(0, 32, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("overwrite")
+          .saveAsTable("delta_native_optimize_table")
+        spark
+          .range(32, 64, 1, 4)
+          .toDF("id")
+          .write
+          .format("delta")
+          .mode("append")
+          .saveAsTable("delta_native_optimize_table")
+
+        val deltaLog = DeltaLog.forTable(spark, TableIdentifier("delta_native_optimize_table"))
+        val beforeFiles = files(deltaLog)
+
+        val optimizeDf = sql("OPTIMIZE delta_native_optimize_table")
+        assertContainsNativeWriteCommand(
+          Seq(optimizeDf.queryExecution.executedPlan),
+          "OPTIMIZE table")
+        val metrics = collectOptimizeMetrics(optimizeDf)
+
+        val afterFiles = files(deltaLog)
+        assertCompactionMetrics(metrics, beforeFiles.size, afterFiles.size, "table OPTIMIZE")
+        assertOptimizeCommit(deltaLog, "table OPTIMIZE")
+
+        // Compare sorted ids rather than sets so duplicated rows are detected too.
+        val ids = spark.read
+          .table("delta_native_optimize_table")
+          .collect()
+          .map(_.getLong(0))
+          .sorted
+          .toSeq
+        assert(ids == (0L until 64L), s"Expected ids 0 until 64 exactly once, got ${ids.size} rows")
+      }
+    }
+  }
+
+  // OPTIMIZE ... WHERE must only compact the selected partition and preserve every row once.
+  test("native delta optimize partition predicate command should be offloaded") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark
+            .range(0, 20, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+          spark
+            .range(20, 40, 1, 4)
+            .selectExpr("id", "cast(id % 2 as int) as part")
+            .write
+            .format("delta")
+            .partitionBy("part")
+            .mode("append")
+            .save(path)
+
+          val deltaLog = DeltaLog.forTable(spark, path)
+          val beforeFiles = files(deltaLog)
+          val beforePart0Paths = beforeFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val beforePart1Count = beforeFiles.count(_.partitionValues.get("part").contains("1"))
+
+          val optimizeDf = sql(s"OPTIMIZE delta.`$path` WHERE part = 1")
+          assertContainsNativeWriteCommand(
+            Seq(optimizeDf.queryExecution.executedPlan),
+            "OPTIMIZE WHERE")
+          val metrics = collectOptimizeMetrics(optimizeDf)
+
+          val afterFiles = files(deltaLog)
+          val afterPart0Paths = afterFiles
+            .filter(_.partitionValues.get("part").contains("0"))
+            .map(_.path)
+          val afterPart1Count = afterFiles.count(_.partitionValues.get("part").contains("1"))
+          // Files outside the predicate must be left untouched by the compaction.
+          assert(
+            beforePart0Paths.subsetOf(afterPart0Paths),
+            "OPTIMIZE WHERE part = 1 should not remove files from part = 0")
+          assert(
+            afterPart1Count < beforePart1Count,
+            s"Expected fewer active files in part = 1, before=$beforePart1Count " +
+              s"after=$afterPart1Count")
+          assertCompactionMetrics(
+            metrics,
+            beforeFiles.size,
+            afterFiles.size,
+            "partition predicate OPTIMIZE",
+            expectedPartitionsOptimized = Some(1L))
+          assertOptimizeCommit(deltaLog, "partition predicate OPTIMIZE")
+          val result = spark.read.format("delta").load(path)
+
+          // Compare sorted ids rather than sets so duplicated rows are detected too.
+          val ids = result.select("id").collect().map(_.getLong(0)).sorted.toSeq
+          assert(ids == (0L until 40L), s"Expected ids 0 until 40 exactly once, got ${ids.size} rows")
+          assert(result.where("part = 0").count() == 20)
+          assert(result.where("part = 1").count() == 20)
+      }
+    }
+  }
+
+  // With native write disabled, OPTIMIZE must stay on Delta's path and keep every row once.
+  test("delta optimize command should not be offloaded when native write is disabled") {
+    withNativeWriteOffloadConf {
+      withTempDir {
+        dir =>
+          val path = dir.getCanonicalPath
+          spark.range(0, 10, 1, 2).toDF("id").write.format("delta").mode("append").save(path)
+          spark.range(10, 20, 1, 2).toDF("id").write.format("delta").mode("append").save(path)
+
+          withSQLConf(VeloxDeltaConfig.ENABLE_NATIVE_WRITE.key -> "false") {
+            val optimizeDf = sql(s"OPTIMIZE delta.`$path`")
+            assertNoNativeWriteCommand(
+              Seq(optimizeDf.queryExecution.executedPlan),
+              "OPTIMIZE with native write disabled")
+            optimizeDf.collect()
+          }
+
+          // Compare sorted ids rather than sets so duplicated rows are detected too.
+          val ids = spark.read.format("delta").load(path).collect().map(_.getLong(0)).sorted.toSeq
+          assert(ids == (0L until 20L), s"Expected ids 0 until 20 exactly once, got ${ids.size} rows")
+      }
+    }
+  }
+
test("delta save command should not be offloaded when native write is
disabled") {
withNativeWriteOffloadConf {
withTempDir {
diff --git
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 29a1bfd685..71b12e340e 100644
---
a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -703,6 +703,9 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-37779: ColumnarToRowExec should be canonicalizable after
being (de)serialized")
enableSuite[GlutenSparkPlannerSuite]
enableSuite[GlutenSparkScriptTransformationSuite]
+ // Flaky in CI containers for Spark 4.1: intermittently fails with
+ // `/tmp/test-resource*.py: Permission denied` and can crash JVM.
+ .exclude("SPARK-33934: Add SparkFile's root dir to env property PATH")
enableSuite[GlutenSparkSqlParserSuite]
enableSuite[GlutenUnsafeFixedWidthAggregationMapSuite]
enableSuite[GlutenUnsafeKVExternalSorterSuite]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]