This is an automated email from the ASF dual-hosted git repository.
hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 55c065b688 [GLUTEN-3620][VL] Support Range operator for Velox Backend
(#8161)
55c065b688 is described below
commit 55c065b688f21e76c88d2a893c5126cf5fb9b109
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu Jan 23 17:26:51 2025 +0530
[GLUTEN-3620][VL] Support Range operator for Velox Backend (#8161)
Closes #3620
---
.../clickhouse/CHSparkPlanExecApi.scala | 11 ++
.../gluten/backendsapi/velox/VeloxBackend.scala | 3 +
.../gluten/backendsapi/velox/VeloxRuleApi.scala | 3 +-
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 11 ++
.../gluten/execution/ColumnarRangeExec.scala | 132 +++++++++++++++++++++
.../gluten/backendsapi/BackendSettingsApi.scala | 15 +++
.../gluten/backendsapi/SparkPlanExecApi.scala | 10 ++
.../execution/RangeExecBaseTransformer.scala | 69 +++++++++++
.../columnar/offload/OffloadSingleNodeRules.scala | 11 ++
.../extension/columnar/validator/Validators.scala | 1 +
.../gluten/utils/velox/VeloxTestSettings.scala | 2 +
.../gluten/utils/velox/VeloxTestSettings.scala | 3 +
.../sql/execution/GlutenSQLRangeExecSuite.scala | 82 +++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 2 +
.../gluten/utils/velox/VeloxTestSettings.scala | 2 +
15 files changed, 356 insertions(+), 1 deletion(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 679ac4435c..26ffc126cc 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -931,4 +931,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
limitExpr: ExpressionTransformer,
original: StringSplit): ExpressionTransformer =
CHStringSplitTransformer(substraitExprName, Seq(srcExpr, regexExpr,
limitExpr), original)
+
+ override def genColumnarRangeExec(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ outputAttributes: Seq[Attribute],
+ child: Seq[SparkPlan]): ColumnarRangeBaseExec =
+ throw new GlutenNotSupportException("ColumnarRange is not supported in ch
backend.")
+
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 677d8792c7..d893eea9e9 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -576,4 +576,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
override def supportColumnarArrowUdf(): Boolean = true
override def needPreComputeRangeFrameBoundary(): Boolean = true
+
+ override def supportRangeExec(): Boolean = true
+
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 9825ae1d31..14227dd7d0 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -160,7 +160,8 @@ object VeloxRuleApi {
RasOffload.from[LimitExec](OffloadOthers()),
RasOffload.from[GenerateExec](OffloadOthers()),
RasOffload.from[EvalPythonExec](OffloadOthers()),
- RasOffload.from[SampleExec](OffloadOthers())
+ RasOffload.from[SampleExec](OffloadOthers()),
+ RasOffload.from[RangeExec](OffloadOthers())
)
offloads.foreach(
offload =>
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 559882e8b9..c95a770bf8 100644
---
a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -838,4 +838,15 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
attributeSeq: Seq[Attribute]): ExpressionTransformer = {
VeloxHiveUDFTransformer.replaceWithExpressionTransformer(expr,
attributeSeq)
}
+
+ override def genColumnarRangeExec(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ outputAttributes: Seq[Attribute],
+ child: Seq[SparkPlan]): ColumnarRangeBaseExec =
+ ColumnarRangeExec(start, end, step, numSlices, numElements,
outputAttributes, child)
+
}
diff --git
a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
new file mode 100644
index 0000000000..3fb86a8aae
--- /dev/null
+++
b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.vectorized.ArrowWritableColumnVector
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * ColumnarRangeExec is a concrete implementation of ColumnarRangeBaseExec
that executes the Range
+ * operation and supports columnar processing. It generates columnar batches
for the specified
+ * range.
+ *
+ * @param start
+ * Starting value of the range.
+ * @param end
+ * Ending value of the range.
+ * @param step
+ * Step size for the range.
+ * @param numSlices
+ * Number of slices for partitioning the range.
+ * @param numElements
+ * Total number of elements in the range.
+ * @param outputAttributes
+ * Attributes defining the output schema of the operator.
+ * @param child
+ * Child SparkPlan nodes for this operator, if any.
+ */
+case class ColumnarRangeExec(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ outputAttributes: Seq[Attribute],
+ child: Seq[SparkPlan]
+) extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements,
outputAttributes, child) {
+
+ override def batchType(): Convention.BatchType = {
+ ArrowJavaBatch
+ }
+
+ override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ if (start == end || (start < end ^ 0 < step)) {
+ sparkContext.emptyRDD[ColumnarBatch]
+ } else {
+ sparkContext
+ .parallelize(0 until numSlices, numSlices)
+ .mapPartitionsWithIndex {
+ (partitionIndex, _) =>
+ val batchSize = 1000
+ val safePartitionStart = (partitionIndex) * numElements /
numSlices * step + start
+ val safePartitionEnd = (partitionIndex + 1) * numElements /
numSlices * step + start
+
+ def getSafeMargin(value: BigInt): Long =
+ if (value.isValidLong) value.toLong
+ else if (value > 0) Long.MaxValue
+ else Long.MinValue
+
+ val partitionStart = getSafeMargin(safePartitionStart)
+ val partitionEnd = getSafeMargin(safePartitionEnd)
+
+ /**
+ * Generates the columnar batches for the specified range. Each
batch contains a subset
+ * of the range values, managed using Arrow column vectors.
+ */
+ val iterator = new Iterator[ColumnarBatch] {
+ var current = safePartitionStart
+
+ override def hasNext: Boolean = {
+ if (step > 0) {
+ current < safePartitionEnd
+ } else {
+ current > safePartitionEnd
+ }
+ }
+
+ override def next(): ColumnarBatch = {
+ val numRows = math.min(
+ ((safePartitionEnd - current) / step).toInt.max(1),
+ batchSize
+ )
+
+ val vectors =
ArrowWritableColumnVector.allocateColumns(numRows, schema)
+
+ for (i <- 0 until numRows) {
+ val value = current + i * step
+ vectors(0).putLong(i, getSafeMargin(value))
+ }
+ vectors.foreach(_.setValueCount(numRows))
+ current += numRows * step
+
+ val batch = new
ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], numRows)
+ batch
+ }
+ }
+ Iterators
+ .wrap(iterator)
+ .recyclePayload(
+ batch => {
+ batch.close()
+ })
+ .create()
+
+ }
+ }
+ }
+
+ override protected def doExecute():
RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+ throw new UnsupportedOperationException("doExecute is not supported for
this operator")
+ }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 04aa7fe7ca..d7ee6f7f76 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -56,30 +56,41 @@ trait BackendSettingsApi {
options: Map[String, String]): ValidationResult =
ValidationResult.succeeded
def supportNativeWrite(fields: Array[StructField]): Boolean = true
+
def supportNativeMetadataColumns(): Boolean = false
+
def supportNativeRowIndexColumn(): Boolean = false
def supportExpandExec(): Boolean = false
+
def supportSortExec(): Boolean = false
+
def supportSortMergeJoinExec(): Boolean = true
+
def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = {
false
}
+
def supportWindowGroupLimitExec(rankLikeFunction: Expression): Boolean = {
false
}
+
def supportColumnarShuffleExec(): Boolean = {
GlutenConfig.get.enableColumnarShuffle
}
+
def enableJoinKeysRewrite(): Boolean = true
+
def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
case _: InnerLike | RightOuter | FullOuter => true
case _ => false
}
+
def supportHashBuildJoinTypeOnRight: JoinType => Boolean = {
case _: InnerLike | LeftOuter | FullOuter | LeftSemi | LeftAnti | _:
ExistenceJoin => true
case _ => false
}
+
def supportStructType(): Boolean = false
def structFieldToLowerCase(): Boolean = true
@@ -90,6 +101,7 @@ trait BackendSettingsApi {
def recreateJoinExecOnFallback(): Boolean = false
def excludeScanExecFromCollapsedStage(): Boolean = false
+
def rescaleDecimalArithmetic: Boolean = false
def allowDecimalArithmetic: Boolean = true
@@ -140,4 +152,7 @@ trait BackendSettingsApi {
def supportColumnarArrowUdf(): Boolean = false
def needPreComputeRangeFrameBoundary(): Boolean = false
+
+ def supportRangeExec(): Boolean = false
+
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 7432bc0af9..549e128a5f 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -694,4 +694,14 @@ trait SparkPlanExecApi {
limitExpr: ExpressionTransformer,
original: StringSplit): ExpressionTransformer =
GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr,
limitExpr), original)
+
+ def genColumnarRangeExec(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ outputAttributes: Seq[Attribute],
+ child: Seq[SparkPlan]): ColumnarRangeBaseExec
+
}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
new file mode 100644
index 0000000000..4e3888a25c
--- /dev/null
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{LeafExecNode, RangeExec, SparkPlan}
+
+/**
+ * Base class for RangeExec transformation, can be implemented by the
supported backends.
+ * Currently velox is supported.
+ */
+abstract class ColumnarRangeBaseExec(
+ start: Long,
+ end: Long,
+ step: Long,
+ numSlices: Int,
+ numElements: BigInt,
+ outputAttributes: Seq[Attribute],
+ child: Seq[SparkPlan])
+ extends LeafExecNode
+ with ValidatablePlan {
+
+ override def output: Seq[Attribute] = {
+ outputAttributes
+ }
+
+ override def rowType0(): Convention.RowType = Convention.RowType.None
+
+ override protected def doExecute()
+ : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+ throw new UnsupportedOperationException(s"This operator doesn't support
doExecute().")
+ }
+}
+
+/**
+ * Companion object for ColumnarRangeBaseExec; provides a factory method to
create an instance from
+ * an existing RangeExec plan.
+ */
+object ColumnarRangeBaseExec {
+ def from(rangeExec: RangeExec): ColumnarRangeBaseExec = {
+ BackendsApiManager.getSparkPlanExecApiInstance
+ .genColumnarRangeExec(
+ rangeExec.start,
+ rangeExec.end,
+ rangeExec.step,
+ rangeExec.numSlices,
+ rangeExec.numElements,
+ rangeExec.output,
+ rangeExec.children
+ )
+ }
+}
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 8d0abcaa68..bfd8f59797 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -324,6 +324,17 @@ object OffloadOthers {
child,
plan.evalType)
}
+ case plan: RangeExec =>
+ logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
+ BackendsApiManager.getSparkPlanExecApiInstance.genColumnarRangeExec(
+ plan.start,
+ plan.end,
+ plan.step,
+ plan.numSlices,
+ plan.numElements,
+ plan.output,
+ plan.children
+ )
case plan: SampleExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
val child = plan.child
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 509d7c02ae..0e0f5f444b 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,6 +136,7 @@ object Validators {
fail(p)
case p: CartesianProductExec if !settings.supportCartesianProductExec()
=> fail(p)
case p: TakeOrderedAndProjectExec if
!settings.supportColumnarShuffleExec() => fail(p)
+ case p: RangeExec if !settings.supportRangeExec() => fail(p)
case _ => pass()
}
}
diff --git
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 62ab868363..b08f4300f8 100644
---
a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -137,6 +137,8 @@ class VeloxTestSettings extends BackendTestSettings {
)
// Double precision loss:
https://github.com/facebookincubator/velox/pull/6051#issuecomment-1731028215.
.exclude("SPARK-22271: mean overflows and returns null for some decimal
variables")
+ // Rewrite this test since it checks the physical operator which is
changed in Gluten
+ .exclude("SPARK-27439: Explain result should match collected result after
view change")
enableSuite[GlutenDataFrameNaFunctionsSuite]
.exclude(
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 72b77ae1f9..e3fc20503b 100644
---
a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1036,6 +1036,8 @@ class VeloxTestSettings extends BackendTestSettings {
// The describe issue is just fixed by
https://github.com/apache/spark/pull/40914.
// We can enable the below test for spark 3.4 and higher versions.
.excludeGlutenTest("describe")
+ // Rewrite this test since it checks the physical operator which is
changed in Gluten
+ .exclude("SPARK-27439: Explain result should match collected result after
view change")
enableSuite[GlutenDataFrameTimeWindowingSuite]
enableSuite[GlutenDataFrameTungstenSuite]
enableSuite[GlutenDataFrameWindowFunctionsSuite]
@@ -1177,6 +1179,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenImplicitsTest]
enableSuite[GlutenCollapseProjectExecTransformerSuite]
enableSuite[GlutenSparkSessionExtensionSuite]
+ enableSuite[GlutenSQLRangeExecSuite]
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git
a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
new file mode 100644
index 0000000000..6804d56b9e
--- /dev/null
+++
b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.ColumnarRangeBaseExec
+
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.functions.sum
+
+class GlutenSQLRangeExecSuite extends GlutenSQLTestsTrait {
+
+ private def assertGlutenOperatorMatch[T: reflect.ClassTag](df: DataFrame):
Unit = {
+ val executedPlan = getExecutedPlan(df)
+ assert(
+ executedPlan.exists(plan =>
implicitly[reflect.ClassTag[T]].runtimeClass.isInstance(plan)),
+ s"Operator ${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName}
not found " +
+ s"in executed plan:\n $executedPlan"
+ )
+ }
+
+ testGluten("ColumnarRangeExec produces correct results") {
+ val df = spark.range(0, 10, 1).toDF("id")
+ val expectedData = (0L until 10L).map(Row(_)).toSeq
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarRangeBaseExec](df)
+ }
+
+ testGluten("ColumnarRangeExec with step") {
+ val df = spark.range(5, 15, 2).toDF("id")
+ val expectedData = Seq(5L, 7L, 9L, 11L, 13L).map(Row(_))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarRangeBaseExec](df)
+ }
+
+ testGluten("ColumnarRangeExec with filter") {
+ val df = spark.range(0, 20, 1).toDF("id").filter("id % 3 == 0")
+ val expectedData = Seq(0L, 3L, 6L, 9L, 12L, 15L, 18L).map(Row(_))
+
+ checkAnswer(df, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarRangeBaseExec](df)
+ }
+
+ testGluten("ColumnarRangeExec with aggregation") {
+ val df = spark.range(1, 6, 1).toDF("id")
+ val sumDf = df.agg(sum("id"))
+ val expectedData = Seq(Row(15L))
+
+ checkAnswer(sumDf, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarRangeBaseExec](df)
+ }
+
+ testGluten("ColumnarRangeExec with join") {
+ val df1 = spark.range(0, 5, 1).toDF("id1")
+ val df2 = spark.range(3, 8, 1).toDF("id2")
+ val joinDf = df1.join(df2, df1("id1") === df2("id2"))
+ val expectedData = Seq(Row(3L, 3L), Row(4L, 4L))
+
+ checkAnswer(joinDf, expectedData)
+
+ assertGlutenOperatorMatch[ColumnarRangeBaseExec](joinDf)
+ }
+}
diff --git
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 94d3a1f6e8..06e8309baf 100644
---
a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1066,6 +1066,8 @@ class VeloxTestSettings extends BackendTestSettings {
)
// test for sort node not present but gluten uses shuffle hash join
.exclude("SPARK-41048: Improve output partitioning and ordering with AQE
cache")
+ // Rewrite this test since it checks the physical operator which is
changed in Gluten
+ .exclude("SPARK-27439: Explain result should match collected result after
view change")
enableSuite[GlutenDataFrameTimeWindowingSuite]
enableSuite[GlutenDataFrameTungstenSuite]
enableSuite[GlutenDataFrameWindowFunctionsSuite]
diff --git
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 73c4d43ced..30dda10999 100644
---
a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++
b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1088,6 +1088,8 @@ class VeloxTestSettings extends BackendTestSettings {
)
// test for sort node not present but gluten uses shuffle hash join
.exclude("SPARK-41048: Improve output partitioning and ordering with AQE
cache")
+ // Rewrite this test since it checks the physical operator which is
changed in Gluten
+ .exclude("SPARK-27439: Explain result should match collected result after
view change")
enableSuite[GlutenDataFrameTimeWindowingSuite]
enableSuite[GlutenDataFrameTungstenSuite]
enableSuite[GlutenDataFrameWindowFunctionsSuite]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]