This is an automated email from the ASF dual-hosted git repository.

chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 8416ccf026 [GLUTEN-8565][VL] Support CollectLimit Operator (#8566)
8416ccf026 is described below

commit 8416ccf026f326e900126e8f2503d75c012c8bb6
Author: Arnav Balyan <[email protected]>
AuthorDate: Mon Mar 3 17:03:56 2025 +0530

    [GLUTEN-8565][VL] Support CollectLimit Operator (#8566)
---
 .../clickhouse/CHSparkPlanExecApi.scala            |   5 +
 .../VeloxColumnarBatchJniWrapper.java              |   2 +
 .../gluten/columnarbatch/VeloxColumnarBatches.java |  27 ++++
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   3 +
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   1 +
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |   5 +
 .../execution/ColumnarCollectLimitExec.scala       | 154 +++++++++++++++++++++
 .../execution/GlutenSQLCollectLimitExecSuite.scala | 141 +++++++++++++++++++
 cpp/velox/jni/VeloxJniWrapper.cc                   |  32 +++++
 .../gluten/backendsapi/BackendSettingsApi.scala    |   3 +
 .../gluten/backendsapi/SparkPlanExecApi.scala      |   2 +
 .../execution/ColumnarCollectLimitBaseExec.scala   |  68 +++++++++
 .../columnar/offload/OffloadSingleNodeRules.scala  |   8 +-
 .../extension/columnar/validator/Validators.scala  |   1 +
 .../spark/sql/execution/GlutenImplicits.scala      |   4 +-
 .../gluten/utils/velox/VeloxTestSettings.scala     |   1 -
 .../gluten/utils/velox/VeloxTestSettings.scala     |   1 -
 .../org/apache/gluten/sql/shims/SparkShims.scala   |   2 +
 .../gluten/sql/shims/spark32/Spark32Shims.scala    |   3 +
 .../gluten/sql/shims/spark33/Spark33Shims.scala    |   2 +
 .../gluten/sql/shims/spark34/Spark34Shims.scala    |   3 +
 .../gluten/sql/shims/spark35/Spark35Shims.scala    |   3 +
 22 files changed, 466 insertions(+), 5 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 3572291a1d..067b5dc54e 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -933,6 +933,11 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
       original: StringSplit): ExpressionTransformer =
     CHStringSplitTransformer(substraitExprName, Seq(srcExpr, regexExpr, 
limitExpr), original)
 
+  override def genColumnarCollectLimitExec(
+      limit: Int,
+      child: SparkPlan): ColumnarCollectLimitBaseExec =
+    throw new GlutenNotSupportException("ColumnarCollectLimit is not supported 
in ch backend.")
+
   override def genColumnarRangeExec(
       start: Long,
       end: Long,
diff --git 
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
 
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
index 4a86012b55..2307cf2a57 100644
--- 
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
+++ 
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
@@ -34,6 +34,8 @@ public class VeloxColumnarBatchJniWrapper implements 
RuntimeAware {
 
   public native long compose(long[] batches);
 
+  public native long slice(long veloxBatchHandle, int offset, int limit);
+
   @Override
   public long rtHandle() {
     return runtime.getHandle();
diff --git 
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
 
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
index 6e37db885c..71f40f3424 100644
--- 
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
+++ 
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
@@ -114,4 +114,31 @@ public final class VeloxColumnarBatches {
     final long handle = 
VeloxColumnarBatchJniWrapper.create(runtime).compose(handles);
     return ColumnarBatches.create(handle);
   }
+
+  /**
+   * Returns a new ColumnarBatch that contains at most `limit` rows from the 
given batch.
+   *
+   * <p>If `limit >= batch.numRows()`, returns the original batch. Otherwise, 
copies up to `limit`
+   * rows into new column vectors.
+   *
+   * @param batch the original batch
+   * @param limit the maximum number of rows to include
+   * @return a new pruned [[ColumnarBatch]] with row count = `limit`, or the 
original batch if no
+   *     pruning is required
+   */
+  public static ColumnarBatch slice(ColumnarBatch batch, int offset, int 
limit) {
+    int totalRows = batch.numRows();
+    if (limit >= totalRows) {
+      // No need to prune
+      return batch;
+    } else {
+      Runtime runtime =
+          Runtimes.contextInstance(
+              BackendsApiManager.getBackendName(), 
"VeloxColumnarBatches#sliceBatch");
+      long nativeHandle =
+          ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName(), 
batch);
+      long handle = 
VeloxColumnarBatchJniWrapper.create(runtime).slice(nativeHandle, offset, limit);
+      return ColumnarBatches.create(handle);
+    }
+  }
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 25655c72db..0d13a4e9a6 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -563,4 +563,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
   override def supportColumnarArrowUdf(): Boolean = true
 
   override def needPreComputeRangeFrameBoundary(): Boolean = true
+
+  override def supportCollectLimitExec(): Boolean = true
+
 }
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index acf50383e2..3b15fa2263 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -162,6 +162,7 @@ object VeloxRuleApi {
       RasOffload.from[GenerateExec](OffloadOthers()),
       RasOffload.from[EvalPythonExec](OffloadOthers()),
       RasOffload.from[SampleExec](OffloadOthers()),
+      RasOffload.from[CollectLimitExec](OffloadOthers()),
       RasOffload.from[RangeExec](OffloadOthers())
     )
     offloads.foreach(
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 6f0820620a..c7dafdd480 100644
--- 
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -838,6 +838,11 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     VeloxHiveUDFTransformer.replaceWithExpressionTransformer(expr, 
attributeSeq)
   }
 
+  override def genColumnarCollectLimitExec(
+      limit: Int,
+      child: SparkPlan): ColumnarCollectLimitBaseExec =
+    ColumnarCollectLimitExec(limit, child)
+
   override def genColumnarRangeExec(
       start: Long,
       end: Long,
diff --git 
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
new file mode 100644
index 0000000000..8c328430ed
--- /dev/null
+++ 
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.VeloxColumnarBatches
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.execution.{ShuffledColumnarBatchRDD, SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric, 
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
/**
 * Velox columnar replacement for Spark's `CollectLimitExec`.
 *
 * Execution strategy (see [[doExecuteColumnar]]): each input partition is first limited
 * locally, the surviving batches are shuffled into a single partition, and the limit is
 * applied once more globally so at most `limit` rows are emitted in total.
 *
 * @param limit maximum number of rows to collect
 * @param child the input plan, expected to produce Velox columnar batches
 */
case class ColumnarCollectLimitExec(
    limit: Int,
    child: SparkPlan
) extends ColumnarCollectLimitBaseExec(limit, child) {

  override def batchType(): Convention.BatchType =
    BackendsApiManager.getSettings.primaryBatchType

  /**
   * Returns an iterator that yields up to `limit` rows in total from the input partitionIter.
   * Either retain the entire batch if it fits within the remaining limit, or prune it if it
   * partially exceeds the remaining limit.
   *
   * Batches are produced lazily: nothing is pulled from `partitionIter` until the caller
   * asks via `hasNext`/`next`.
   */
  private def collectLimitedRows(
      partitionIter: Iterator[ColumnarBatch],
      limit: Int
  ): Iterator[ColumnarBatch] = {
    // Fast path: an already-exhausted input needs no stateful iterator.
    // (`isEmpty` only probes `hasNext`, so no batch is consumed here.)
    if (partitionIter.isEmpty) {
      return Iterator.empty
    }
    new Iterator[ColumnarBatch] {

      // Number of rows handed out so far; never exceeds `limit`.
      private var rowsCollected = 0
      // One-element lookahead buffer filled by `fetchNext`.
      private var nextBatch: Option[ColumnarBatch] = None

      override def hasNext: Boolean = {
        nextBatch.isDefined || fetchNext()
      }

      override def next(): ColumnarBatch = {
        if (!hasNext) {
          throw new NoSuchElementException("No more batches available.")
        }
        val batch = nextBatch.get
        nextBatch = None
        batch
      }

      /**
       * Attempt to fetch the next batch from the underlying iterator if we haven't yet hit the
       * limit. Returns true if we found a new batch, false otherwise.
       */
      private def fetchNext(): Boolean = {
        if (rowsCollected >= limit || !partitionIter.hasNext) {
          return false
        }

        val currentBatch = partitionIter.next()
        val currentBatchRowCount = currentBatch.numRows()
        val remaining = limit - rowsCollected

        if (currentBatchRowCount <= remaining) {
          rowsCollected += currentBatchRowCount
          // Bump the refcount before forwarding the input batch unchanged.
          // NOTE(review): presumably required because the upstream iterator also closes the
          // batch it produced — confirm against ColumnarBatches ownership contract.
          ColumnarBatches.retain(currentBatch)
          nextBatch = Some(currentBatch)
        } else {
          // Batch straddles the limit: keep only the first `remaining` rows.
          val prunedBatch = VeloxColumnarBatches.slice(currentBatch, 0, remaining)
          rowsCollected += remaining
          nextBatch = Some(prunedBatch)
        }
        true
      }
    }
  }

  // Shuffle write/read metrics feeding the SQL UI, same reporters used by columnar exchange.
  private lazy val writeMetrics =
    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)

  private lazy val readMetrics =
    SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)

  // Whether the backend elects sort-based shuffle for this (single) partitioning.
  private lazy val useSortBasedShuffle: Boolean =
    BackendsApiManager.getSparkPlanExecApiInstance
      .useSortBasedShuffle(outputPartitioning, child.output)

  // Serializer for the single-partition shuffle below; not serialized with the plan.
  @transient private lazy val serializer: Serializer =
    BackendsApiManager.getSparkPlanExecApiInstance
      .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)

  @transient override lazy val metrics: Map[String, SQLMetric] =
    BackendsApiManager.getMetricsApiInstance
      .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
      readMetrics ++ writeMetrics

  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
    val childRDD = child.executeColumnar()

    if (childRDD.getNumPartitions == 0) {
      return sparkContext.parallelize(Seq.empty[ColumnarBatch], 1)
    }

    // Single input partition: no shuffle needed, limit it directly.
    // Otherwise: limit each partition locally, then shuffle into one partition.
    val processedRDD =
      if (childRDD.getNumPartitions == 1) childRDD
      else shuffleLimitedPartitions(childRDD)

    // Final (global) application of the limit on the single resulting partition.
    processedRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
  }

  /** Locally limits every partition, then shuffles the survivors into a single partition. */
  private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]): RDD[ColumnarBatch] = {
    val locallyLimited = childRDD.mapPartitions(partition => collectLimitedRows(partition, limit))
    new ShuffledColumnarBatchRDD(
      BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
        locallyLimited,
        child.output,
        child.output,
        SinglePartition,
        serializer,
        writeMetrics,
        metrics,
        useSortBasedShuffle
      ),
      readMetrics
    )
  }

  // Purely columnar operator: no row-based output convention.
  override def rowType0(): Convention.RowType = Convention.RowType.None

  override def output: Seq[Attribute] = child.output

  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
    copy(child = newChild)
}
diff --git 
a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
 
b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
new file mode 100644
index 0000000000..8af7b544e2
--- /dev/null
+++ 
b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+
/**
 * End-to-end tests verifying that `limit`-only queries are offloaded to
 * [[ColumnarCollectLimitBaseExec]] on the Velox backend (Spark 3.2/3.3, where the shims
 * report columnar limit support) and that results match vanilla Spark.
 */
class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {

  override protected val resourcePath: String = "N/A"
  override protected val fileFormat: String = "N/A"

  override protected def sparkConf: SparkConf = {
    super.sparkConf
      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
  }

  /**
   * Asserts that an operator of runtime class `T` is present (`checkMatch = true`) or absent
   * (`checkMatch = false`) in the executed plan of `df`.
   */
  private def assertGlutenOperatorMatch[T: reflect.ClassTag](
      df: DataFrame,
      checkMatch: Boolean): Unit = {
    val executedPlan = getExecutedPlan(df)
    // Resolve the runtime class once instead of re-summoning the ClassTag in every branch.
    val operatorClass = implicitly[reflect.ClassTag[T]].runtimeClass

    // `Class.isInstance` never throws, so the previous `catch { case _: Throwable => false }`
    // was dead defensiveness that would also have swallowed fatal errors (OOM, interrupts).
    val operatorFound = executedPlan.exists(operatorClass.isInstance)

    val assertionCondition = operatorFound == checkMatch
    val assertionMessage =
      if (checkMatch) {
        s"Operator ${operatorClass.getSimpleName} not found " +
          s"in executed plan:\n $executedPlan"
      } else {
        s"Operator ${operatorClass.getSimpleName} was found " +
          s"in executed plan:\n $executedPlan"
      }

    assert(assertionCondition, assertionMessage)
  }

  testWithSpecifiedSparkVersion(
    "ColumnarCollectLimitExec - basic limit test",
    Array("3.2", "3.3")) {
    val df = spark.range(0, 1000, 1).toDF("id").limit(5)
    val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))

    checkAnswer(df, expectedData)

    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
  }

  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", Array("3.2", "3.3")) {
    val df = spark
      .range(0, 20, 1)
      .toDF("id")
      .filter("id % 2 == 0")
      .limit(5)
    val expectedData = Seq(Row(0L), Row(2L), Row(4L), Row(6L), Row(8L))

    checkAnswer(df, expectedData)

    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
  }

  testWithSpecifiedSparkVersion(
    "ColumnarCollectLimitExec - range with repartition",
    Array("3.2", "3.3")) {

    val df = spark
      .range(0, 10, 1)
      .toDF("id")
      .repartition(3)
      .limit(3)
    val expectedData = Seq(Row(1L), Row(2L), Row(4L))

    checkAnswer(df, expectedData)

    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
  }

  testWithSpecifiedSparkVersion(
    "ColumnarCollectLimitExec - with distinct values",
    Array("3.2", "3.3")) {
    val df = spark
      .range(0, 10, 1)
      .toDF("id")
      .select("id")
      .distinct()
      .limit(5)
    val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))

    checkAnswer(df, expectedData)

    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
  }

  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - chained limit", Array("3.2", "3.3")) {
    val df = spark
      .range(0, 10, 1)
      .toDF("id")
      .limit(8)
      .limit(3)
    val expectedData = Seq(Row(0L), Row(1L), Row(2L))

    checkAnswer(df, expectedData)

    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
  }

  testWithSpecifiedSparkVersion(
    "ColumnarCollectLimitExec - limit after union",
    Array("3.2", "3.3")) {
    val df1 = spark.range(0, 5).toDF("id")
    val df2 = spark.range(5, 10).toDF("id")
    val unionDf = df1.union(df2).limit(3)

    val expectedData = Seq(Row(0L), Row(1L), Row(2L))

    checkAnswer(unionDf, expectedData)

    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
  }
}
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 9d6ad157ff..d29f5b0f7b 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -431,6 +431,38 @@ 
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_splitBlockByPartitio
   JNI_METHOD_END(nullptr)
 }
 
+JNIEXPORT jlong JNICALL 
Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJniWrapper_slice( // 
NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong veloxBatchHandle,
+    jint offset,
+    jint limit) {
+  JNI_METHOD_START
+  auto ctx = getRuntime(env, wrapper);
+  auto batch = ObjectStore::retrieve<ColumnarBatch>(veloxBatchHandle);
+
+  auto numRows = batch->numRows();
+  if (limit >= numRows) {
+    return veloxBatchHandle;
+  }
+
+  auto veloxBatch = std::dynamic_pointer_cast<VeloxColumnarBatch>(batch);
+  VELOX_CHECK_NOT_NULL(veloxBatch, "Expected VeloxColumnarBatch but got a 
different type.");
+
+  auto rowVector = veloxBatch->getRowVector();
+  auto prunedVector = rowVector->slice(offset, limit);
+
+  auto prunedRowVector = 
std::dynamic_pointer_cast<facebook::velox::RowVector>(prunedVector);
+  VELOX_CHECK_NOT_NULL(prunedRowVector, "Expected RowVector but got a 
different type.");
+
+  auto prunedBatch = std::make_shared<VeloxColumnarBatch>(prunedRowVector);
+
+  jlong prunedHandle = ctx->saveObject(prunedBatch);
+  return prunedHandle;
+
+  JNI_METHOD_END(kInvalidObjectHandle)
+}
+
 #ifdef __cplusplus
 }
 #endif
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 5bedf021c4..84a4d04bfa 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,4 +152,7 @@ trait BackendSettingsApi {
   def supportColumnarArrowUdf(): Boolean = false
 
   def needPreComputeRangeFrameBoundary(): Boolean = false
+
+  def supportCollectLimitExec(): Boolean = false
+
 }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 02803bbf2b..2ebc965126 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -694,6 +694,8 @@ trait SparkPlanExecApi {
       original: StringSplit): ExpressionTransformer =
     GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr, 
limitExpr), original)
 
+  def genColumnarCollectLimitExec(limit: Int, plan: SparkPlan): 
ColumnarCollectLimitBaseExec
+
   def genColumnarRangeExec(
       start: Long,
       end: Long,
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
new file mode 100644
index 0000000000..a860329cd8
--- /dev/null
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, 
SinglePartition}
+import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan}
+
/**
 * Backend-agnostic base for the columnar CollectLimit operator. Concrete backends provide the
 * execution strategy; this class supplies the single-partition output contract and the common
 * offload-validation logic.
 *
 * @param limit maximum number of rows to collect
 * @param childPlan the input plan
 */
abstract class ColumnarCollectLimitBaseExec(
    limit: Int,
    childPlan: SparkPlan
) extends LimitExec
  with ValidatablePlan {

  // CollectLimit gathers all surviving rows into exactly one partition.
  override def outputPartitioning: Partitioning = SinglePartition

  /**
   * Offload is valid only when the backend supports this operator, the child prefers columnar
   * output, columnar shuffle is available, and the current Spark shim allows columnar limit.
   */
  override protected def doValidateInternal(): ValidationResult = {
    val settings = BackendsApiManager.getSettings
    if (!settings.supportCollectLimitExec()) {
      ValidationResult.failed("CollectLimitExec is not supported by the current backend.")
    } else {
      val columnarChildPreferred =
        childPlan.supportsColumnar && GlutenConfig.get.enablePreferColumnar
      val shuffleAndShimReady =
        settings.supportColumnarShuffleExec() &&
          SparkShimLoader.getSparkShims.isColumnarLimitExecSupported()
      if (columnarChildPreferred && shuffleAndShimReady) {
        ValidationResult.succeeded
      } else {
        ValidationResult.failed("Columnar shuffle not enabled or child does not support columnar.")
      }
    }
  }

  // Columnar-only operator: row-based execution must never be requested.
  override protected def doExecute()
      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
    throw new UnsupportedOperationException("This operator doesn't support doExecute().")
  }

}
object ColumnarCollectLimitBaseExec {

  /** Builds the backend-specific columnar counterpart of a vanilla [[CollectLimitExec]]. */
  def from(collectLimitExec: CollectLimitExec): ColumnarCollectLimitBaseExec =
    BackendsApiManager.getSparkPlanExecApiInstance
      .genColumnarCollectLimitExec(collectLimitExec.limit, collectLimitExec.child)
}
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index bfd8f59797..f58cb05666 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -15,7 +15,6 @@
  * limitations under the License.
  */
 package org.apache.gluten.extension.columnar.offload
-
 import org.apache.gluten.backendsapi.BackendsApiManager
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.execution._
@@ -28,6 +27,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight, 
BuildSide}
 import org.apache.spark.sql.catalyst.plans.logical.Join
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.CollectLimitExec
 import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, 
ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -344,6 +344,12 @@ object OffloadOthers {
             plan.withReplacement,
             plan.seed,
             child)
+        case plan: CollectLimitExec =>
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
+          
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
+            plan.limit,
+            plan.child
+          )
         case p if !p.isInstanceOf[GlutenPlan] =>
           logDebug(s"Transformation for ${p.getClass} is currently not 
supported.")
           p
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 509d7c02ae..82a6a415c6 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,6 +136,7 @@ object Validators {
         fail(p)
       case p: CartesianProductExec if !settings.supportCartesianProductExec() 
=> fail(p)
       case p: TakeOrderedAndProjectExec if 
!settings.supportColumnarShuffleExec() => fail(p)
+      case p: CollectLimitExec if !settings.supportCollectLimitExec() => 
fail(p)
       case _ => pass()
     }
   }
diff --git 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 709673feab..1276647b3b 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -18,12 +18,13 @@ package org.apache.spark.sql.execution
 
 import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
 import org.apache.gluten.utils.PlanUtil
+
 import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
 import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
-import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, 
AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, 
AQEShuffleReadExec, QueryStageExec}
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.command.{DataWritingCommandExec, 
ExecutedCommandExec}
 import org.apache.spark.sql.execution.datasources.WriteFilesExec
@@ -34,7 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-
 // format: off
 /**
  * A helper class to get the Gluten fallback summary from a Spark [[Dataset]].
diff --git 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index b7dbff4fb6..d8c795f059 100644
--- 
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -163,7 +163,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[VeloxAdaptiveQueryExecSuite]
     .includeAllGlutenTests()
     .includeByPrefix(
-      "SPARK-29906",
       "SPARK-30291",
       "SPARK-30403",
       "SPARK-30719",
diff --git 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index b3f7c23ddf..4b382339b5 100644
--- 
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ 
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -185,7 +185,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[VeloxAdaptiveQueryExecSuite]
     .includeAllGlutenTests()
     .includeByPrefix(
-      "SPARK-29906",
       "SPARK-30291",
       "SPARK-30403",
       "SPARK-30719",
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 2fe6dddeb3..f8c060bc18 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -295,4 +295,6 @@ trait SparkShims {
 
   def isParquetFileEncrypted(fileStatus: LocatedFileStatus, conf: 
Configuration): Boolean
 
+  def isColumnarLimitExecSupported(): Boolean
+
 }
diff --git 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 1f94b47e22..8289ffce0e 100644
--- 
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++ 
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -310,4 +310,7 @@ class Spark32Shims extends SparkShims {
         false
     }
   }
+
+  override def isColumnarLimitExecSupported(): Boolean = true
+
 }
diff --git 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 694c719c9b..c8627f5d5a 100644
--- 
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++ 
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -394,4 +394,6 @@ class Spark33Shims extends SparkShims {
     }
   }
 
+  override def isColumnarLimitExecSupported(): Boolean = true
+
 }
diff --git 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 5087b02ff8..31f4a4f13c 100644
--- 
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++ 
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -613,4 +613,7 @@ class Spark34Shims extends SparkShims {
         false
     }
   }
+
+  override def isColumnarLimitExecSupported(): Boolean = false
+
 }
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index a5898b0a96..e370e1204c 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -666,4 +666,7 @@ class Spark35Shims extends SparkShims {
       case e: Exception => false
     }
   }
+
+  override def isColumnarLimitExecSupported(): Boolean = false
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to