This is an automated email from the ASF dual-hosted git repository.

hongze pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 55c065b688 [GLUTEN-3620][VL] Support Range operator for Velox Backend (#8161)
55c065b688 is described below

commit 55c065b688f21e76c88d2a893c5126cf5fb9b109
Author: Arnav Balyan <[email protected]>
AuthorDate: Thu Jan 23 17:26:51 2025 +0530

    [GLUTEN-3620][VL] Support Range operator for Velox Backend (#8161)
    
    Closes #3620
---
 .../clickhouse/CHSparkPlanExecApi.scala            |  11 ++
 .../gluten/backendsapi/velox/VeloxBackend.scala    |   3 +
 .../gluten/backendsapi/velox/VeloxRuleApi.scala    |   3 +-
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |  11 ++
 .../gluten/execution/ColumnarRangeExec.scala       | 132 +++++++++++++++++++++
 .../gluten/backendsapi/BackendSettingsApi.scala    |  15 +++
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  10 ++
 .../execution/RangeExecBaseTransformer.scala       |  69 +++++++++++
 .../columnar/offload/OffloadSingleNodeRules.scala  |  11 ++
 .../extension/columnar/validator/Validators.scala  |   1 +
 .../gluten/utils/velox/VeloxTestSettings.scala     |   2 +
 .../gluten/utils/velox/VeloxTestSettings.scala     |   3 +
 .../sql/execution/GlutenSQLRangeExecSuite.scala    |  82 +++++++++++++
 .../gluten/utils/velox/VeloxTestSettings.scala     |   2 +
 .../gluten/utils/velox/VeloxTestSettings.scala     |   2 +
 15 files changed, 356 insertions(+), 1 deletion(-)
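
For context, this change lets the Velox backend replace Spark's RangeExec node with a
columnar implementation. A minimal sketch of a query that would exercise the new operator
(the session setup below is illustrative only and assumes a Spark build with the Gluten
plugin and the Velox backend enabled; it is not part of this commit):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.sum

    // Illustrative session; the exact Gluten/Velox configuration is assumed, not shown here.
    val spark = SparkSession.builder().appName("range-offload-sketch").getOrCreate()

    // spark.range(...) plans a RangeExec node; with supportRangeExec() returning true,
    // the offload rules in this commit replace it with ColumnarRangeExec.
    val df = spark.range(0, 1000, 2).toDF("id")
    df.agg(sum("id")).show()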

diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 679ac4435c..26ffc126cc 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -931,4 +931,15 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
       limitExpr: ExpressionTransformer,
       original: StringSplit): ExpressionTransformer =
     CHStringSplitTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)
+
+  override def genColumnarRangeExec(
+      start: Long,
+      end: Long,
+      step: Long,
+      numSlices: Int,
+      numElements: BigInt,
+      outputAttributes: Seq[Attribute],
+      child: Seq[SparkPlan]): ColumnarRangeBaseExec =
+    throw new GlutenNotSupportException("ColumnarRange is not supported in ch backend.")
+
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
index 677d8792c7..d893eea9e9 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala
@@ -576,4 +576,7 @@ object VeloxBackendSettings extends BackendSettingsApi {
   override def supportColumnarArrowUdf(): Boolean = true
 
   override def needPreComputeRangeFrameBoundary(): Boolean = true
+
+  override def supportRangeExec(): Boolean = true
+
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 9825ae1d31..14227dd7d0 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -160,7 +160,8 @@ object VeloxRuleApi {
       RasOffload.from[LimitExec](OffloadOthers()),
       RasOffload.from[GenerateExec](OffloadOthers()),
       RasOffload.from[EvalPythonExec](OffloadOthers()),
-      RasOffload.from[SampleExec](OffloadOthers())
+      RasOffload.from[SampleExec](OffloadOthers()),
+      RasOffload.from[RangeExec](OffloadOthers())
     )
     offloads.foreach(
       offload =>
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 559882e8b9..c95a770bf8 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -838,4 +838,15 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       attributeSeq: Seq[Attribute]): ExpressionTransformer = {
    VeloxHiveUDFTransformer.replaceWithExpressionTransformer(expr, attributeSeq)
   }
+
+  override def genColumnarRangeExec(
+      start: Long,
+      end: Long,
+      step: Long,
+      numSlices: Int,
+      numElements: BigInt,
+      outputAttributes: Seq[Attribute],
+      child: Seq[SparkPlan]): ColumnarRangeBaseExec =
+    ColumnarRangeExec(start, end, step, numSlices, numElements, outputAttributes, child)
+
 }
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
new file mode 100644
index 0000000000..3fb86a8aae
--- /dev/null
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.columnarbatch.ArrowBatches.ArrowJavaBatch
+import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.gluten.iterator.Iterators
+import org.apache.gluten.vectorized.ArrowWritableColumnVector
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
+
+/**
+ * ColumnarRangeExec is a concrete implementation of ColumnarRangeBaseExec that executes the Range
+ * operation and supports columnar processing. It generates columnar batches for the specified
+ * range.
+ *
+ * @param start
+ *   Starting value of the range.
+ * @param end
+ *   Ending value of the range.
+ * @param step
+ *   Step size for the range.
+ * @param numSlices
+ *   Number of slices for partitioning the range.
+ * @param numElements
+ *   Total number of elements in the range.
+ * @param outputAttributes
+ *   Attributes defining the output schema of the operator.
+ * @param child
+ *   Child SparkPlan nodes for this operator, if any.
+ */
+case class ColumnarRangeExec(
+    start: Long,
+    end: Long,
+    step: Long,
+    numSlices: Int,
+    numElements: BigInt,
+    outputAttributes: Seq[Attribute],
+    child: Seq[SparkPlan]
+) extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements, outputAttributes, child) {
+
+  override def batchType(): Convention.BatchType = {
+    ArrowJavaBatch
+  }
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    if (start == end || (start < end ^ 0 < step)) {
+      sparkContext.emptyRDD[ColumnarBatch]
+    } else {
+      sparkContext
+        .parallelize(0 until numSlices, numSlices)
+        .mapPartitionsWithIndex {
+          (partitionIndex, _) =>
+            val batchSize = 1000
+            val safePartitionStart = (partitionIndex) * numElements / numSlices * step + start
+            val safePartitionEnd = (partitionIndex + 1) * numElements / numSlices * step + start
+
+            def getSafeMargin(value: BigInt): Long =
+              if (value.isValidLong) value.toLong
+              else if (value > 0) Long.MaxValue
+              else Long.MinValue
+
+            val partitionStart = getSafeMargin(safePartitionStart)
+            val partitionEnd = getSafeMargin(safePartitionEnd)
+
+            /**
+             * Generates the columnar batches for the specified range. Each batch contains a subset
+             * of the range values, managed using Arrow column vectors.
+             */
+            val iterator = new Iterator[ColumnarBatch] {
+              var current = safePartitionStart
+
+              override def hasNext: Boolean = {
+                if (step > 0) {
+                  current < safePartitionEnd
+                } else {
+                  current > safePartitionEnd
+                }
+              }
+
+              override def next(): ColumnarBatch = {
+                val numRows = math.min(
+                  ((safePartitionEnd - current) / step).toInt.max(1),
+                  batchSize
+                )
+
+                val vectors = ArrowWritableColumnVector.allocateColumns(numRows, schema)
+
+                for (i <- 0 until numRows) {
+                  val value = current + i * step
+                  vectors(0).putLong(i, getSafeMargin(value))
+                }
+                vectors.foreach(_.setValueCount(numRows))
+                current += numRows * step
+
+                val batch = new ColumnarBatch(vectors.asInstanceOf[Array[ColumnVector]], numRows)
+                batch
+              }
+            }
+            Iterators
+              .wrap(iterator)
+              .recyclePayload(
+                batch => {
+                  batch.close()
+                })
+              .create()
+
+        }
+    }
+  }
+
+  override protected def doExecute(): RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+    throw new UnsupportedOperationException("doExecute is not supported for this operator")
+  }
+}
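
For reference, the per-slice boundary arithmetic used in doExecuteColumnar above can be
traced in isolation. A standalone sketch with illustrative values (the numbers are chosen
for the example, not taken from the commit):

    // Sketch of the slice boundaries computed in ColumnarRangeExec.doExecuteColumnar
    // for range(start = 0, end = 10, step = 2), i.e. numElements = 5 split into 2 slices.
    val start = 0L
    val step = 2L
    val numSlices = 2
    val numElements = BigInt(5)

    def sliceStart(i: Int): BigInt = BigInt(i) * numElements / numSlices * step + start
    def sliceEnd(i: Int): BigInt = BigInt(i + 1) * numElements / numSlices * step + start

    // Slice 0 covers [0, 4)  -> values 0, 2
    // Slice 1 covers [4, 10) -> values 4, 6, 8
    (0 until numSlices).foreach(i => println(s"slice $i: [${sliceStart(i)}, ${sliceEnd(i)})"))
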
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
index 04aa7fe7ca..d7ee6f7f76 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala
@@ -56,30 +56,41 @@ trait BackendSettingsApi {
      options: Map[String, String]): ValidationResult = ValidationResult.succeeded
 
   def supportNativeWrite(fields: Array[StructField]): Boolean = true
+
   def supportNativeMetadataColumns(): Boolean = false
+
   def supportNativeRowIndexColumn(): Boolean = false
 
   def supportExpandExec(): Boolean = false
+
   def supportSortExec(): Boolean = false
+
   def supportSortMergeJoinExec(): Boolean = true
+
   def supportWindowExec(windowFunctions: Seq[NamedExpression]): Boolean = {
     false
   }
+
   def supportWindowGroupLimitExec(rankLikeFunction: Expression): Boolean = {
     false
   }
+
   def supportColumnarShuffleExec(): Boolean = {
     GlutenConfig.get.enableColumnarShuffle
   }
+
   def enableJoinKeysRewrite(): Boolean = true
+
   def supportHashBuildJoinTypeOnLeft: JoinType => Boolean = {
     case _: InnerLike | RightOuter | FullOuter => true
     case _ => false
   }
+
   def supportHashBuildJoinTypeOnRight: JoinType => Boolean = {
    case _: InnerLike | LeftOuter | FullOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
     case _ => false
   }
+
   def supportStructType(): Boolean = false
 
   def structFieldToLowerCase(): Boolean = true
@@ -90,6 +101,7 @@ trait BackendSettingsApi {
   def recreateJoinExecOnFallback(): Boolean = false
 
   def excludeScanExecFromCollapsedStage(): Boolean = false
+
   def rescaleDecimalArithmetic: Boolean = false
 
   def allowDecimalArithmetic: Boolean = true
@@ -140,4 +152,7 @@ trait BackendSettingsApi {
   def supportColumnarArrowUdf(): Boolean = false
 
   def needPreComputeRangeFrameBoundary(): Boolean = false
+
+  def supportRangeExec(): Boolean = false
+
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 7432bc0af9..549e128a5f 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -694,4 +694,14 @@ trait SparkPlanExecApi {
       limitExpr: ExpressionTransformer,
       original: StringSplit): ExpressionTransformer =
    GenericExpressionTransformer(substraitExprName, Seq(srcExpr, regexExpr, limitExpr), original)
+
+  def genColumnarRangeExec(
+      start: Long,
+      end: Long,
+      step: Long,
+      numSlices: Int,
+      numElements: BigInt,
+      outputAttributes: Seq[Attribute],
+      child: Seq[SparkPlan]): ColumnarRangeBaseExec
+
 }
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
new file mode 100644
index 0000000000..4e3888a25c
--- /dev/null
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.{LeafExecNode, RangeExec, SparkPlan}
+
+/**
+ * Base class for the RangeExec transformation, which can be implemented by the supported backends.
+ * Currently, Velox is supported.
+ */
+abstract class ColumnarRangeBaseExec(
+    start: Long,
+    end: Long,
+    step: Long,
+    numSlices: Int,
+    numElements: BigInt,
+    outputAttributes: Seq[Attribute],
+    child: Seq[SparkPlan])
+  extends LeafExecNode
+  with ValidatablePlan {
+
+  override def output: Seq[Attribute] = {
+    outputAttributes
+  }
+
+  override def rowType0(): Convention.RowType = Convention.RowType.None
+
+  override protected def doExecute()
+      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
+  }
+}
+
+/**
+ * Companion object for ColumnarRangeBaseExec; provides a factory method to create an instance
+ * from an existing RangeExec plan.
+ */
+object ColumnarRangeBaseExec {
+  def from(rangeExec: RangeExec): ColumnarRangeBaseExec = {
+    BackendsApiManager.getSparkPlanExecApiInstance
+      .genColumnarRangeExec(
+        rangeExec.start,
+        rangeExec.end,
+        rangeExec.step,
+        rangeExec.numSlices,
+        rangeExec.numElements,
+        rangeExec.output,
+        rangeExec.children
+      )
+  }
+}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 8d0abcaa68..bfd8f59797 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -324,6 +324,17 @@ object OffloadOthers {
               child,
               plan.evalType)
           }
+        case plan: RangeExec =>
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
+          BackendsApiManager.getSparkPlanExecApiInstance.genColumnarRangeExec(
+            plan.start,
+            plan.end,
+            plan.step,
+            plan.numSlices,
+            plan.numElements,
+            plan.output,
+            plan.children
+          )
         case plan: SampleExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
           val child = plan.child
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
index 509d7c02ae..0e0f5f444b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala
@@ -136,6 +136,7 @@ object Validators {
         fail(p)
      case p: CartesianProductExec if !settings.supportCartesianProductExec() => fail(p)
      case p: TakeOrderedAndProjectExec if !settings.supportColumnarShuffleExec() => fail(p)
+      case p: RangeExec if !settings.supportRangeExec() => fail(p)
       case _ => pass()
     }
   }
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 62ab868363..b08f4300f8 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -137,6 +137,8 @@ class VeloxTestSettings extends BackendTestSettings {
     )
    // Double precision loss: https://github.com/facebookincubator/velox/pull/6051#issuecomment-1731028215.
    .exclude("SPARK-22271: mean overflows and returns null for some decimal variables")
+    // Rewrite this test since it checks the physical operator which is changed in Gluten
+    .exclude("SPARK-27439: Explain result should match collected result after view change")
 
   enableSuite[GlutenDataFrameNaFunctionsSuite]
     .exclude(
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 72b77ae1f9..e3fc20503b 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1036,6 +1036,8 @@ class VeloxTestSettings extends BackendTestSettings {
    // The describe issue is just fixed by https://github.com/apache/spark/pull/40914.
     // We can enable the below test for spark 3.4 and higher versions.
     .excludeGlutenTest("describe")
+    // Rewrite this test since it checks the physical operator which is changed in Gluten
+    .exclude("SPARK-27439: Explain result should match collected result after view change")
   enableSuite[GlutenDataFrameTimeWindowingSuite]
   enableSuite[GlutenDataFrameTungstenSuite]
   enableSuite[GlutenDataFrameWindowFunctionsSuite]
@@ -1177,6 +1179,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenImplicitsTest]
   enableSuite[GlutenCollapseProjectExecTransformerSuite]
   enableSuite[GlutenSparkSessionExtensionSuite]
+  enableSuite[GlutenSQLRangeExecSuite]
 
  override def getSQLQueryTestSettings: SQLQueryTestSettings = VeloxSQLQueryTestSettings
 }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
new file mode 100644
index 0000000000..6804d56b9e
--- /dev/null
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.gluten.execution.ColumnarRangeBaseExec
+
+import org.apache.spark.sql.{DataFrame, GlutenSQLTestsTrait, Row}
+import org.apache.spark.sql.functions.sum
+
+class GlutenSQLRangeExecSuite extends GlutenSQLTestsTrait {
+
+  private def assertGlutenOperatorMatch[T: reflect.ClassTag](df: DataFrame): Unit = {
+    val executedPlan = getExecutedPlan(df)
+    assert(
+      executedPlan.exists(plan => implicitly[reflect.ClassTag[T]].runtimeClass.isInstance(plan)),
+      s"Operator ${implicitly[reflect.ClassTag[T]].runtimeClass.getSimpleName} not found " +
+        s"in executed plan:\n $executedPlan"
+    )
+  }
+
+  testGluten("ColumnarRangeExec produces correct results") {
+    val df = spark.range(0, 10, 1).toDF("id")
+    val expectedData = (0L until 10L).map(Row(_)).toSeq
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarRangeBaseExec](df)
+  }
+
+  testGluten("ColumnarRangeExec with step") {
+    val df = spark.range(5, 15, 2).toDF("id")
+    val expectedData = Seq(5L, 7L, 9L, 11L, 13L).map(Row(_))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarRangeBaseExec](df)
+  }
+
+  testGluten("ColumnarRangeExec with filter") {
+    val df = spark.range(0, 20, 1).toDF("id").filter("id % 3 == 0")
+    val expectedData = Seq(0L, 3L, 6L, 9L, 12L, 15L, 18L).map(Row(_))
+
+    checkAnswer(df, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarRangeBaseExec](df)
+  }
+
+  testGluten("ColumnarRangeExec with aggregation") {
+    val df = spark.range(1, 6, 1).toDF("id")
+    val sumDf = df.agg(sum("id"))
+    val expectedData = Seq(Row(15L))
+
+    checkAnswer(sumDf, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarRangeBaseExec](df)
+  }
+
+  testGluten("ColumnarRangeExec with join") {
+    val df1 = spark.range(0, 5, 1).toDF("id1")
+    val df2 = spark.range(3, 8, 1).toDF("id2")
+    val joinDf = df1.join(df2, df1("id1") === df2("id2"))
+    val expectedData = Seq(Row(3L, 3L), Row(4L, 4L))
+
+    checkAnswer(joinDf, expectedData)
+
+    assertGlutenOperatorMatch[ColumnarRangeBaseExec](joinDf)
+  }
+}
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 94d3a1f6e8..06e8309baf 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1066,6 +1066,8 @@ class VeloxTestSettings extends BackendTestSettings {
     )
     // test for sort node not present but gluten uses shuffle hash join
     .exclude("SPARK-41048: Improve output partitioning and ordering with AQE 
cache")
+    // Rewrite this test since it checks the physical operator which is 
changed in Gluten
+    .exclude("SPARK-27439: Explain result should match collected result after 
view change")
   enableSuite[GlutenDataFrameTimeWindowingSuite]
   enableSuite[GlutenDataFrameTungstenSuite]
   enableSuite[GlutenDataFrameWindowFunctionsSuite]
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 73c4d43ced..30dda10999 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -1088,6 +1088,8 @@ class VeloxTestSettings extends BackendTestSettings {
     )
     // test for sort node not present but gluten uses shuffle hash join
     .exclude("SPARK-41048: Improve output partitioning and ordering with AQE 
cache")
+    // Rewrite this test since it checks the physical operator which is 
changed in Gluten
+    .exclude("SPARK-27439: Explain result should match collected result after 
view change")
   enableSuite[GlutenDataFrameTimeWindowingSuite]
   enableSuite[GlutenDataFrameTungstenSuite]
   enableSuite[GlutenDataFrameWindowFunctionsSuite]

