This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new bf97aa6530 [GLUTEN-10668] Refactor to share `WindowGroupLimitExecShim` across different Spark shims (#10669)
bf97aa6530 is described below

commit bf97aa6530d627b30861143c75578d8d90a765e1
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Sep 15 15:04:12 2025 +0800

    [GLUTEN-10668] Refactor to share `WindowGroupLimitExecShim` across different Spark shims (#10669)
---
 .../clickhouse/CHSparkPlanExecApi.scala            |  2 +-
 .../CHWindowGroupLimitExecTransformer.scala        |  8 ++--
 .../gluten/backendsapi/SparkPlanExecApi.scala      |  2 +-
 .../WindowGroupLimitExecTransformer.scala          |  8 ++--
 .../columnar/offload/OffloadSingleNodeRules.scala  | 19 +++++----
 .../columnar/rewrite/PullOutPreProject.scala       | 44 ++++++++++----------
 .../org/apache/gluten/sql/shims/SparkShims.scala   |  5 ++-
 .../window/WindowGroupLimitExecShim.scala          |  8 ++--
 .../window/WindowGroupLimitExecShim.scala          | 47 ----------------------
 .../window/WindowGroupLimitExecShim.scala          | 47 ----------------------
 .../gluten/sql/shims/spark35/Spark35Shims.scala    | 18 ++++++---
 .../window/WindowGroupLimitExecShim.scala          | 41 -------------------
 .../gluten/sql/shims/spark40/Spark40Shims.scala    | 18 ++++++---
 .../window/WindowGroupLimitExecShim.scala          | 41 -------------------
 14 files changed, 74 insertions(+), 234 deletions(-)

diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 44d58e4747..1291b4d257 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -938,7 +938,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with 
Logging {
       orderSpec: Seq[SortOrder],
       rankLikeFunction: Expression,
       limit: Int,
-      mode: WindowGroupLimitMode,
+      mode: GlutenWindowGroupLimitMode,
       child: SparkPlan): SparkPlan =
     CHWindowGroupLimitExecTransformer(
       partitionSpec,
diff --git 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHWindowGroupLimitExecTransformer.scala
 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHWindowGroupLimitExecTransformer.scala
index 5b5c3c21a6..7b1fbc20cf 100644
--- 
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHWindowGroupLimitExecTransformer.scala
+++ 
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHWindowGroupLimitExecTransformer.scala
@@ -29,7 +29,7 @@ import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.window.{Final, Partial, 
WindowGroupLimitMode}
+import org.apache.spark.sql.execution.window.{GlutenFinal, GlutenPartial, 
GlutenWindowGroupLimitMode}
 
 import com.google.protobuf.StringValue
 import io.substrait.proto.SortField
@@ -41,7 +41,7 @@ case class CHWindowGroupLimitExecTransformer(
     orderSpec: Seq[SortOrder],
     rankLikeFunction: Expression,
     limit: Int,
-    mode: WindowGroupLimitMode,
+    mode: GlutenWindowGroupLimitMode,
     child: SparkPlan)
   extends UnaryTransformSupport {
 
@@ -57,8 +57,8 @@ case class CHWindowGroupLimitExecTransformer(
   override def output: Seq[Attribute] = child.output
 
   override def requiredChildDistribution: Seq[Distribution] = mode match {
-    case Partial => super.requiredChildDistribution
-    case Final =>
+    case GlutenPartial => super.requiredChildDistribution
+    case GlutenFinal =>
       if (partitionSpec.isEmpty) {
         AllTuples :: Nil
       } else {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index addfd1800a..fc53cbf6a3 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -730,7 +730,7 @@ trait SparkPlanExecApi {
       orderSpec: Seq[SortOrder],
       rankLikeFunction: Expression,
       limit: Int,
-      mode: WindowGroupLimitMode,
+      mode: GlutenWindowGroupLimitMode,
       child: SparkPlan): SparkPlan =
     WindowGroupLimitExecTransformer(partitionSpec, orderSpec, 
rankLikeFunction, limit, mode, child)
 
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
index 6ba0365d37..282e1b8e71 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
 import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, 
Expression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, Distribution, Partitioning}
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.window.{Final, Partial, 
WindowGroupLimitMode}
+import org.apache.spark.sql.execution.window.{GlutenFinal, GlutenPartial, 
GlutenWindowGroupLimitMode}
 
 import io.substrait.proto.SortField
 
@@ -36,7 +36,7 @@ case class WindowGroupLimitExecTransformer(
     orderSpec: Seq[SortOrder],
     rankLikeFunction: Expression,
     limit: Int,
-    mode: WindowGroupLimitMode,
+    mode: GlutenWindowGroupLimitMode,
     child: SparkPlan)
   extends UnaryTransformSupport {
 
@@ -52,8 +52,8 @@ case class WindowGroupLimitExecTransformer(
   override def output: Seq[Attribute] = child.output
 
   override def requiredChildDistribution: Seq[Distribution] = mode match {
-    case Partial => super.requiredChildDistribution
-    case Final =>
+    case GlutenPartial => super.requiredChildDistribution
+    case GlutenFinal =>
       if (partitionSpec.isEmpty) {
         AllTuples :: Nil
       } else {
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index d9e6399235..93ee65c255 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -33,7 +33,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, 
ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec, 
BatchEvalPythonExec, EvalPythonExecTransformer}
-import org.apache.spark.sql.execution.window.{WindowExec, 
WindowGroupLimitExecShim}
+import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.hive.HiveTableScanExecTransformer
 
 // Exchange transformation.
@@ -270,16 +270,15 @@ object OffloadOthers {
             plan.orderSpec,
             plan.child)
         case plan if 
SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) =>
-          val windowGroupLimitPlan = SparkShimLoader.getSparkShims
-            .getWindowGroupLimitExecShim(plan)
-            .asInstanceOf[WindowGroupLimitExecShim]
+          val windowGroupLimitExecShim =
+            SparkShimLoader.getSparkShims.getWindowGroupLimitExecShim(plan)
           
BackendsApiManager.getSparkPlanExecApiInstance.genWindowGroupLimitTransformer(
-            windowGroupLimitPlan.partitionSpec,
-            windowGroupLimitPlan.orderSpec,
-            windowGroupLimitPlan.rankLikeFunction,
-            windowGroupLimitPlan.limit,
-            windowGroupLimitPlan.mode,
-            windowGroupLimitPlan.child
+            windowGroupLimitExecShim.partitionSpec,
+            windowGroupLimitExecShim.orderSpec,
+            windowGroupLimitExecShim.rankLikeFunction,
+            windowGroupLimitExecShim.limit,
+            windowGroupLimitExecShim.mode,
+            windowGroupLimitExecShim.child
           )
         case plan: GlobalLimitExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
diff --git 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
index a3c6669a2c..13c14bffc0 100644
--- 
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
+++ 
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
@@ -25,7 +25,7 @@ import 
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec, 
TypedAggregateExpression}
 import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
-import org.apache.spark.sql.execution.window.{WindowExec, 
WindowGroupLimitExecShim}
+import org.apache.spark.sql.execution.window.WindowExec
 
 import scala.collection.mutable
 
@@ -91,11 +91,10 @@ object PullOutPreProject extends RewriteSingleNode with 
PullOutProjectHelper {
         }.isDefined) ||
         windowNeedPreComputeRangeFrame(window)
       case plan if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) 
=>
-        val window = SparkShimLoader.getSparkShims
-          .getWindowGroupLimitExecShim(plan)
-          .asInstanceOf[WindowGroupLimitExecShim]
-        window.orderSpec.exists(o => isNotAttribute(o.child)) ||
-        window.partitionSpec.exists(isNotAttribute)
+        val windowGroupLimitExecShim =
+          SparkShimLoader.getSparkShims.getWindowGroupLimitExecShim(plan)
+        windowGroupLimitExecShim.orderSpec.exists(o => 
isNotAttribute(o.child)) ||
+        windowGroupLimitExecShim.partitionSpec.exists(isNotAttribute)
       case expand: ExpandExec => 
expand.projections.flatten.exists(isNotAttributeAndLiteral)
       case _ => false
     }
@@ -223,36 +222,37 @@ object PullOutPreProject extends RewriteSingleNode with 
PullOutProjectHelper {
       newProject
 
     case plan
-        if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) && 
needsPreProject(plan) =>
-      val windowLimit = SparkShimLoader.getSparkShims
-        .getWindowGroupLimitExecShim(plan)
-        .asInstanceOf[WindowGroupLimitExecShim]
+        if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) &&
+          needsPreProject(plan) =>
+      val windowGroupLimitExecShim =
+        SparkShimLoader.getSparkShims.getWindowGroupLimitExecShim(plan)
       val expressionMap = new mutable.HashMap[Expression, NamedExpression]()
       // Handle orderSpec.
-      val newOrderSpec = getNewSortOrder(windowLimit.orderSpec, expressionMap)
+      val newOrderSpec = getNewSortOrder(windowGroupLimitExecShim.orderSpec, 
expressionMap)
 
       // Handle partitionSpec.
       val newPartitionSpec =
-        
windowLimit.partitionSpec.toIndexedSeq.map(replaceExpressionWithAttribute(_, 
expressionMap))
+        windowGroupLimitExecShim.partitionSpec.toIndexedSeq.map(
+          replaceExpressionWithAttribute(_, expressionMap))
 
       val newChild = ProjectExec(
-        eliminateProjectList(windowLimit.child.outputSet, 
expressionMap.values.toSeq),
-        windowLimit.child)
-      windowLimit.child.logicalLink.foreach(newChild.setLogicalLink)
+        eliminateProjectList(windowGroupLimitExecShim.child.outputSet, 
expressionMap.values.toSeq),
+        windowGroupLimitExecShim.child)
+      
windowGroupLimitExecShim.child.logicalLink.foreach(newChild.setLogicalLink)
 
-      val newWindowLimitShim = windowLimit.copy(
+      val newWindowGroupLimitExecShim = windowGroupLimitExecShim.copy(
         orderSpec = newOrderSpec,
         partitionSpec = newPartitionSpec,
         child = newChild
       )
-      newWindowLimitShim.copyTagsFrom(windowLimit)
+      newWindowGroupLimitExecShim.copyTagsFrom(windowGroupLimitExecShim)
 
-      val newWindowLimit = SparkShimLoader.getSparkShims
-        .getWindowGroupLimitExec(newWindowLimitShim)
-      newWindowLimit.copyTagsFrom(newWindowLimitShim)
+      val newWindowGroupLimitExec =
+        
SparkShimLoader.getSparkShims.getWindowGroupLimitExec(newWindowGroupLimitExecShim)
+      newWindowGroupLimitExec.copyTagsFrom(newWindowGroupLimitExecShim)
 
-      val newProject = ProjectExec(plan.output, newWindowLimit)
-      newWindowLimit.logicalLink.foreach(newProject.setLogicalLink)
+      val newProject = ProjectExec(plan.output, newWindowGroupLimitExec)
+      newWindowGroupLimitExec.logicalLink.foreach(newProject.setLogicalLink)
       newProject
 
     case expand: ExpandExec if needsPreProject(expand) =>
diff --git 
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala 
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 570e3cfef5..3a59135c80 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -43,6 +43,7 @@ import 
org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, 
DataSourceV2ScanExecBase}
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, 
ShuffleExchangeLike}
+import org.apache.spark.sql.execution.window.WindowGroupLimitExecShim
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DecimalType, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -151,9 +152,9 @@ trait SparkShims {
 
   def isWindowGroupLimitExec(plan: SparkPlan): Boolean = false
 
-  def getWindowGroupLimitExecShim(plan: SparkPlan): SparkPlan = null
+  def getWindowGroupLimitExecShim(plan: SparkPlan): WindowGroupLimitExecShim = 
null
 
-  def getWindowGroupLimitExec(windowGroupLimitPlan: SparkPlan): SparkPlan = 
null
+  def getWindowGroupLimitExec(windowGroupLimitExecShim: 
WindowGroupLimitExecShim): SparkPlan = null
 
   def getLimitAndOffsetFromGlobalLimit(plan: GlobalLimitExec): (Int, Int) = 
(plan.limit, 0)
 
diff --git 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
 
b/shims/common/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
similarity index 89%
rename from 
shims/spark32/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
rename to 
shims/common/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
index 6aa2e5fb81..662a32251c 100644
--- 
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++ 
b/shims/common/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
@@ -21,18 +21,18 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
SortOrder}
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
 
-sealed trait WindowGroupLimitMode
+sealed trait GlutenWindowGroupLimitMode
 
-case object Partial extends WindowGroupLimitMode
+case object GlutenPartial extends GlutenWindowGroupLimitMode
 
-case object Final extends WindowGroupLimitMode
+case object GlutenFinal extends GlutenWindowGroupLimitMode
 
 case class WindowGroupLimitExecShim(
     partitionSpec: Seq[Expression],
     orderSpec: Seq[SortOrder],
     rankLikeFunction: Expression,
     limit: Int,
-    mode: WindowGroupLimitMode,
+    mode: GlutenWindowGroupLimitMode,
     child: SparkPlan)
   extends UnaryExecNode {
   override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
 
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
deleted file mode 100644
index 6aa2e5fb81..0000000000
--- 
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.window
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
SortOrder}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-
-sealed trait WindowGroupLimitMode
-
-case object Partial extends WindowGroupLimitMode
-
-case object Final extends WindowGroupLimitMode
-
-case class WindowGroupLimitExecShim(
-    partitionSpec: Seq[Expression],
-    orderSpec: Seq[SortOrder],
-    rankLikeFunction: Expression,
-    limit: Int,
-    mode: WindowGroupLimitMode,
-    child: SparkPlan)
-  extends UnaryExecNode {
-  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    copy(child = newChild)
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(
-      s"${this.getClass.getSimpleName} doesn't support doExecute")
-  }
-
-  override def output: Seq[Attribute] = child.output
-}
diff --git 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
 
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
deleted file mode 100644
index 6aa2e5fb81..0000000000
--- 
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.window
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
SortOrder}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-
-sealed trait WindowGroupLimitMode
-
-case object Partial extends WindowGroupLimitMode
-
-case object Final extends WindowGroupLimitMode
-
-case class WindowGroupLimitExecShim(
-    partitionSpec: Seq[Expression],
-    orderSpec: Seq[SortOrder],
-    rankLikeFunction: Expression,
-    limit: Int,
-    mode: WindowGroupLimitMode,
-    child: SparkPlan)
-  extends UnaryExecNode {
-  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    copy(child = newChild)
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(
-      s"${this.getClass.getSimpleName} doesn't support doExecute")
-  }
-
-  override def output: Seq[Attribute] = child.output
-}
diff --git 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index e48a401722..58759949f7 100644
--- 
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++ 
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -51,7 +51,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, 
ShuffleExchangeLike}
-import org.apache.spark.sql.execution.window.{WindowGroupLimitExec, 
WindowGroupLimitExecShim}
+import org.apache.spark.sql.execution.window.{Final, GlutenFinal, 
GlutenPartial, Partial, WindowGroupLimitExec, WindowGroupLimitExecShim}
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType, 
StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -304,24 +304,32 @@ class Spark35Shims extends SparkShims {
 
   override def getWindowGroupLimitExecShim(plan: SparkPlan): 
WindowGroupLimitExecShim = {
     val windowGroupLimitPlan = plan.asInstanceOf[WindowGroupLimitExec]
+    val mode = windowGroupLimitPlan.mode match {
+      case Partial => GlutenPartial
+      case Final => GlutenFinal
+    }
     WindowGroupLimitExecShim(
       windowGroupLimitPlan.partitionSpec,
       windowGroupLimitPlan.orderSpec,
       windowGroupLimitPlan.rankLikeFunction,
       windowGroupLimitPlan.limit,
-      windowGroupLimitPlan.mode,
+      mode,
       windowGroupLimitPlan.child
     )
   }
 
-  override def getWindowGroupLimitExec(windowGroupLimitPlan: SparkPlan): 
SparkPlan = {
-    val windowGroupLimitExecShim = 
windowGroupLimitPlan.asInstanceOf[WindowGroupLimitExecShim]
+  override def getWindowGroupLimitExec(
+      windowGroupLimitExecShim: WindowGroupLimitExecShim): SparkPlan = {
+    val mode = windowGroupLimitExecShim.mode match {
+      case GlutenPartial => Partial
+      case GlutenFinal => Final
+    }
     WindowGroupLimitExec(
       windowGroupLimitExecShim.partitionSpec,
       windowGroupLimitExecShim.orderSpec,
       windowGroupLimitExecShim.rankLikeFunction,
       windowGroupLimitExecShim.limit,
-      windowGroupLimitExecShim.mode,
+      mode,
       windowGroupLimitExecShim.child
     )
   }
diff --git 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
 
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
deleted file mode 100644
index 16166e817e..0000000000
--- 
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.window
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
SortOrder}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-
-case class WindowGroupLimitExecShim(
-    partitionSpec: Seq[Expression],
-    orderSpec: Seq[SortOrder],
-    rankLikeFunction: Expression,
-    limit: Int,
-    mode: WindowGroupLimitMode,
-    child: SparkPlan)
-  extends UnaryExecNode {
-  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    copy(child = newChild)
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(
-      s"${this.getClass.getSimpleName} doesn't support doExecute")
-  }
-
-  override def output: Seq[Attribute] = child.output
-}
diff --git 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index f24d489929..11d04198ab 100644
--- 
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++ 
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -51,7 +51,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV
 import org.apache.spark.sql.execution.datasources.v2.text.TextScan
 import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
 import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike, 
ShuffleExchangeLike}
-import org.apache.spark.sql.execution.window.{WindowGroupLimitExec, 
WindowGroupLimitExecShim}
+import org.apache.spark.sql.execution.window.{Final, GlutenFinal, 
GlutenPartial, Partial, WindowGroupLimitExec, WindowGroupLimitExecShim}
 import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType, 
StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -303,24 +303,32 @@ class Spark40Shims extends SparkShims {
 
   override def getWindowGroupLimitExecShim(plan: SparkPlan): 
WindowGroupLimitExecShim = {
     val windowGroupLimitPlan = plan.asInstanceOf[WindowGroupLimitExec]
+    val mode = windowGroupLimitPlan.mode match {
+      case Partial => GlutenPartial
+      case Final => GlutenFinal
+    }
     WindowGroupLimitExecShim(
       windowGroupLimitPlan.partitionSpec,
       windowGroupLimitPlan.orderSpec,
       windowGroupLimitPlan.rankLikeFunction,
       windowGroupLimitPlan.limit,
-      windowGroupLimitPlan.mode,
+      mode,
       windowGroupLimitPlan.child
     )
   }
 
-  override def getWindowGroupLimitExec(windowGroupLimitPlan: SparkPlan): 
SparkPlan = {
-    val windowGroupLimitExecShim = 
windowGroupLimitPlan.asInstanceOf[WindowGroupLimitExecShim]
+  override def getWindowGroupLimitExec(
+      windowGroupLimitExecShim: WindowGroupLimitExecShim): SparkPlan = {
+    val mode = windowGroupLimitExecShim.mode match {
+      case GlutenPartial => Partial
+      case GlutenFinal => Final
+    }
     WindowGroupLimitExec(
       windowGroupLimitExecShim.partitionSpec,
       windowGroupLimitExecShim.orderSpec,
       windowGroupLimitExecShim.rankLikeFunction,
       windowGroupLimitExecShim.limit,
-      windowGroupLimitExecShim.mode,
+      mode,
       windowGroupLimitExecShim.child
     )
   }
diff --git 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
 
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
deleted file mode 100644
index 16166e817e..0000000000
--- 
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.window
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, 
SortOrder}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-
-case class WindowGroupLimitExecShim(
-    partitionSpec: Seq[Expression],
-    orderSpec: Seq[SortOrder],
-    rankLikeFunction: Expression,
-    limit: Int,
-    mode: WindowGroupLimitMode,
-    child: SparkPlan)
-  extends UnaryExecNode {
-  override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
-    copy(child = newChild)
-
-  override protected def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException(
-      s"${this.getClass.getSimpleName} doesn't support doExecute")
-  }
-
-  override def output: Seq[Attribute] = child.output
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to