This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 5cfa0d70a3 [GLUTEN-10215][VL] Delta Write: Offload 
DeltaOptimizedWriterExec (#11461)
5cfa0d70a3 is described below

commit 5cfa0d70a37d2adff355dd834e01b00f4839271f
Author: Hongze Zhang <[email protected]>
AuthorDate: Fri Jan 23 09:38:40 2026 +0000

    [GLUTEN-10215][VL] Delta Write: Offload DeltaOptimizedWriterExec (#11461)
---
 backends-velox/pom.xml                             |   6 +
 .../sql/delta/GlutenOptimisticTransaction.scala    |  34 +-
 .../delta/files/GlutenDeltaFileFormatWriter.scala  |   8 +-
 .../perf/GlutenDeltaOptimizedWriterExec.scala      | 365 ++++++++++++++++++
 .../clustering/ClusteredTableClusteringSuite.scala | 153 ++++++++
 .../CoordinatedCommitsTestUtils.scala              | 421 +++++++++++++++++++++
 .../delta/skipping/ClusteredTableTestUtils.scala   | 385 +++++++++++++++++++
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |  10 +-
 .../ColumnarCollapseTransformStages.scala          |   5 +-
 pom.xml                                            |   1 +
 10 files changed, 1380 insertions(+), 8 deletions(-)

diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml
index d19ec7ea1b..986728a47a 100755
--- a/backends-velox/pom.xml
+++ b/backends-velox/pom.xml
@@ -538,6 +538,12 @@
           
<artifactId>${delta.package.name}_${scala.binary.version}</artifactId>
           <scope>provided</scope>
         </dependency>
+        <dependency>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-dynamodb</artifactId>
+          <version>${aws-java-sdk-dynamodb.version}</version>
+          <scope>test</scope>
+        </dependency>
       </dependencies>
     </profile>
     <profile>
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
index 275d2c0cd9..0f2381f454 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/GlutenOptimisticTransaction.scala
@@ -16,14 +16,15 @@
  */
 package org.apache.spark.sql.delta
 
-import org.apache.gluten.config.VeloxDeltaConfig
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.extension.columnar.transition.Transitions
 
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.delta.actions.{AddFile, FileAction}
 import org.apache.spark.sql.delta.constraints.{Constraint, Constraints, 
DeltaInvariantCheckerExec}
 import org.apache.spark.sql.delta.files.{GlutenDeltaFileFormatWriter, 
TransactionalWrite}
 import org.apache.spark.sql.delta.hooks.AutoCompact
-import org.apache.spark.sql.delta.perf.DeltaOptimizedWriterExec
+import org.apache.spark.sql.delta.perf.{DeltaOptimizedWriterExec, 
GlutenDeltaOptimizedWriterExec}
 import org.apache.spark.sql.delta.schema.InnerInvariantViolationException
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import 
org.apache.spark.sql.delta.stats.{GlutenDeltaIdentityColumnStatsTracker, 
GlutenDeltaJobStatisticsTracker}
@@ -50,7 +51,6 @@ class GlutenOptimisticTransaction(delegate: 
OptimisticTransaction)
     hasWritten = true
 
     val spark = inputData.sparkSession
-    val veloxDeltaConfig = new VeloxDeltaConfig(spark.sessionState.conf)
 
     val (data, partitionSchema) = performCDCPartition(inputData)
     val outputPath = deltaLog.dataPath
@@ -108,7 +108,31 @@ class GlutenOptimisticTransaction(delegate: 
OptimisticTransaction)
           !isOptimize &&
           shouldOptimizeWrite(writeOptions, spark.sessionState.conf)
         ) {
-          DeltaOptimizedWriterExec(maybeCheckInvariants, 
metadata.partitionColumns, deltaLog)
+          // FIXME: This may create unexpected C2R2C / R2C where the original 
plan is better to be
+          //  written with the vanilla DeltaOptimizedWriterExec. We'd optimize 
the query plan
+          //  here further.
+          val planWithVeloxOutput = 
Transitions.toBatchPlan(maybeCheckInvariants, VeloxBatchType)
+          try {
+            val glutenWriterExec = GlutenDeltaOptimizedWriterExec(
+              planWithVeloxOutput,
+              metadata.partitionColumns,
+              deltaLog)
+            val validationResult = glutenWriterExec.doValidate()
+            if (validationResult.ok()) {
+              glutenWriterExec
+            } else {
+              logInfo(
+                s"GlutenDeltaOptimizedWriterExec: Internal shuffle validated 
negative," +
+                  s" reason: ${validationResult.reason()}. Falling back to 
row-based shuffle.")
+              DeltaOptimizedWriterExec(maybeCheckInvariants, 
metadata.partitionColumns, deltaLog)
+            }
+          } catch {
+            case e: AnalysisException =>
+              logWarning(
+                s"GlutenDeltaOptimizedWriterExec: Failed to create internal 
shuffle," +
+                  s" reason: ${e.getMessage()}. Falling back to row-based 
shuffle.")
+              DeltaOptimizedWriterExec(maybeCheckInvariants, 
metadata.partitionColumns, deltaLog)
+          }
         } else {
           maybeCheckInvariants
         }
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index 8b5a4fdc34..3ea64ab6e1 100644
--- 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -18,9 +18,10 @@ package org.apache.spark.sql.delta.files
 
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution._
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
-import org.apache.gluten.extension.columnar.transition.Transitions
+import org.apache.gluten.extension.columnar.transition.{Convention, 
Transitions}
 
 import org.apache.spark._
 import org.apache.spark.internal.{LoggingShims, MDC}
@@ -262,6 +263,11 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
           val newPlan = sortPlan.child match {
             case wst @ WholeStageTransformer(wholeStageChild, _) =>
               wst.withNewChildren(Seq(addNativeSort(wholeStageChild)))
+            case other if Convention.get(other).batchType == VeloxBatchType =>
+              val nativeSortPlan = addNativeSort(other)
+              val nativeSortPlanWithWst =
+                GenerateTransformStageId()(ColumnarCollapseTransformStages(new 
GlutenConfig(sparkSession.sessionState.conf))(nativeSortPlan))
+              nativeSortPlanWithWst
             case other =>
               Transitions.toBatchPlan(sortPlan, VeloxBatchType)
           }
diff --git 
a/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala
 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala
new file mode 100644
index 0000000000..8e83a8af87
--- /dev/null
+++ 
b/backends-velox/src-delta33/main/scala/org/apache/spark/sql/delta/perf/GlutenDeltaOptimizedWriterExec.scala
@@ -0,0 +1,365 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.perf
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.backendsapi.velox.VeloxBatchType
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.execution.{ValidatablePlan, ValidationResult}
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.vectorized.ColumnarBatchSerializerInstance
+
+// scalastyle:off import.ordering.noEmptyLine
+import org.apache.spark._
+import org.apache.spark.internal.config
+import org.apache.spark.internal.config.ConfigEntry
+import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rdd.RDD
+import org.apache.spark.shuffle._
+import org.apache.spark.shuffle.sort.ColumnarShuffleManager
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog}
+import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.BinPackingUtils
+import org.apache.spark.sql.execution.{ColumnarCollapseTransformStages, 
ColumnarShuffleExchangeExec, GenerateTransformStageId}
+import org.apache.spark.sql.execution.{ShuffledColumnarBatchRDD, SparkPlan, 
UnaryExecNode}
+import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, 
SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+import org.apache.spark.storage._
+import org.apache.spark.util.ThreadUtils
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration.Duration
+
+/** Gluten's vectorized version of [[DeltaOptimizedWriterExec]]. */
+case class GlutenDeltaOptimizedWriterExec(
+    child: SparkPlan,
+    partitionColumns: Seq[String],
+    @transient deltaLog: DeltaLog
+) extends ValidatablePlan
+  with UnaryExecNode
+  with DeltaLogging {
+
+  override def output: Seq[Attribute] = child.output
+
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+  private lazy val readMetrics =
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+  override lazy val metrics: Map[String, SQLMetric] = Map(
+    "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
+  ) ++ readMetrics ++ writeMetrics
+
+  private lazy val childNumPartitions = 
child.executeColumnar().getNumPartitions
+
+  private lazy val numPartitions: Int = {
+    val targetShuffleBlocks = 
getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS)
+    math.min(
+      math.max(targetShuffleBlocks / childNumPartitions, 1),
+      getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS))
+  }
+
+  @transient private var cachedShuffleRDD: ShuffledColumnarBatchRDD = _
+
+  @transient private lazy val mapTracker = SparkEnv.get.mapOutputTracker
+
+  private lazy val columnarShufflePlan = {
+    val resolver = 
org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
+    val saltedPartitioning = HashPartitioning(
+      partitionColumns.map(
+        p =>
+          output
+            .find(o => resolver(p, o.name))
+            .getOrElse(throw 
DeltaErrors.failedFindPartitionColumnInOutputPlan(p))),
+      numPartitions)
+    val shuffle =
+      ShuffleExchangeExec(saltedPartitioning, child)
+    val columnarShuffle =
+      
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarShuffleExchange(shuffle)
+    val columnarShuffleWithWst =
+      GenerateTransformStageId()(
+        ColumnarCollapseTransformStages(new 
GlutenConfig(conf))(columnarShuffle))
+    columnarShuffleWithWst.asInstanceOf[ColumnarShuffleExchangeExec]
+  }
+
+  /** Creates a ShuffledRowRDD for facilitating the shuffle in the map side. */
+  private def getShuffleRDD: ShuffledColumnarBatchRDD = {
+    if (cachedShuffleRDD == null) {
+      val columnarShuffleRdd =
+        
columnarShufflePlan.executeColumnar().asInstanceOf[ShuffledColumnarBatchRDD]
+      cachedShuffleRDD = columnarShuffleRdd
+    }
+    cachedShuffleRDD
+  }
+
+  private def computeBins(): Array[List[(BlockManagerId, ArrayBuffer[(BlockId, 
Long, Int)])]] = {
+    // Get all shuffle information
+    val shuffleStats = getShuffleStats()
+
+    // Group by blockId instead of block manager
+    val blockInfo = shuffleStats.flatMap {
+      case (bmId, blocks) =>
+        blocks.map {
+          case (blockId, size, index) =>
+            (blockId, (bmId, size, index))
+        }
+    }.toMap
+
+    val maxBinSize =
+      
ByteUnit.BYTE.convertFrom(getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_BIN_SIZE), 
ByteUnit.MiB)
+
+    val bins = shuffleStats.toSeq
+      .flatMap(_._2)
+      .groupBy(_._1.asInstanceOf[ShuffleBlockId].reduceId)
+      .flatMap {
+        case (_, blocks) =>
+          BinPackingUtils.binPackBySize[(BlockId, Long, Int), BlockId](
+            blocks,
+            _._2, // size
+            _._1, // blockId
+            maxBinSize)
+      }
+
+    bins
+      .map {
+        bin =>
+          var binSize = 0L
+          val blockLocations =
+            new mutable.HashMap[BlockManagerId, ArrayBuffer[(BlockId, Long, 
Int)]]()
+          for (blockId <- bin) {
+            val (bmId, size, index) = blockInfo(blockId)
+            binSize += size
+            val blocksAtBM =
+              blockLocations.getOrElseUpdate(bmId, new ArrayBuffer[(BlockId, 
Long, Int)]())
+            blocksAtBM.append((blockId, size, index))
+          }
+          (binSize, blockLocations.toList)
+      }
+      .toArray
+      .sortBy(_._1)(Ordering[Long].reverse) // submit largest blocks first
+      .map(_._2)
+  }
+
+  /** Performs the shuffle before the write, so that we can bin-pack output 
data. */
+  private def getShuffleStats(): Array[(BlockManagerId, 
collection.Seq[(BlockId, Long, Int)])] = {
+    val dep = getShuffleRDD.dependency
+    // Gets the shuffle output stats
+    def getStats() =
+      mapTracker.getMapSizesByExecutorId(dep.shuffleId, 0, Int.MaxValue, 0, 
numPartitions).toArray
+
+    // Executes the shuffle map stage in case we are missing output stats
+    def awaitShuffleMapStage(): Unit = {
+      assert(dep != null, "Shuffle dependency should not be null")
+      // hack to materialize the shuffle files in a fault tolerant way
+      ThreadUtils.awaitResult(sparkContext.submitMapStage(dep), Duration.Inf)
+    }
+
+    try {
+      val res = getStats()
+      if (res.isEmpty) awaitShuffleMapStage()
+      getStats()
+    } catch {
+      case e: FetchFailedException =>
+        logWarning(log"Failed to fetch shuffle blocks for the optimized 
writer. Retrying", e)
+        awaitShuffleMapStage()
+        getStats()
+        throw e
+    }
+  }
+
+  override protected def doValidateInternal(): ValidationResult = {
+    // Single partitioned tasks can simply be written.
+    if (childNumPartitions <= 1) {
+      return ValidationResult.succeeded
+    }
+    columnarShufflePlan.doValidate()
+  }
+
+  override def doExecute(): RDD[InternalRow] = throw new 
UnsupportedOperationException
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    // Single partitioned tasks can simply be written.
+    if (childNumPartitions <= 1) {
+      return child.executeColumnar()
+    }
+    val shuffledRDD = getShuffleRDD
+
+    val partitions = computeBins()
+
+    recordDeltaEvent(
+      deltaLog,
+      "delta.optimizeWrite.planned",
+      data = Map(
+        "originalPartitions" -> childNumPartitions,
+        "outputPartitions" -> partitions.length,
+        "shufflePartitions" -> numPartitions,
+        "numShuffleBlocks" -> 
getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_SHUFFLE_BLOCKS),
+        "binSize" -> getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_BIN_SIZE),
+        "maxShufflePartitions" ->
+          getConf(DeltaSQLConf.DELTA_OPTIMIZE_WRITE_MAX_SHUFFLE_PARTITIONS)
+      )
+    )
+
+    new GlutenDeltaOptimizedWriterRDD(
+      sparkContext,
+      shuffledRDD.dependency,
+      readMetrics,
+      new OptimizedWriterBlocks(partitions))
+  }
+
+  private def getConf[T](entry: ConfigEntry[T]): T = {
+    conf.getConf(entry)
+  }
+
+  override protected def withNewChildInternal(newChild: SparkPlan): 
GlutenDeltaOptimizedWriterExec =
+    copy(child = newChild)
+
+  override def batchType(): Convention.BatchType = VeloxBatchType
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+}
+
+/**
+ * A specialized implementation similar to `ShuffledRowRDD`, where a partition 
reads a prepared set
+ * of shuffle blocks.
+ */
+private class GlutenDeltaOptimizedWriterRDD(
+    @transient sparkContext: SparkContext,
+    var dep: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
+    metrics: Map[String, SQLMetric],
+    @transient blocks: OptimizedWriterBlocks)
+  extends RDD[ColumnarBatch](sparkContext, Seq(dep))
+  with DeltaLogging {
+
+  override def getPartitions: Array[Partition] = 
Array.tabulate(blocks.bins.length) {
+    i => ShuffleBlockRDDPartition(i, blocks.bins(i))
+  }
+
+  override def compute(split: Partition, context: TaskContext): 
Iterator[ColumnarBatch] = {
+    val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
+    val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, 
metrics)
+
+    val blocks = if (context.stageAttemptNumber() > 0) {
+      // We lost shuffle blocks, so we need to now get new manager addresses
+      val executorTracker = SparkEnv.get.mapOutputTracker
+      val oldBlockLocations = 
split.asInstanceOf[ShuffleBlockRDDPartition].blocks
+
+      // assumes we bin-pack by reducerId
+      val reducerId = 
oldBlockLocations.head._2.head._1.asInstanceOf[ShuffleBlockId].reduceId
+      // Get block addresses
+      val newLocations = executorTracker
+        .getMapSizesByExecutorId(dep.shuffleId, reducerId)
+        .flatMap {
+          case (bmId, newBlocks) =>
+            newBlocks.map(blockInfo => (blockInfo._3, (bmId, blockInfo)))
+        }
+        .toMap
+
+      val blockLocations = new mutable.HashMap[BlockManagerId, 
ArrayBuffer[(BlockId, Long, Int)]]()
+      oldBlockLocations.foreach {
+        case (_, oldBlocks) =>
+          oldBlocks.foreach {
+            oldBlock =>
+              val (bmId, blockInfo) = newLocations(oldBlock._3)
+              val blocksAtBM =
+                blockLocations.getOrElseUpdate(bmId, new ArrayBuffer[(BlockId, 
Long, Int)]())
+              blocksAtBM.append(blockInfo)
+          }
+      }
+
+      blockLocations.iterator
+    } else {
+      split.asInstanceOf[ShuffleBlockRDDPartition].blocks.iterator
+    }
+
+    val reader = new GlutenOptimizedWriterShuffleReader(dep, context, blocks, 
sqlMetricsReporter)
+    reader.read().map(_._2)
+  }
+
+  override def clearDependencies(): Unit = {
+    super.clearDependencies()
+    dep = null
+  }
+}
+
+/** A simplified implementation of the `BlockStoreShuffleReader` for reading 
shuffle blocks. */
+private class GlutenOptimizedWriterShuffleReader(
+    dep: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch],
+    context: TaskContext,
+    blocks: Iterator[(BlockManagerId, ArrayBuffer[(BlockId, Long, Int)])],
+    readMetrics: ShuffleReadMetricsReporter)
+  extends ShuffleReader[Int, ColumnarBatch] {
+
+  /** Read the combined key-values for this reduce task */
+  override def read(): Iterator[Product2[Int, ColumnarBatch]] = {
+    val serializerManager = dep match {
+      case _: ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch] =>
+        ColumnarShuffleManager.bypassDecompressionSerializerManger
+      case _ =>
+        SparkEnv.get.serializerManager
+    }
+    val wrappedStreams = new ShuffleBlockFetcherIterator(
+      context,
+      SparkEnv.get.blockManager.blockStoreClient,
+      SparkEnv.get.blockManager,
+      SparkEnv.get.mapOutputTracker,
+      blocks,
+      serializerManager.wrapStream,
+      // Note: we use getSizeAsMb when no suffix is provided for backwards 
compatibility
+      SparkEnv.get.conf.get(config.REDUCER_MAX_SIZE_IN_FLIGHT) * 1024 * 1024,
+      SparkEnv.get.conf.get(config.REDUCER_MAX_REQS_IN_FLIGHT),
+      SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
+      SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
+      SparkEnv.get.conf.get(config.SHUFFLE_MAX_ATTEMPTS_ON_NETTY_OOM),
+      SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT),
+      SparkEnv.get.conf.get(config.SHUFFLE_DETECT_CORRUPT_MEMORY),
+      SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ENABLED),
+      SparkEnv.get.conf.get(config.SHUFFLE_CHECKSUM_ALGORITHM),
+      readMetrics,
+      false
+    ).toCompletionIterator
+
+    // Create a key/value iterator for each stream
+    val recordIter = dep match {
+      case columnarDep: ColumnarShuffleDependency[Int, ColumnarBatch, 
ColumnarBatch] =>
+        // If the dependency is a ColumnarShuffleDependency, we use the 
columnar serializer.
+        columnarDep.serializer
+          .newInstance()
+          .asInstanceOf[ColumnarBatchSerializerInstance]
+          .deserializeStreams(wrappedStreams)
+          .asKeyValueIterator
+      case _ =>
+        val serializerInstance = dep.serializer.newInstance()
+        // Create a key/value iterator for each stream
+        wrappedStreams.flatMap {
+          case (blockId, wrappedStream) =>
+            // Note: the asKeyValueIterator below wraps a key/value iterator 
inside of a
+            // NextIterator. The NextIterator makes sure that close() is 
called on the
+            // underlying InputStream when all records have been read.
+            
serializerInstance.deserializeStream(wrappedStream).asKeyValueIterator
+        }
+    }
+    // An interruptible iterator must be used here in order to support task 
cancellation
+    new InterruptibleIterator[(Any, Any)](context, recordIter)
+      .asInstanceOf[Iterator[Product2[Int, ColumnarBatch]]]
+  }
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala
new file mode 100644
index 0000000000..2391f48d1b
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/clustering/ClusteredTableClusteringSuite.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.clustering
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.actions.AddFile
+import org.apache.spark.sql.delta.skipping.ClusteredTableTestUtils
+import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+class ClusteredTableClusteringSuite
+  extends SparkFunSuite
+  with SharedSparkSession
+  with ClusteredTableTestUtils
+  with DeltaSQLCommandTest {
+  import testImplicits._
+
+  private val table: String = "test_table"
+
+  // Ingest data to create numFiles files with one row in each file.
+  private def addFiles(table: String, numFiles: Int): Unit = {
+    val df = (1 to numFiles).map(i => (i, i)).toDF("col1", "col2")
+    withSQLConf(SQLConf.MAX_RECORDS_PER_FILE.key -> "1") {
+      df.write.format("delta").mode("append").saveAsTable(table)
+    }
+  }
+
+  private def getFiles(table: String): Set[AddFile] = {
+    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
+    deltaLog.update().allFiles.collect().toSet
+  }
+
+  private def assertClustered(files: Set[AddFile]): Unit = {
+    
assert(files.forall(_.clusteringProvider.contains(ClusteredTableUtils.clusteringProvider)))
+  }
+
+  private def assertNotClustered(files: Set[AddFile]): Unit = {
+    assert(files.forall(_.clusteringProvider.isEmpty))
+  }
+
+  test("optimize clustered table") {
+    withSQLConf(SQLConf.MAX_RECORDS_PER_FILE.key -> "2") {
+      withClusteredTable(table = table, schema = "col1 int, col2 int", 
clusterBy = "col1, col2") {
+        addFiles(table, numFiles = 4)
+        val files0 = getFiles(table)
+        assert(files0.size === 4)
+        assertNotClustered(files0)
+
+        // Optimize should cluster the data into two 2 files since 
MAX_RECORDS_PER_FILE is 2.
+        runOptimize(table) {
+          metrics =>
+            assert(metrics.numFilesRemoved == 4)
+            assert(metrics.numFilesAdded == 2)
+        }
+
+        val files1 = getFiles(table)
+        assert(files1.size == 2)
+        assertClustered(files1)
+      }
+    }
+  }
+
+  test("cluster by 1 column") {
+    withSQLConf(SQLConf.MAX_RECORDS_PER_FILE.key -> "2") {
+      withClusteredTable(table = table, schema = "col1 int, col2 int", 
clusterBy = "col1") {
+        addFiles(table, numFiles = 4)
+        val files0 = getFiles(table)
+        assert(files0.size === 4)
+        assertNotClustered(files0)
+
+        // Optimize should cluster the data into two 2 files since 
MAX_RECORDS_PER_FILE is 2.
+        runOptimize(table) {
+          metrics =>
+            assert(metrics.numFilesRemoved == 4)
+            assert(metrics.numFilesAdded == 2)
+        }
+
+        val files1 = getFiles(table)
+        assert(files1.size == 2)
+        assertClustered(files1)
+      }
+    }
+  }
+
+  test("optimize clustered table with batching") {
+    Seq(("1", 2), ("1g", 1)).foreach {
+      case (batchSize, optimizeCommits) =>
+        withClusteredTable(table = table, schema = "col1 int, col2 int", 
clusterBy = "col1, col2") {
+          addFiles(table, numFiles = 4)
+          val files0 = getFiles(table)
+          assert(files0.size === 4)
+          assertNotClustered(files0)
+
+          val totalSize = files0.toSeq.map(_.size).sum
+          val halfSize = totalSize / 2
+
+          withSQLConf(
+            DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE.key -> batchSize,
+            DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_MIN_CUBE_SIZE.key -> 
halfSize.toString,
+            DeltaSQLConf.DELTA_OPTIMIZE_CLUSTERING_TARGET_CUBE_SIZE.key -> 
halfSize.toString
+          ) {
+            // Optimize should create 2 cubes, which will be in separate 
batches if the batch size
+            // is small enough
+            runOptimize(table) {
+              metrics =>
+                assert(metrics.numFilesRemoved == 4)
+                assert(metrics.numFilesAdded == 2)
+            }
+
+            val files1 = getFiles(table)
+            assert(files1.size == 2)
+            assertClustered(files1)
+
+            val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
+
+            val commits = deltaLog.history.getHistory(None)
+            assert(commits.filter(_.operation == "OPTIMIZE").length == 
optimizeCommits)
+          }
+        }
+    }
+  }
+
+  test("optimize clustered table with batching on an empty table") {
+    withClusteredTable(table = table, schema = "col1 int, col2 int", clusterBy 
= "col1, col2") {
+      withSQLConf(DeltaSQLConf.DELTA_OPTIMIZE_BATCH_SIZE.key -> "1g") {
+        runOptimize(table) {
+          metrics =>
+            assert(metrics.numFilesRemoved == 0)
+            assert(metrics.numFilesAdded == 0)
+        }
+      }
+    }
+  }
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
new file mode 100644
index 0000000000..590787b6bd
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/coordinatedcommits/CoordinatedCommitsTestUtils.scala
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.coordinatedcommits
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.{DeltaConfigs, DeltaLog, DeltaTestUtilsBase}
+import org.apache.spark.sql.delta.actions.{CommitInfo, Metadata, Protocol}
+import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.test.SharedSparkSession
+
+import io.delta.storage.LogStore
+import io.delta.storage.commit.{CommitCoordinatorClient => 
JCommitCoordinatorClient, CommitResponse, GetCommitsResponse => 
JGetCommitsResponse, TableDescriptor, TableIdentifier, UpdatedActions}
+import io.delta.storage.commit.actions.{AbstractMetadata, AbstractProtocol}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+
+import java.util.Optional
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.mutable
+import scala.util.control.NonFatal
+
+trait CoordinatedCommitsTestUtils extends DeltaTestUtilsBase {
+  self: SparkFunSuite with SharedSparkSession =>
+
+  protected val defaultCommitsCoordinatorName = "tracking-in-memory"
+  protected val defaultCommitsCoordinatorConf = Map("randomConf" -> 
"randomConfValue")
+
+  def getCoordinatedCommitsDefaultProperties(withICT: Boolean = false): 
Map[String, String] = {
+    val coordinatedCommitsConfJson = 
JsonUtils.toJson(defaultCommitsCoordinatorConf)
+    val properties = Map(
+      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> 
defaultCommitsCoordinatorName,
+      DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.key -> 
coordinatedCommitsConfJson,
+      DeltaConfigs.COORDINATED_COMMITS_TABLE_CONF.key -> "{}"
+    )
+    if (withICT) {
+      properties + (DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.key -> "true")
+    } else {
+      properties
+    }
+  }
+
+  /**
+   * Runs a specific test with coordinated commits default properties unset. 
Any table created in
+   * this test won't have coordinated commits enabled by default.
+   */
+  def testWithDefaultCommitCoordinatorUnset(testName: String)(f: => Unit): 
Unit = {
+    test(testName) {
+      withoutCoordinatedCommitsDefaultTableProperties {
+        f
+      }
+    }
+  }
+
+  /**
+   * Runs the function `f` with coordinated commits default properties unset. 
Any table created in
+   * function `f` won't have coordinated commits enabled by default.
+   */
+  def withoutCoordinatedCommitsDefaultTableProperties[T](f: => T): T = {
+    val defaultCoordinatedCommitsConfs = CoordinatedCommitsUtils
+      .getDefaultCCConfigurations(spark, withDefaultKey = true)
+    defaultCoordinatedCommitsConfs.foreach {
+      case (defaultKey, _) =>
+        spark.conf.unset(defaultKey)
+    }
+    try { f }
+    finally {
+      defaultCoordinatedCommitsConfs.foreach {
+        case (defaultKey, oldValue) =>
+          spark.conf.set(defaultKey, oldValue)
+      }
+    }
+  }
+
+  /**
+   * Runs the function `f` with coordinated commits default properties set to 
what is specified. Any
+   * table created in function `f` will have the `commitCoordinator` property 
set to the specified
+   * `commitCoordinatorName`.
+   */
+  def withCustomCoordinatedCommitsTableProperties(
+      commitCoordinatorName: String,
+      conf: Map[String, String] = Map("randomConf" -> "randomConfValue"))(f: 
=> Unit): Unit = {
+    withSQLConf(
+      
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey ->
+        commitCoordinatorName,
+      
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey ->
+        JsonUtils.toJson(conf)
+    ) {
+      f
+    }
+  }
+
+  /** Run the test with different backfill batch sizes: 1, 2, 10 */
+  def testWithDifferentBackfillInterval(testName: String)(f: Int => Unit): 
Unit = {
+    Seq(1, 2, 10).foreach {
+      backfillBatchSize =>
+        test(s"$testName [Backfill batch size: $backfillBatchSize]") {
+          CommitCoordinatorProvider.clearNonDefaultBuilders()
+          CommitCoordinatorProvider.registerBuilder(
+            TrackingInMemoryCommitCoordinatorBuilder(backfillBatchSize))
+          CommitCoordinatorProvider.registerBuilder(
+            InMemoryCommitCoordinatorBuilder(backfillBatchSize))
+          f(backfillBatchSize)
+        }
+    }
+  }
+
+  /**
+   * Run the test against a [[TrackingCommitCoordinatorClient]] with backfill 
batch size =
+   * `backfillBatchSize`
+   */
+  def testWithCoordinatedCommits(backfillBatchSize: Int)(testName: String)(f: 
=> Unit): Unit = {
+    test(s"$testName [Backfill batch size: $backfillBatchSize]") {
+      CommitCoordinatorProvider.clearNonDefaultBuilders()
+      CommitCoordinatorProvider.registerBuilder(
+        TrackingInMemoryCommitCoordinatorBuilder(backfillBatchSize))
+      val coordinatedCommitsCoordinatorJson = 
JsonUtils.toJson(defaultCommitsCoordinatorConf)
+      withSQLConf(
+        
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey ->
+          defaultCommitsCoordinatorName,
+        
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey ->
+          coordinatedCommitsCoordinatorJson
+      ) {
+        f
+      }
+    }
+  }
+
+  /**
+   * Run the test with:
+   *   1. without coordinated-commits, and 2. with coordinated-commits using 
different backfill
+   *      batch sizes
+   */
+  def testWithDifferentBackfillIntervalOptional(testName: String)(f: 
Option[Int] => Unit): Unit = {
+    test(s"$testName [Backfill batch size: None]") {
+      f(None)
+    }
+    testWithDifferentBackfillInterval(testName) {
+      backfillBatchSize =>
+        val coordinatedCommitsCoordinatorJson = 
JsonUtils.toJson(defaultCommitsCoordinatorConf)
+        withSQLConf(
+          
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey ->
+            defaultCommitsCoordinatorName,
+          
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey ->
+            coordinatedCommitsCoordinatorJson
+        ) {
+          f(Some(backfillBatchSize))
+        }
+    }
+  }
+
+  def getUpdatedActionsForZerothCommit(
+      commitInfo: CommitInfo,
+      oldMetadata: Metadata = Metadata()): UpdatedActions = {
+    val newMetadataConfiguration =
+      oldMetadata.configuration +
+        (DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.key -> 
defaultCommitsCoordinatorName)
+    val newMetadata = oldMetadata.copy(configuration = 
newMetadataConfiguration)
+    new UpdatedActions(commitInfo, newMetadata, Protocol(), oldMetadata, 
Protocol())
+  }
+
+  def getUpdatedActionsForNonZerothCommit(commitInfo: CommitInfo): 
UpdatedActions = {
+    val updatedActions = getUpdatedActionsForZerothCommit(commitInfo)
+    new UpdatedActions(
+      updatedActions.getCommitInfo,
+      updatedActions.getNewMetadata,
+      updatedActions.getNewProtocol,
+      updatedActions.getNewMetadata,
+      updatedActions.getOldProtocol
+    )
+  }
+}
+
+case class TrackingInMemoryCommitCoordinatorBuilder(
+    batchSize: Long,
+    defaultCommitCoordinatorClientOpt: Option[JCommitCoordinatorClient] = None,
+    defaultCommitCoordinatorName: String = "tracking-in-memory")
+  extends CommitCoordinatorBuilder {
+  lazy val trackingInMemoryCommitCoordinatorClient =
+    defaultCommitCoordinatorClientOpt.getOrElse {
+      new TrackingCommitCoordinatorClient(
+        new PredictableUuidInMemoryCommitCoordinatorClient(batchSize))
+    }
+
+  override def getName: String = defaultCommitCoordinatorName
+  override def build(spark: SparkSession, conf: Map[String, String]): 
JCommitCoordinatorClient = {
+    trackingInMemoryCommitCoordinatorClient
+  }
+}
+
+case class TrackingGenericInMemoryCommitCoordinatorBuilder(
+    builderName: String,
+    realBuilder: CommitCoordinatorBuilder)
+  extends CommitCoordinatorBuilder {
+  override def getName: String = builderName
+
+  override def build(spark: SparkSession, conf: Map[String, String]): 
JCommitCoordinatorClient = {
+    new TrackingCommitCoordinatorClient(realBuilder.build(spark, conf))
+  }
+}
+
+class PredictableUuidInMemoryCommitCoordinatorClient(batchSize: Long)
+  extends InMemoryCommitCoordinator(batchSize) {
+
+  var nextUuidSuffix = 1L
+  override def generateUUID(): String = {
+    nextUuidSuffix += 1
+    s"uuid-${nextUuidSuffix - 1}"
+  }
+}
+
+object TrackingCommitCoordinatorClient {
+  private val insideOperation = new ThreadLocal[Boolean] {
+    override def initialValue(): Boolean = false
+  }
+}
+
+class TrackingCommitCoordinatorClient(
+    val delegatingCommitCoordinatorClient: JCommitCoordinatorClient)
+  extends JCommitCoordinatorClient {
+
+  val numCommitsCalled = new AtomicInteger(0)
+  val numGetCommitsCalled = new AtomicInteger(0)
+  val numBackfillToVersionCalled = new AtomicInteger(0)
+  val numRegisterTableCalled = new AtomicInteger(0)
+
+  def recordOperation[T](op: String)(f: => T): T = {
+    val oldInsideOperation = 
TrackingCommitCoordinatorClient.insideOperation.get()
+    try {
+      if (!TrackingCommitCoordinatorClient.insideOperation.get()) {
+        op match {
+          case "commit" => numCommitsCalled.incrementAndGet()
+          case "getCommits" => numGetCommitsCalled.incrementAndGet()
+          case "backfillToVersion" => 
numBackfillToVersionCalled.incrementAndGet()
+          case "registerTable" => numRegisterTableCalled.incrementAndGet()
+          case _ => ()
+        }
+      }
+      TrackingCommitCoordinatorClient.insideOperation.set(true)
+      f
+    } finally {
+      TrackingCommitCoordinatorClient.insideOperation.set(oldInsideOperation)
+    }
+  }
+
+  override def commit(
+      logStore: LogStore,
+      hadoopConf: Configuration,
+      tableDesc: TableDescriptor,
+      commitVersion: Long,
+      actions: java.util.Iterator[String],
+      updatedActions: UpdatedActions): CommitResponse = 
recordOperation("commit") {
+    delegatingCommitCoordinatorClient.commit(
+      logStore,
+      hadoopConf,
+      tableDesc,
+      commitVersion,
+      actions,
+      updatedActions)
+  }
+
+  override def getCommits(
+      tableDesc: TableDescriptor,
+      startVersion: java.lang.Long,
+      endVersion: java.lang.Long): JGetCommitsResponse = 
recordOperation("getCommits") {
+    delegatingCommitCoordinatorClient.getCommits(tableDesc, startVersion, 
endVersion)
+  }
+
+  override def backfillToVersion(
+      logStore: LogStore,
+      hadoopConf: Configuration,
+      tableDesc: TableDescriptor,
+      version: Long,
+      lastKnownBackfilledVersion: java.lang.Long): Unit = 
recordOperation("backfillToVersion") {
+    delegatingCommitCoordinatorClient.backfillToVersion(
+      logStore,
+      hadoopConf,
+      tableDesc,
+      version,
+      lastKnownBackfilledVersion)
+  }
+
+  override def semanticEquals(other: JCommitCoordinatorClient): Boolean = {
+    other match {
+      case otherTracking: TrackingCommitCoordinatorClient =>
+        delegatingCommitCoordinatorClient.semanticEquals(
+          otherTracking.delegatingCommitCoordinatorClient)
+      case _ =>
+        delegatingCommitCoordinatorClient.semanticEquals(other)
+    }
+  }
+
+  def reset(): Unit = {
+    numCommitsCalled.set(0)
+    numGetCommitsCalled.set(0)
+    numBackfillToVersionCalled.set(0)
+  }
+
+  override def registerTable(
+      logPath: Path,
+      tableIdentifier: Optional[TableIdentifier],
+      currentVersion: Long,
+      currentMetadata: AbstractMetadata,
+      currentProtocol: AbstractProtocol): java.util.Map[String, String] =
+    recordOperation("registerTable") {
+      delegatingCommitCoordinatorClient.registerTable(
+        logPath,
+        tableIdentifier,
+        currentVersion,
+        currentMetadata,
+        currentProtocol)
+    }
+}
+
+/**
+ * A helper trait which enables coordinated-commits for the test suite based 
on the given
+ * `coordinatedCommitsBackfillBatchSize` conf.
+ */
+trait CoordinatedCommitsBaseSuite
+  extends SparkFunSuite
+  with SharedSparkSession
+  with CoordinatedCommitsTestUtils {
+
+  // If this config is not overridden, coordinated commits are disabled.
+  def coordinatedCommitsBackfillBatchSize: Option[Int] = None
+
+  final def coordinatedCommitsEnabledInTests: Boolean = 
coordinatedCommitsBackfillBatchSize.nonEmpty
+
+  // Keeps track of the number of table names pointing to the location.
+  protected val locRefCount: mutable.Map[String, Int] = mutable.Map.empty
+
+  // In case some tests reuse the table path/name with DROP table, this method 
can be used to
+  // clean the table data in the commit coordinator. Note that we should call 
this before
+  // the table actually gets DROPPED.
+  def deleteTableFromCommitCoordinator(tableName: String): Unit = {
+    val cc = CommitCoordinatorProvider.getCommitCoordinatorClient(
+      defaultCommitsCoordinatorName,
+      defaultCommitsCoordinatorConf,
+      spark)
+    assert(
+      cc.isInstanceOf[TrackingCommitCoordinatorClient],
+      s"Please implement delete/drop method for coordinator: 
${cc.getClass.getName}")
+    val location =
+      try {
+        spark
+          .sql(s"describe detail $tableName")
+          .select("location")
+          .first()
+          .getAs[String](0)
+      } catch {
+        case NonFatal(_) =>
+          // Ignore if the table does not exist/broken.
+          return
+      }
+    val locKey = location.stripPrefix("file:")
+    if (locRefCount.contains(locKey)) {
+      locRefCount(locKey) -= 1
+    }
+    // When we create an external table in a location where some table already 
existed, two table
+    // names could be pointing to the same location. We should only clean up 
the table data in the
+    // commit coordinator when the last table name pointing to the location is 
dropped.
+    if (locRefCount.getOrElse(locKey, 0) == 0) {
+      val logPath = location + "/_delta_log"
+      cc.asInstanceOf[TrackingCommitCoordinatorClient]
+        .delegatingCommitCoordinatorClient
+        .asInstanceOf[InMemoryCommitCoordinator]
+        .dropTable(new Path(logPath))
+    }
+    DeltaLog.clearCache()
+  }
+
+  override protected def sparkConf: SparkConf = {
+    if (coordinatedCommitsBackfillBatchSize.nonEmpty) {
+      val coordinatedCommitsCoordinatorJson = 
JsonUtils.toJson(defaultCommitsCoordinatorConf)
+      super.sparkConf
+        .set(
+          
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey,
+          defaultCommitsCoordinatorName)
+        .set(
+          
DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_CONF.defaultTablePropertyKey,
+          coordinatedCommitsCoordinatorJson)
+    } else {
+      super.sparkConf
+    }
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    CommitCoordinatorProvider.clearNonDefaultBuilders()
+    coordinatedCommitsBackfillBatchSize.foreach {
+      batchSize =>
+        CommitCoordinatorProvider.registerBuilder(
+          TrackingInMemoryCommitCoordinatorBuilder(batchSize))
+    }
+    DeltaLog.clearCache()
+  }
+
+  protected def isICTEnabledForNewTables: Boolean = {
+    spark.conf
+      
.getOption(DeltaConfigs.COORDINATED_COMMITS_COORDINATOR_NAME.defaultTablePropertyKey)
+      .nonEmpty ||
+    spark.conf
+      
.getOption(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.defaultTablePropertyKey)
+      .contains("true")
+  }
+}
diff --git 
a/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
new file mode 100644
index 0000000000..0f601043f6
--- /dev/null
+++ 
b/backends-velox/src-delta33/test/scala/org/apache/spark/sql/delta/skipping/ClusteredTableTestUtils.scala
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.delta.skipping
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
+import org.apache.spark.sql.delta.DeltaOperations
+import org.apache.spark.sql.delta.DeltaOperations.{CLUSTERING_PARAMETER_KEY, 
ZORDER_PARAMETER_KEY}
+import org.apache.spark.sql.delta.commands.optimize.OptimizeMetrics
+import 
org.apache.spark.sql.delta.coordinatedcommits.CoordinatedCommitsBaseSuite
+import org.apache.spark.sql.delta.hooks.UpdateCatalog
+import org.apache.spark.sql.delta.skipping.clustering.{ClusteredTableUtils, 
ClusteringColumn, ClusteringColumnInfo}
+import org.apache.spark.sql.delta.skipping.clustering.temp.ClusterBySpec
+import org.apache.spark.sql.delta.sources.DeltaSQLConf
+import org.apache.spark.sql.delta.util.JsonUtils
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+import org.junit.Assert.assertEquals
+
+trait ClusteredTableTestUtilsBase
+  extends SparkFunSuite
+  with SharedSparkSession
+  with CoordinatedCommitsBaseSuite {
+  import testImplicits._
+
+  /**
+   * Helper for running optimize on the table with different APIs.
+   * @param table
+   *   the name of table
+   */
+  def optimizeTable(table: String): DataFrame = {
+    sql(s"OPTIMIZE $table")
+  }
+
+  /**
+   * Runs optimize on the table and calls postHook on the metrics.
+   * @param table
+   *   the name of table
+   * @param postHook
+   *   callback triggered with OptimizeMetrics returned by the OPTIMIZE command
+   */
+  def runOptimize(table: String)(postHook: OptimizeMetrics => Unit): Unit = {
+    // Verify Delta history operation parameters' clusterBy
+    val isPathBasedTable = table.startsWith("tahoe.") || 
table.startsWith("delta.")
+    var (deltaLog, snapshot) = if (isPathBasedTable) {
+      // Path based table e.g. delta.`path-to-directory` or 
tahoe.`path-to-directory`. Strip
+      // 6 characters to extract table path.
+      DeltaLog.forTableWithSnapshot(spark, table.drop(6).replace("`", ""))
+    } else {
+      DeltaLog.forTableWithSnapshot(spark, TableIdentifier(table))
+    }
+    val beforeVersion = snapshot.version
+
+    
postHook(optimizeTable(table).select($"metrics.*").as[OptimizeMetrics].head())
+    snapshot = deltaLog.update()
+    val afterVersion = snapshot.version
+
+    val shouldCheckFullStatus = 
deltaLog.history.getHistory(Some(1)).headOption.exists {
+      h =>
+        Seq(
+          DeltaOperations.OPTIMIZE_OPERATION_NAME
+        ).contains(h.operation)
+    }
+
+    // Note: Only expect isFull status when the table has non-empty clustering 
columns and
+    // clustering table feature, otherwise the OPTIMIZE will fall back to 
compaction and
+    // isFull status will not be relevant anymore.
+    val expectedOperationParameters = ClusteredTableUtils
+      .getClusteringColumnsOptional(snapshot)
+      .filter {
+        cols =>
+          cols.nonEmpty &&
+          shouldCheckFullStatus &&
+          ClusteredTableUtils.isSupported(snapshot.protocol) &&
+          afterVersion > beforeVersion
+      }
+      .map(_ => Map(DeltaOperations.CLUSTERING_IS_FULL_KEY -> false))
+      .getOrElse(Map.empty)
+    verifyDescribeHistoryOperationParameters(
+      table,
+      expectedOperationParameters = expectedOperationParameters)
+  }
+
+  /**
+   * Runs optimize full on the table and calls postHook on the metrics.
+   *
+   * @param table
+   *   the name of table
+   * @param postHook
+   *   callback triggered with OptimizeMetrics returned by the OPTIMIZE command
+   */
+  def runOptimizeFull(table: String)(postHook: OptimizeMetrics => Unit): Unit 
= {
+    postHook(sql(s"OPTIMIZE $table 
FULL").select($"metrics.*").as[OptimizeMetrics].head())
+
+    // Verify Delta history operation parameters' clusterBy
+    verifyDescribeHistoryOperationParameters(
+      table,
+      expectedOperationParameters = Map(DeltaOperations.CLUSTERING_IS_FULL_KEY 
-> true))
+  }
+
+  def verifyClusteringColumnsInDomainMetadata(
+      snapshot: Snapshot,
+      logicalColumnNames: Seq[String]): Unit = {
+    val expectedClusteringColumns = 
logicalColumnNames.map(ClusteringColumn(snapshot.schema, _))
+    val actualClusteringColumns =
+      ClusteredTableUtils.getClusteringColumnsOptional(snapshot).orNull
+    assert(expectedClusteringColumns == actualClusteringColumns)
+  }
+
+  // Verify the operation parameters of the last history event contains 
`clusterBy`.
+  protected def verifyDescribeHistoryOperationParameters(
+      table: String,
+      expectedOperationParameters: Map[String, Any] = Map.empty): Unit = {
+    val clusterBySupportedOperations = Set(
+      "CREATE TABLE",
+      "REPLACE TABLE",
+      "CREATE OR REPLACE TABLE",
+      "CREATE TABLE AS SELECT",
+      "REPLACE TABLE AS SELECT",
+      "CREATE OR REPLACE TABLE AS SELECT")
+
+    val isPathBasedTable = table.startsWith("tahoe.") || 
table.startsWith("delta.")
+    val (deltaLog, snapshot) = if (isPathBasedTable) {
+      // Path based table.
+      DeltaLog.forTableWithSnapshot(spark, table.drop(6).replace("`", ""))
+    } else {
+      DeltaLog.forTableWithSnapshot(spark, TableIdentifier(table))
+    }
+    val isClusteredTable = ClusteredTableUtils.isSupported(snapshot.protocol)
+    val clusteringColumns =
+      ClusteringColumnInfo.extractLogicalNames(snapshot)
+    val expectedClusterBy = JsonUtils.toJson(clusteringColumns)
+    val expectClustering = isClusteredTable && clusteringColumns.nonEmpty
+
+    val lastEvent = deltaLog.history.getHistory(Some(1)).head
+    val lastOperationParameters = lastEvent.operationParameters
+
+    def doAssert(assertion: => Boolean): Unit = {
+      val debugMsg = "verifyDescribeHistoryOperationParameters DEBUG: " +
+        "assert failed. Please check the expected behavior and " +
+        "add the operation to the appropriate case in " +
+        "verifyDescribeHistoryOperationParameters. " +
+        s"table: $table, lastOperation: ${lastEvent.operation} " +
+        s"lastOperationParameters: $lastOperationParameters " +
+        s"expectedOperationParameters: $expectedOperationParameters"
+      try {
+        assert(assertion, debugMsg)
+      } catch {
+        case e: Throwable =>
+          throw new Throwable(debugMsg, e)
+      }
+    }
+
+    // Check clusterBy exists and matches the expected clusterBy.
+    def assertClusterByExists(): Unit = {
+      doAssert(lastOperationParameters(CLUSTERING_PARAMETER_KEY) === 
expectedClusterBy)
+    }
+
+    // Check clusterBy does not exist or is empty.
+    def assertClusterByEmptyOrNotExists(): Unit = {
+      doAssert(
+        !lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY) ||
+          lastOperationParameters(CLUSTERING_PARAMETER_KEY) === "[]")
+    }
+
+    // Check clusterBy does not exist.
+    def assertClusterByNotExist(): Unit = {
+      doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
+    }
+
+    // Validate caller-provided operation parameters from the last commit.
+    for ((operationParameterKey, value) <- expectedOperationParameters) {
+      // Convert value to string since value is stored as toString in 
operationParameters.
+      doAssert(lastOperationParameters(operationParameterKey) === 
value.toString)
+    }
+
+    // Check clusterBy
+    lastEvent.operation match {
+      case "CLUSTER BY" =>
+        // Operation is [[DeltaOperations.ClusterBy]] - ALTER TABLE CLUSTER BY
+        doAssert(
+          lastOperationParameters("newClusteringColumns") === 
clusteringColumns.mkString(",")
+        )
+      case "OPTIMIZE" =>
+        if (expectClustering) {
+          doAssert(lastOperationParameters(CLUSTERING_PARAMETER_KEY) === 
expectedClusterBy)
+          doAssert(lastOperationParameters(ZORDER_PARAMETER_KEY) === "[]")
+        } else {
+          // If the table clusters by NONE, OPTIMIZE will be a regular 
compaction.
+          // In this case, both clustering and z-order parameters should be 
empty.
+          doAssert(lastOperationParameters(CLUSTERING_PARAMETER_KEY) === "[]")
+          doAssert(lastOperationParameters(ZORDER_PARAMETER_KEY) === "[]")
+        }
+      case "CLONE" =>
+        // CLUSTER BY not in operation parameters for CLONE - similar to 
PARTITION BY.
+        doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
+      case o if clusterBySupportedOperations.contains(o) =>
+        if (expectClustering) {
+          assertClusterByExists()
+        } else if (isClusteredTable && clusteringColumns.isEmpty) {
+          assertClusterByEmptyOrNotExists()
+        } else {
+          assertClusterByNotExist()
+        }
+      case "WRITE" | "RESTORE" =>
+        // These are known operations from our tests that don't have clusterBy.
+        doAssert(!lastOperationParameters.contains(CLUSTERING_PARAMETER_KEY))
+      case _ =>
+        // Other operations are not tested yet. If the test fails here, please 
check the expected
+        // behavior and add the operation to the appropriate case.
+        doAssert(false)
+    }
+  }
+
+  protected def deleteTableFromCommitCoordinatorIfNeeded(table: String): Unit 
= {
+    if (coordinatedCommitsEnabledInTests) {
+      // Clean up the table data in commit coordinator because DROP/REPLACE 
TABLE does not bother
+      // commit coordinator.
+      deleteTableFromCommitCoordinator(table)
+    }
+  }
+
+  override def withTable(tableNames: String*)(f: => Unit): Unit = {
+    Utils.tryWithSafeFinally(f) {
+      tableNames.foreach {
+        name =>
+          deleteTableFromCommitCoordinatorIfNeeded(name)
+          spark.sql(s"DROP TABLE IF EXISTS $name")
+      }
+    }
+  }
+
+  def withClusteredTable[T](
+      table: String,
+      schema: String,
+      clusterBy: String,
+      tableProperties: Map[String, String] = Map.empty,
+      location: Option[String] = None)(f: => T): T = {
+    createOrReplaceClusteredTable("CREATE", table, schema, clusterBy, 
tableProperties, location)
+
+    Utils.tryWithSafeFinally(f) {
+      deleteTableFromCommitCoordinatorIfNeeded(table)
+      spark.sql(s"DROP TABLE IF EXISTS $table")
+    }
+  }
+
+  /**
+   * Helper for creating or replacing table with different APIs.
+   * @param clause
+   *   clause for SQL API ('CREATE', 'REPLACE', 'CREATE OR REPLACE')
+   * @param table
+   *   the name of table
+   * @param schema
+   *   comma separated list of "colName dataType"
+   * @param clusterBy
+   *   comma separated list of clustering columns
+   */
+  def createOrReplaceClusteredTable(
+      clause: String,
+      table: String,
+      schema: String,
+      clusterBy: String,
+      tableProperties: Map[String, String] = Map.empty,
+      location: Option[String] = None): Unit = {
+    val locationClause = if (location.isEmpty) "" else s"LOCATION 
'${location.get}'"
+    val tablePropertiesClause = if (!tableProperties.isEmpty) {
+      val tablePropertiesString =
+        tableProperties.map { case (key, value) => s"'$key' = '$value'" 
}.mkString(",")
+      s"TBLPROPERTIES($tablePropertiesString)"
+    } else {
+      ""
+    }
+    spark.sql(
+      s"$clause TABLE $table ($schema) USING delta CLUSTER BY ($clusterBy) " +
+        s"$tablePropertiesClause $locationClause")
+    location.foreach(loc => locRefCount(loc) = locRefCount.getOrElse(loc, 0) + 
1)
+  }
+
+  protected def createOrReplaceAsSelectClusteredTable(
+      clause: String,
+      table: String,
+      srcTable: String,
+      clusterBy: String,
+      location: Option[String] = None): Unit = {
+    val locationClause = if (location.isEmpty) "" else s"LOCATION 
'${location.get}'"
+    spark.sql(
+      s"$clause TABLE $table USING delta CLUSTER BY ($clusterBy) " +
+        s"$locationClause AS SELECT * FROM $srcTable")
+  }
+
+  def verifyClusteringColumns(
+      tableIdentifier: TableIdentifier,
+      expectedLogicalClusteringColumns: Seq[String],
+      skipCatalogCheck: Boolean = false
+  ): Unit = {
+    val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, tableIdentifier)
+    verifyClusteringColumnsInternal(
+      snapshot,
+      tableIdentifier.table,
+      expectedLogicalClusteringColumns
+    )
+
+    if (skipCatalogCheck) {
+      return
+    }
+
+    val updateCatalogEnabled = 
spark.conf.get(DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED)
+    assert(
+      updateCatalogEnabled,
+      "need to enable [[DeltaSQLConf.DELTA_UPDATE_CATALOG_ENABLED]] to verify 
catalog updates.")
+    UpdateCatalog.awaitCompletion(10000)
+    val catalog = spark.sessionState.catalog
+    catalog.refreshTable(tableIdentifier)
+    val table = catalog.getTableMetadata(tableIdentifier)
+
+    // Verify CatalogTable's clusterBySpec.
+    assert(ClusteredTableUtils.getClusterBySpecOptional(table).isDefined)
+    assertEquals(
+      ClusterBySpec.fromColumnNames(expectedLogicalClusteringColumns),
+      ClusteredTableUtils.getClusterBySpecOptional(table).get)
+  }
+
+  def verifyClusteringColumns(
+      dataPath: String,
+      expectedLogicalClusteringColumns: Seq[String]): Unit = {
+    val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, dataPath)
+    verifyClusteringColumnsInternal(
+      snapshot,
+      s"delta.`$dataPath`",
+      expectedLogicalClusteringColumns
+    )
+  }
+
+  def verifyClusteringColumnsInternal(
+      snapshot: Snapshot,
+      tableNameOrPath: String,
+      expectedLogicalClusteringColumns: Seq[String]
+  ): Unit = {
+    assert(ClusteredTableUtils.isSupported(snapshot.protocol) === true)
+    verifyClusteringColumnsInDomainMetadata(snapshot, 
expectedLogicalClusteringColumns)
+
+    // Verify Delta history operation parameters' clusterBy
+    verifyDescribeHistoryOperationParameters(
+      tableNameOrPath
+    )
+
+    // Verify DESCRIBE DETAIL's properties doesn't contain the 
"clusteringColumns" key.
+    val describeDetailProps = sql(s"describe detail $tableNameOrPath")
+      .select("properties")
+      .first
+      .getAs[Map[String, String]](0)
+    
assert(!describeDetailProps.contains(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS))
+
+    // Verify SHOW TBLPROPERTIES contains the correct clustering columns.
+    val clusteringColumnsVal =
+      sql(s"show tblproperties $tableNameOrPath")
+        .filter($"key" === ClusteredTableUtils.PROP_CLUSTERING_COLUMNS)
+        .select("value")
+        .first
+        .getString(0)
+    val clusterBySpec = ClusterBySpec
+      .fromProperties(Map(ClusteredTableUtils.PROP_CLUSTERING_COLUMNS -> 
clusteringColumnsVal))
+      .get
+    assert(expectedLogicalClusteringColumns === 
clusterBySpec.columnNames.map(_.toString))
+  }
+}
+
+trait ClusteredTableTestUtils extends ClusteredTableTestUtilsBase
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index bb2c9c6c38..10a7f6f3ea 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -370,7 +370,15 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
 
     val newShuffle = shuffle.outputPartitioning match {
       case HashPartitioning(exprs, _) =>
-        val hashExpr = new Murmur3Hash(exprs)
+        val hashExpr = if (exprs.isEmpty) {
+          // In Spark, a hash expression with empty input is not resolvable 
and a
+          // `WRONG_NUM_ARGS.WITHOUT_SUGGESTION` error will be reported when 
validating the project
+          // transformer. So we directly return the seed here, which is the 
intended hashed value
+          // for empty input given Spark's murmur3 hash logic.
+          Literal(new Murmur3Hash(Nil).seed, IntegerType)
+        } else {
+          new Murmur3Hash(exprs)
+        }
         val projectList = Seq(Alias(hashExpr, "hash_partition_key")()) ++ 
child.output
         val projectTransformer = ProjectExecTransformer(projectList, child)
         val validationResult = projectTransformer.doValidate()
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
index a770fdf7a3..134db93e70 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala
@@ -163,7 +163,9 @@ case class ColumnarCollapseTransformStages(glutenConf: 
GlutenConfig) extends Rul
   /** Inserts an InputIteratorTransformer on top of those that do not support 
transform. */
   private def insertInputIteratorTransformer(plan: SparkPlan): SparkPlan = {
     plan match {
-      case p if !supportTransform(p) =>
+      case p if p.isInstanceOf[WholeStageTransformer] || !supportTransform(p) 
=>
+        // TODO: if p.isInstanceOf[WholeStageTransformer], we can merge two 
whole stage
+        //  transformers.
         
ColumnarCollapseTransformStages.wrapInputIteratorTransformer(insertWholeStageTransformer(p))
       case p =>
         p.withNewChildren(p.children.map(insertInputIteratorTransformer))
@@ -172,6 +174,7 @@ case class ColumnarCollapseTransformStages(glutenConf: 
GlutenConfig) extends Rul
 
   private def insertWholeStageTransformer(plan: SparkPlan): SparkPlan = {
     plan match {
+      case wst: WholeStageTransformer => wst
       case t if supportTransform(t) =>
         // transformStageId will be updated by rule `GenerateTransformStageId`.
         
WholeStageTransformer(t.withNewChildren(t.children.map(insertInputIteratorTransformer)))(-1)
diff --git a/pom.xml b/pom.xml
index 54956693fb..b216855d84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -78,6 +78,7 @@
     <delta.package.name>delta-spark</delta.package.name>
     <delta.version>3.3.2</delta.version>
     <delta.binary.version>33</delta.binary.version>
+    <aws-java-sdk-dynamodb.version>1.12.262</aws-java-sdk-dynamodb.version>
     <celeborn.version>0.6.1</celeborn.version>
     <uniffle.version>0.10.0</uniffle.version>
     <arrow.version>15.0.0</arrow.version>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to