This is an automated email from the ASF dual-hosted git repository.
chengchengjin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 8416ccf026 [GLUTEN-8565][VL] Support CollectLimit Operator (#8566)
8416ccf026 is described below
commit 8416ccf026f326e900126e8f2503d75c012c8bb6
Author: Arnav Balyan <[email protected]>
AuthorDate: Mon Mar 3 17:03:56 2025 +0530
[GLUTEN-8565][VL] Support CollectLimit Operator (#8566)
---
.../clickhouse/CHSparkPlanExecApi.scala | 5 +
.../VeloxColumnarBatchJniWrapper.java | 2 +
.../gluten/columnarbatch/VeloxColumnarBatches.java | 27 ++++
.../gluten/backendsapi/velox/VeloxBackend.scala | 3 +
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 1 +
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 5 +
.../execution/ColumnarCollectLimitExec.scala | 154 +++++++++++++++++++++
.../execution/GlutenSQLCollectLimitExecSuite.scala | 141 +++++++++++++++++++
cpp/velox/jni/VeloxJniWrapper.cc | 32 +++++
.../gluten/backendsapi/BackendSettingsApi.scala | 3 +
.../gluten/backendsapi/SparkPlanExecApi.scala | 2 +
.../execution/ColumnarCollectLimitBaseExec.scala | 68 +++++++++
.../columnar/offload/OffloadSingleNodeRules.scala | 8 +-
.../extension/columnar/validator/Validators.scala | 1 +
.../spark/sql/execution/GlutenImplicits.scala | 4 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 1 -
.../gluten/utils/velox/VeloxTestSettings.scala | 1 -
.../org/apache/gluten/sql/shims/SparkShims.scala | 2 +
.../gluten/sql/shims/spark32/Spark32Shims.scala | 3 +
.../gluten/sql/shims/spark33/Spark33Shims.scala | 2 +
.../gluten/sql/shims/spark34/Spark34Shims.scala | 3 +
.../gluten/sql/shims/spark35/Spark35Shims.scala | 3 +
22 files changed, 466 insertions(+), 5 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 3572291a1d..067b5dc54e 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -933,6 +933,11 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
original: StringSplit): ExpressionTransformer =
CHStringSplitTransformer(substraitExprName, Seq(srcExpr, regexExpr,
limitExpr), original)
+  /** ClickHouse has no columnar CollectLimit operator, so requesting one always fails. */
+  override def genColumnarCollectLimitExec(
+      limit: Int,
+      child: SparkPlan): ColumnarCollectLimitBaseExec = {
+    val reason = "ColumnarCollectLimit is not supported in ch backend."
+    throw new GlutenNotSupportException(reason)
+  }
+
override def genColumnarRangeExec(
start: Long,
end: Long,
diff --git
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
index 4a86012b55..2307cf2a57 100644
---
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
+++
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatchJniWrapper.java
@@ -34,6 +34,8 @@ public class VeloxColumnarBatchJniWrapper implements
RuntimeAware {
public native long compose(long[] batches);
+ /**
+ * Natively slices the batch referenced by {@code veloxBatchHandle}, keeping at most {@code limit}
+ * rows starting at row {@code offset}, and returns the handle of the result.
+ *
+ * <p>NOTE(review): check how the native side handles the no-pruning case (limit >= numRows)
+ * before wrapping the returned handle in a new batch — confirm handle ownership.
+ */
+ public native long slice(long veloxBatchHandle, int offset, int limit);
+
@Override
public long rtHandle() {
return runtime.getHandle();
diff --git
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
index 6e37db885c..71f40f3424 100644
---
a/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
+++
b/backends-velox/src/main/java/org/apache/gluten/columnarbatch/VeloxColumnarBatches.java
@@ -114,4 +114,31 @@ public final class VeloxColumnarBatches {
final long handle =
VeloxColumnarBatchJniWrapper.create(runtime).compose(handles);
return ColumnarBatches.create(handle);
}
+
+  /**
+   * Returns a batch containing at most {@code limit} rows of {@code batch}, starting at row {@code
+   * offset}.
+   *
+   * <p>If the requested range already covers the whole batch ({@code offset == 0} and {@code limit
+   * >= batch.numRows()}), the input batch itself is returned and no native call is made. Otherwise
+   * a new native batch is created for rows {@code [offset, offset + length)}, where {@code length =
+   * min(limit, numRows - offset)}, and wrapped in a new {@link ColumnarBatch}.
+   *
+   * @param batch the original batch
+   * @param offset index of the first row to keep; must be within {@code [0, batch.numRows()]}
+   * @param limit the maximum number of rows to keep; must be non-negative
+   * @return the original batch when no pruning is required, otherwise a new sliced batch
+   * @throws IllegalArgumentException if {@code offset} or {@code limit} is out of range
+   */
+  public static ColumnarBatch slice(ColumnarBatch batch, int offset, int limit) {
+    final int totalRows = batch.numRows();
+    if (offset < 0 || offset > totalRows) {
+      throw new IllegalArgumentException(
+          "Slice offset " + offset + " out of range [0, " + totalRows + "]");
+    }
+    if (limit < 0) {
+      throw new IllegalArgumentException("Slice limit must be non-negative, got " + limit);
+    }
+    if (offset == 0 && limit >= totalRows) {
+      // The whole batch is requested: no pruning needed. A non-zero offset must still slice.
+      return batch;
+    }
+    // Clamp so the native slice never extends past the last row of the batch.
+    final int length = Math.min(limit, totalRows - offset);
+    final Runtime runtime =
+        Runtimes.contextInstance(
+            BackendsApiManager.getBackendName(), "VeloxColumnarBatches#sliceBatch");
+    final long nativeHandle =
+        ColumnarBatches.getNativeHandle(BackendsApiManager.getBackendName(), batch);
+    final long handle =
+        VeloxColumnarBatchJniWrapper.create(runtime).slice(nativeHandle, offset, length);
+    return ColumnarBatches.create(handle);
+  }
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 25655c72db..0d13a4e9a6 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -563,4 +563,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportColumnarArrowUdf(): Boolean = true
override def needPreComputeRangeFrameBoundary(): Boolean = true
+
+ /** Allows Spark's CollectLimitExec to pass validation and be offloaded on the Velox backend. */
+ override def supportCollectLimitExec(): Boolean = true
+
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index acf50383e2..3b15fa2263 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -162,6 +162,7 @@ object VeloxRuleApi {
RasOffload.from[GenerateExec](OffloadOthers()),
RasOffload.from[EvalPythonExec](OffloadOthers()),
RasOffload.from[SampleExec](OffloadOthers()),
+ RasOffload.from[CollectLimitExec](OffloadOthers()),
RasOffload.from[RangeExec](OffloadOthers())
)
offloads.foreach(
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 6f0820620a..c7dafdd480 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -838,6 +838,11 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
VeloxHiveUDFTransformer.replaceWithExpressionTransformer(expr,
attributeSeq)
}
+  /** Builds the Velox columnar operator that replaces a vanilla CollectLimitExec. */
+  override def genColumnarCollectLimitExec(
+      limit: Int,
+      child: SparkPlan): ColumnarCollectLimitBaseExec = {
+    ColumnarCollectLimitExec(limit = limit, child = child)
+  }
+
override def genColumnarRangeExec(
start: Long,
end: Long,
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
new file mode 100644
index 0000000000..8c328430ed
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitExec.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.columnarbatch.ColumnarBatches
+import org.apache.gluten.columnarbatch.VeloxColumnarBatches
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.physical.SinglePartition
+import org.apache.spark.sql.execution.{ShuffledColumnarBatchRDD, SparkPlan}
+import org.apache.spark.sql.execution.metric.{SQLMetric,
SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.metric.SQLColumnarShuffleReadMetricsReporter
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+/**
+ * Velox columnar implementation of CollectLimit: returns at most `limit` rows in a single
+ * partition.
+ */
+case class ColumnarCollectLimitExec(
+    limit: Int,
+    child: SparkPlan
+) extends ColumnarCollectLimitBaseExec(limit, child) {
+
+  override def batchType(): Convention.BatchType =
+    BackendsApiManager.getSettings.primaryBatchType
+
+  private lazy val writeMetrics =
+    SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
+
+  private lazy val readMetrics =
+    SQLColumnarShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
+
+  private lazy val useSortBasedShuffle: Boolean =
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .useSortBasedShuffle(outputPartitioning, child.output)
+
+  @transient private lazy val serializer: Serializer =
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .createColumnarBatchSerializer(child.schema, metrics, useSortBasedShuffle)
+
+  @transient override lazy val metrics: Map[String, SQLMetric] =
+    BackendsApiManager.getMetricsApiInstance
+      .genColumnarShuffleExchangeMetrics(sparkContext, useSortBasedShuffle) ++
+      readMetrics ++ writeMetrics
+
+  /**
+   * Applies the limit per child partition. When the child has several partitions, the locally
+   * limited batches are shuffled into a single partition and limited once more.
+   */
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    val childRDD = child.executeColumnar()
+    if (childRDD.getNumPartitions == 0) {
+      sparkContext.parallelize(Seq.empty[ColumnarBatch], 1)
+    } else {
+      val singlePartitionRDD =
+        if (childRDD.getNumPartitions == 1) childRDD else shuffleLimitedPartitions(childRDD)
+      applyLimit(singlePartitionRDD)
+    }
+  }
+
+  /** Limits every partition of `rdd` to `limit` rows. */
+  private def applyLimit(rdd: RDD[ColumnarBatch]): RDD[ColumnarBatch] = {
+    // Copy the field into a local so the task closure does not capture (and serialize) this plan.
+    val rowLimit = limit
+    rdd.mapPartitions(iter => ColumnarCollectLimitExec.collectLimitedRows(iter, rowLimit))
+  }
+
+  /** Limits each child partition locally, then shuffles the survivors into one partition. */
+  private def shuffleLimitedPartitions(childRDD: RDD[ColumnarBatch]): RDD[ColumnarBatch] = {
+    val locallyLimited = applyLimit(childRDD)
+    new ShuffledColumnarBatchRDD(
+      BackendsApiManager.getSparkPlanExecApiInstance.genShuffleDependency(
+        locallyLimited,
+        child.output,
+        child.output,
+        SinglePartition,
+        serializer,
+        writeMetrics,
+        metrics,
+        useSortBasedShuffle
+      ),
+      readMetrics
+    )
+  }
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
+  override def output: Seq[Attribute] = child.output
+
+  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
+    copy(child = newChild)
+}
+
+object ColumnarCollectLimitExec {
+
+  /**
+   * Returns an iterator that yields batches from `partitionIter` until `limit` rows have been
+   * produced in total.
+   *
+   * A batch that fits in the remaining budget is passed through, with an extra reference taken via
+   * `ColumnarBatches.retain`. The batch that crosses the limit is replaced by a new sliced batch
+   * holding only the remaining rows. `partitionIter` is not pulled again once the limit is reached.
+   */
+  private def collectLimitedRows(
+      partitionIter: Iterator[ColumnarBatch],
+      limit: Int): Iterator[ColumnarBatch] = new Iterator[ColumnarBatch] {
+
+    private var rowsCollected = 0
+    private var nextBatch: Option[ColumnarBatch] = None
+
+    override def hasNext: Boolean = nextBatch.isDefined || fetchNext()
+
+    override def next(): ColumnarBatch = {
+      if (!hasNext) {
+        throw new NoSuchElementException("No more batches available.")
+      }
+      // hasNext has buffered a batch at this point.
+      val batch = nextBatch.get
+      nextBatch = None
+      batch
+    }
+
+    /** Buffers the next (possibly sliced) batch; returns false once the limit or input is exhausted. */
+    private def fetchNext(): Boolean =
+      if (rowsCollected >= limit || !partitionIter.hasNext) {
+        false
+      } else {
+        val currentBatch = partitionIter.next()
+        val currentBatchRowCount = currentBatch.numRows()
+        val remaining = limit - rowsCollected
+        if (currentBatchRowCount <= remaining) {
+          // Whole batch fits: pass the same batch downstream with an extra reference.
+          rowsCollected += currentBatchRowCount
+          ColumnarBatches.retain(currentBatch)
+          nextBatch = Some(currentBatch)
+        } else {
+          // Batch crosses the limit: keep only its first `remaining` rows in a new batch.
+          nextBatch = Some(VeloxColumnarBatches.slice(currentBatch, 0, remaining))
+          rowsCollected += remaining
+        }
+        true
+      }
+  }
+}
diff --git
a/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
new file mode 100644
index 0000000000..8af7b544e2
--- /dev/null
+++
b/backends-velox/src/test/scala/org/apache/gluten/execution/GlutenSQLCollectLimitExecSuite.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{DataFrame, Row}
+
+class GlutenSQLCollectLimitExecSuite extends WholeStageTransformerSuite {
+
+  override protected val resourcePath: String = "N/A"
+  override protected val fileFormat: String = "N/A"
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
+  }
+
+  /**
+   * Asserts that an operator of type `T` is (`checkMatch = true`) or is not (`checkMatch = false`)
+   * present in the executed plan of `df`.
+   */
+  private def assertGlutenOperatorMatch[T: reflect.ClassTag](
+      df: DataFrame,
+      checkMatch: Boolean): Unit = {
+    val executedPlan = getExecutedPlan(df)
+    val operatorClass = implicitly[reflect.ClassTag[T]].runtimeClass
+    // Class#isInstance never throws, so no exception handling is needed around it.
+    val operatorFound = executedPlan.exists(plan => operatorClass.isInstance(plan))
+
+    val assertionMessage =
+      if (checkMatch) {
+        s"Operator ${operatorClass.getSimpleName} not found " +
+          s"in executed plan:\n $executedPlan"
+      } else {
+        s"Operator ${operatorClass.getSimpleName} was found " +
+          s"in executed plan:\n $executedPlan"
+      }
+
+    assert(operatorFound == checkMatch, assertionMessage)
+  }
+
+  /**
+   * Checks a limit whose surviving rows depend on partition/shuffle-read order. The single `id`
+   * column must hold exactly `expectedCount` distinct values, each drawn from `candidates`.
+   */
+  private def checkLimitedIds(df: DataFrame, expectedCount: Int, candidates: Set[Long]): Unit = {
+    val ids = df.collect().map(_.getLong(0)).toSeq
+    val shown = ids.mkString("[", ", ", "]")
+    assert(ids.size == expectedCount, s"Expected $expectedCount rows, got $shown")
+    assert(ids.distinct.size == ids.size, s"Duplicate ids in $shown")
+    assert(ids.forall(candidates.contains), s"Ids outside of $candidates in $shown")
+  }
+
+  testWithSpecifiedSparkVersion(
+    "ColumnarCollectLimitExec - basic limit test",
+    Array("3.2", "3.3")) {
+    val df = spark.range(0, 1000, 1).toDF("id").limit(5)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L), Row(3L), Row(4L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+  }
+
+  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - with filter", Array("3.2", "3.3")) {
+    val df = spark
+      .range(0, 20, 1)
+      .toDF("id")
+      .filter("id % 2 == 0")
+      .limit(5)
+    val expectedData = Seq(Row(0L), Row(2L), Row(4L), Row(6L), Row(8L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+  }
+
+  testWithSpecifiedSparkVersion(
+    "ColumnarCollectLimitExec - range with repartition",
+    Array("3.2", "3.3")) {
+
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .repartition(3)
+      .limit(3)
+
+    // Which rows survive depends on the round-robin layout and the shuffle read order, so only
+    // the size, uniqueness and domain of the result are stable.
+    checkLimitedIds(df, expectedCount = 3, candidates = (0L until 10L).toSet)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+  }
+
+  testWithSpecifiedSparkVersion(
+    "ColumnarCollectLimitExec - with distinct values",
+    Array("3.2", "3.3")) {
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .select("id")
+      .distinct()
+      .limit(5)
+
+    // Distinct hash-partitions the values, so which 5 are kept depends on partition order.
+    checkLimitedIds(df, expectedCount = 5, candidates = (0L until 10L).toSet)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+  }
+
+  testWithSpecifiedSparkVersion("ColumnarCollectLimitExec - chained limit", Array("3.2", "3.3")) {
+    val df = spark
+      .range(0, 10, 1)
+      .toDF("id")
+      .limit(8)
+      .limit(3)
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](df, checkMatch = true)
+  }
+
+  testWithSpecifiedSparkVersion(
+    "ColumnarCollectLimitExec - limit after union",
+    Array("3.2", "3.3")) {
+    val df1 = spark.range(0, 5).toDF("id")
+    val df2 = spark.range(5, 10).toDF("id")
+    val unionDf = df1.union(df2).limit(3)
+
+    val expectedData = Seq(Row(0L), Row(1L), Row(2L))
+
+    checkAnswer(unionDf, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarCollectLimitBaseExec](unionDf, checkMatch = true)
+  }
+}
diff --git a/cpp/velox/jni/VeloxJniWrapper.cc b/cpp/velox/jni/VeloxJniWrapper.cc
index 9d6ad157ff..d29f5b0f7b 100644
--- a/cpp/velox/jni/VeloxJniWrapper.cc
+++ b/cpp/velox/jni/VeloxJniWrapper.cc
@@ -431,6 +431,38 @@
Java_org_apache_gluten_datasource_VeloxDataSourceJniWrapper_splitBlockByPartitio
JNI_METHOD_END(nullptr)
}
+// Slices the Velox batch referenced by `veloxBatchHandle`, keeping at most `limit` rows starting at
+// row `offset`, and returns the handle of a newly saved object. The Java caller wraps the returned
+// handle in a new ColumnarBatch, so the caller's own handle is never returned.
+JNIEXPORT jlong JNICALL Java_org_apache_gluten_columnarbatch_VeloxColumnarBatchJniWrapper_slice( // NOLINT
+    JNIEnv* env,
+    jobject wrapper,
+    jlong veloxBatchHandle,
+    jint offset,
+    jint limit) {
+  JNI_METHOD_START
+  auto ctx = getRuntime(env, wrapper);
+  auto batch = ObjectStore::retrieve<ColumnarBatch>(veloxBatchHandle);
+
+  const auto numRows = batch->numRows();
+  VELOX_CHECK(offset >= 0 && offset <= numRows, "Slice offset {} out of range [0, {}]", offset, numRows);
+  VELOX_CHECK(limit >= 0, "Slice limit must be non-negative, got {}", limit);
+
+  if (offset == 0 && limit >= numRows) {
+    // Nothing to prune. Save the same batch under a new handle instead of returning
+    // `veloxBatchHandle`, which would leave two Java wrappers owning one handle.
+    return ctx->saveObject(batch);
+  }
+
+  auto veloxBatch = std::dynamic_pointer_cast<VeloxColumnarBatch>(batch);
+  VELOX_CHECK_NOT_NULL(veloxBatch, "Expected VeloxColumnarBatch but got a different type.");
+
+  // Clamp the length so the slice never extends past the last row.
+  const auto remainingRows = numRows - offset;
+  const auto length = limit < remainingRows ? limit : remainingRows;
+  auto prunedVector = veloxBatch->getRowVector()->slice(offset, length);
+
+  auto prunedRowVector = std::dynamic_pointer_cast<facebook::velox::RowVector>(prunedVector);
+  VELOX_CHECK_NOT_NULL(prunedRowVector, "Expected RowVector but got a different type.");
+
+  auto prunedBatch = std::make_shared<VeloxColumnarBatch>(prunedRowVector);
+  return ctx->saveObject(prunedBatch);
+
+  JNI_METHOD_END(kInvalidObjectHandle)
+}
+
#ifdef __cplusplus
}
#endif
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 5bedf021c4..84a4d04bfa 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -152,4 +152,7 @@ trait BackendSettingsApi {
def supportColumnarArrowUdf(): Boolean = false
def needPreComputeRangeFrameBoundary(): Boolean = false
+
+ /** Whether CollectLimitExec may be offloaded; when false the validator rejects it. */
+ def supportCollectLimitExec(): Boolean = false
+
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 02803bbf2b..2ebc965126 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -694,6 +694,8 @@ trait SparkPlanExecApi {
original: StringSplit): ExpressionTransformer =
GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr,
limitExpr), original)
+ /** Creates the backend's columnar replacement for a CollectLimitExec with the given limit/child. */
+ def genColumnarCollectLimitExec(limit: Int, plan: SparkPlan):
ColumnarCollectLimitBaseExec
+
def genColumnarRangeExec(
start: Long,
end: Long,
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
new file mode 100644
index 0000000000..a860329cd8
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/ColumnarCollectLimitBaseExec.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.config.GlutenConfig
+import org.apache.gluten.extension.ValidationResult
+import org.apache.gluten.sql.shims.SparkShimLoader
+
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning,
SinglePartition}
+import org.apache.spark.sql.execution.{CollectLimitExec, LimitExec, SparkPlan}
+
+/** Backend-agnostic base for columnar CollectLimit operators; output is a single partition. */
+abstract class ColumnarCollectLimitBaseExec(
+    limit: Int,
+    childPlan: SparkPlan
+) extends LimitExec
+  with ValidatablePlan {
+
+  override def outputPartitioning: Partitioning = SinglePartition
+
+  /**
+   * Succeeds only if the backend supports CollectLimitExec, the child is columnar while columnar
+   * preference is enabled, the backend supports columnar shuffle, and the Spark shim allows a
+   * columnar limit.
+   */
+  override protected def doValidateInternal(): ValidationResult = {
+    val settings = BackendsApiManager.getSettings
+    if (!settings.supportCollectLimitExec()) {
+      ValidationResult.failed("CollectLimitExec is not supported by the current backend.")
+    } else {
+      val columnarChild = childPlan.supportsColumnar && GlutenConfig.get.enablePreferColumnar
+      val runnable = columnarChild &&
+        settings.supportColumnarShuffleExec() &&
+        SparkShimLoader.getSparkShims.isColumnarLimitExecSupported()
+      if (runnable) {
+        ValidationResult.succeeded
+      } else {
+        val reason = "Columnar shuffle not enabled or child does not support columnar."
+        ValidationResult.failed(reason)
+      }
+    }
+  }
+
+  /** Row-based execution is not supported by this operator. */
+  override protected def doExecute()
+      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] =
+    throw new UnsupportedOperationException("This operator doesn't support doExecute().")
+}
+
+object ColumnarCollectLimitBaseExec {
+
+  /** Converts a vanilla CollectLimitExec through the active backend's SparkPlanExecApi. */
+  def from(collectLimitExec: CollectLimitExec): ColumnarCollectLimitBaseExec = {
+    val api = BackendsApiManager.getSparkPlanExecApiInstance
+    api.genColumnarCollectLimitExec(collectLimitExec.limit, collectLimitExec.child)
+  }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index bfd8f59797..f58cb05666 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -15,7 +15,6 @@
* limitations under the License.
*/
package org.apache.gluten.extension.columnar.offload
-
import org.apache.gluten.backendsapi.BackendsApiManager
import org.apache.gluten.config.GlutenConfig
import org.apache.gluten.execution._
@@ -28,6 +27,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight,
BuildSide}
import org.apache.spark.sql.catalyst.plans.logical.Join
import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.CollectLimitExec
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec,
ObjectHashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.datasources.WriteFilesExec
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
@@ -344,6 +344,12 @@ object OffloadOthers {
plan.withReplacement,
plan.seed,
child)
+ case plan: CollectLimitExec =>
+ logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
+
BackendsApiManager.getSparkPlanExecApiInstance.genColumnarCollectLimitExec(
+ plan.limit,
+ plan.child
+ )
case p if !p.isInstanceOf[GlutenPlan] =>
logDebug(s"Transformation for ${p.getClass} is currently not
supported.")
p
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 509d7c02ae..82a6a415c6 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,6 +136,7 @@ object Validators {
fail(p)
case p: CartesianProductExec if !settings.supportCartesianProductExec()
=> fail(p)
case p: TakeOrderedAndProjectExec if
!settings.supportColumnarShuffleExec() => fail(p)
+ case p: CollectLimitExec if !settings.supportCollectLimitExec() =>
fail(p)
case _ => pass()
}
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
index 709673feab..1276647b3b 100644
---
a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
+++
b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenImplicits.scala
@@ -18,12 +18,13 @@ package org.apache.spark.sql.execution
import org.apache.gluten.execution.{GlutenPlan, WholeStageTransformer}
import org.apache.gluten.utils.PlanUtil
+
import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{CommandResult, LogicalPlan}
import org.apache.spark.sql.catalyst.util.StringUtils.PlanStringConcat
import org.apache.spark.sql.execution.ColumnarWriteFilesExec.NoopLeaf
-import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec,
AdaptiveSparkPlanExec, QueryStageExec}
+import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec,
AQEShuffleReadExec, QueryStageExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.command.{DataWritingCommandExec,
ExecutedCommandExec}
import org.apache.spark.sql.execution.datasources.WriteFilesExec
@@ -34,7 +35,6 @@ import org.apache.spark.sql.internal.SQLConf
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
-
// format: off
/**
* A helper class to get the Gluten fallback summary from a Spark [[Dataset]].
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index b7dbff4fb6..d8c795f059 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -163,7 +163,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[VeloxAdaptiveQueryExecSuite]
.includeAllGlutenTests()
.includeByPrefix(
- "SPARK-29906",
"SPARK-30291",
"SPARK-30403",
"SPARK-30719",
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index b3f7c23ddf..4b382339b5 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -185,7 +185,6 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[VeloxAdaptiveQueryExecSuite]
.includeAllGlutenTests()
.includeByPrefix(
- "SPARK-29906",
"SPARK-30291",
"SPARK-30403",
"SPARK-30719",
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 2fe6dddeb3..f8c060bc18 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -295,4 +295,6 @@ trait SparkShims {
def isParquetFileEncrypted(fileStatus: LocatedFileStatus, conf:
Configuration): Boolean
+ /** Whether the columnar CollectLimit operator may be used on this Spark version. */
+ def isColumnarLimitExecSupported(): Boolean
+
}
diff --git
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
index 1f94b47e22..8289ffce0e 100644
---
a/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
+++
b/shims/spark32/src/main/scala/org/apache/gluten/sql/shims/spark32/Spark32Shims.scala
@@ -310,4 +310,7 @@ class Spark32Shims extends SparkShims {
false
}
}
+
+ /** Columnar CollectLimit is enabled on this Spark version. */
+ override def isColumnarLimitExecSupported(): Boolean = true
+
}
diff --git
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
index 694c719c9b..c8627f5d5a 100644
---
a/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
+++
b/shims/spark33/src/main/scala/org/apache/gluten/sql/shims/spark33/Spark33Shims.scala
@@ -394,4 +394,6 @@ class Spark33Shims extends SparkShims {
}
}
+ /** Columnar CollectLimit is enabled on this Spark version. */
+ override def isColumnarLimitExecSupported(): Boolean = true
+
}
diff --git
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
index 5087b02ff8..31f4a4f13c 100644
---
a/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
+++
b/shims/spark34/src/main/scala/org/apache/gluten/sql/shims/spark34/Spark34Shims.scala
@@ -613,4 +613,7 @@ class Spark34Shims extends SparkShims {
false
}
}
+
+ // Columnar CollectLimit is disabled on this Spark version.
+ // NOTE(review): the reason is not visible here (possibly CollectLimitExec API changes) — confirm.
+ override def isColumnarLimitExecSupported(): Boolean = false
+
}
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index a5898b0a96..e370e1204c 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -666,4 +666,7 @@ class Spark35Shims extends SparkShims {
case e: Exception => false
}
}
+
+ // Columnar CollectLimit is disabled on this Spark version.
+ // NOTE(review): the reason is not visible here (possibly CollectLimitExec API changes) — confirm.
+ override def isColumnarLimitExecSupported(): Boolean = false
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]