This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new bf97aa6530 [GLUTEN-10668] Refactor to share `WindowGroupLimitExecShim` across different Spark shims (#10669)
bf97aa6530 is described below
commit bf97aa6530d627b30861143c75578d8d90a765e1
Author: Jiaan Geng <[email protected]>
AuthorDate: Mon Sep 15 15:04:12 2025 +0800
[GLUTEN-10668] Refactor to share `WindowGroupLimitExecShim` across different Spark shims (#10669)
---
.../clickhouse/CHSparkPlanExecApi.scala | 2 +-
.../CHWindowGroupLimitExecTransformer.scala | 8 ++--
.../gluten/backendsapi/SparkPlanExecApi.scala | 2 +-
.../WindowGroupLimitExecTransformer.scala | 8 ++--
.../columnar/offload/OffloadSingleNodeRules.scala | 19 +++++----
.../columnar/rewrite/PullOutPreProject.scala | 44 ++++++++++----------
.../org/apache/gluten/sql/shims/SparkShims.scala | 5 ++-
.../window/WindowGroupLimitExecShim.scala | 8 ++--
.../window/WindowGroupLimitExecShim.scala | 47 ----------------------
.../window/WindowGroupLimitExecShim.scala | 47 ----------------------
.../gluten/sql/shims/spark35/Spark35Shims.scala | 18 ++++++---
.../window/WindowGroupLimitExecShim.scala | 41 -------------------
.../gluten/sql/shims/spark40/Spark40Shims.scala | 18 ++++++---
.../window/WindowGroupLimitExecShim.scala | 41 -------------------
14 files changed, 74 insertions(+), 234 deletions(-)
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 44d58e4747..1291b4d257 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -938,7 +938,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with
Logging {
orderSpec: Seq[SortOrder],
rankLikeFunction: Expression,
limit: Int,
- mode: WindowGroupLimitMode,
+ mode: GlutenWindowGroupLimitMode,
child: SparkPlan): SparkPlan =
CHWindowGroupLimitExecTransformer(
partitionSpec,
diff --git
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHWindowGroupLimitExecTransformer.scala
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHWindowGroupLimitExecTransformer.scala
index 5b5c3c21a6..7b1fbc20cf 100644
---
a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHWindowGroupLimitExecTransformer.scala
+++
b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHWindowGroupLimitExecTransformer.scala
@@ -29,7 +29,7 @@ import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples,
ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.window.{Final, Partial,
WindowGroupLimitMode}
+import org.apache.spark.sql.execution.window.{GlutenFinal, GlutenPartial,
GlutenWindowGroupLimitMode}
import com.google.protobuf.StringValue
import io.substrait.proto.SortField
@@ -41,7 +41,7 @@ case class CHWindowGroupLimitExecTransformer(
orderSpec: Seq[SortOrder],
rankLikeFunction: Expression,
limit: Int,
- mode: WindowGroupLimitMode,
+ mode: GlutenWindowGroupLimitMode,
child: SparkPlan)
extends UnaryTransformSupport {
@@ -57,8 +57,8 @@ case class CHWindowGroupLimitExecTransformer(
override def output: Seq[Attribute] = child.output
override def requiredChildDistribution: Seq[Distribution] = mode match {
- case Partial => super.requiredChildDistribution
- case Final =>
+ case GlutenPartial => super.requiredChildDistribution
+ case GlutenFinal =>
if (partitionSpec.isEmpty) {
AllTuples :: Nil
} else {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index addfd1800a..fc53cbf6a3 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -730,7 +730,7 @@ trait SparkPlanExecApi {
orderSpec: Seq[SortOrder],
rankLikeFunction: Expression,
limit: Int,
- mode: WindowGroupLimitMode,
+ mode: GlutenWindowGroupLimitMode,
child: SparkPlan): SparkPlan =
WindowGroupLimitExecTransformer(partitionSpec, orderSpec,
rankLikeFunction, limit, mode, child)
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
index 6ba0365d37..282e1b8e71 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/execution/WindowGroupLimitExecTransformer.scala
@@ -25,7 +25,7 @@ import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute,
Expression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples,
ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.window.{Final, Partial,
WindowGroupLimitMode}
+import org.apache.spark.sql.execution.window.{GlutenFinal, GlutenPartial,
GlutenWindowGroupLimitMode}
import io.substrait.proto.SortField
@@ -36,7 +36,7 @@ case class WindowGroupLimitExecTransformer(
orderSpec: Seq[SortOrder],
rankLikeFunction: Expression,
limit: Int,
- mode: WindowGroupLimitMode,
+ mode: GlutenWindowGroupLimitMode,
child: SparkPlan)
extends UnaryTransformSupport {
@@ -52,8 +52,8 @@ case class WindowGroupLimitExecTransformer(
override def output: Seq[Attribute] = child.output
override def requiredChildDistribution: Seq[Distribution] = mode match {
- case Partial => super.requiredChildDistribution
- case Final =>
+ case GlutenPartial => super.requiredChildDistribution
+ case GlutenFinal =>
if (partitionSpec.isEmpty) {
AllTuples :: Nil
} else {
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index d9e6399235..93ee65c255 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -33,7 +33,7 @@ import
org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec,
ShuffleExchangeExec}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.{ArrowEvalPythonExec,
BatchEvalPythonExec, EvalPythonExecTransformer}
-import org.apache.spark.sql.execution.window.{WindowExec,
WindowGroupLimitExecShim}
+import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.hive.HiveTableScanExecTransformer
// Exchange transformation.
@@ -270,16 +270,15 @@ object OffloadOthers {
plan.orderSpec,
plan.child)
case plan if
SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) =>
- val windowGroupLimitPlan = SparkShimLoader.getSparkShims
- .getWindowGroupLimitExecShim(plan)
- .asInstanceOf[WindowGroupLimitExecShim]
+ val windowGroupLimitExecShim =
+ SparkShimLoader.getSparkShims.getWindowGroupLimitExecShim(plan)
BackendsApiManager.getSparkPlanExecApiInstance.genWindowGroupLimitTransformer(
- windowGroupLimitPlan.partitionSpec,
- windowGroupLimitPlan.orderSpec,
- windowGroupLimitPlan.rankLikeFunction,
- windowGroupLimitPlan.limit,
- windowGroupLimitPlan.mode,
- windowGroupLimitPlan.child
+ windowGroupLimitExecShim.partitionSpec,
+ windowGroupLimitExecShim.orderSpec,
+ windowGroupLimitExecShim.rankLikeFunction,
+ windowGroupLimitExecShim.limit,
+ windowGroupLimitExecShim.mode,
+ windowGroupLimitExecShim.child
)
case plan: GlobalLimitExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
diff --git
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
index a3c6669a2c..13c14bffc0 100644
---
a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
+++
b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala
@@ -25,7 +25,7 @@ import
org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression,
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.{BaseAggregateExec,
TypedAggregateExpression}
import org.apache.spark.sql.execution.python.ArrowEvalPythonExec
-import org.apache.spark.sql.execution.window.{WindowExec,
WindowGroupLimitExecShim}
+import org.apache.spark.sql.execution.window.WindowExec
import scala.collection.mutable
@@ -91,11 +91,10 @@ object PullOutPreProject extends RewriteSingleNode with
PullOutProjectHelper {
}.isDefined) ||
windowNeedPreComputeRangeFrame(window)
case plan if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan)
=>
- val window = SparkShimLoader.getSparkShims
- .getWindowGroupLimitExecShim(plan)
- .asInstanceOf[WindowGroupLimitExecShim]
- window.orderSpec.exists(o => isNotAttribute(o.child)) ||
- window.partitionSpec.exists(isNotAttribute)
+ val windowGroupLimitExecShim =
+ SparkShimLoader.getSparkShims.getWindowGroupLimitExecShim(plan)
+ windowGroupLimitExecShim.orderSpec.exists(o =>
isNotAttribute(o.child)) ||
+ windowGroupLimitExecShim.partitionSpec.exists(isNotAttribute)
case expand: ExpandExec =>
expand.projections.flatten.exists(isNotAttributeAndLiteral)
case _ => false
}
@@ -223,36 +222,37 @@ object PullOutPreProject extends RewriteSingleNode with
PullOutProjectHelper {
newProject
case plan
- if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) &&
needsPreProject(plan) =>
- val windowLimit = SparkShimLoader.getSparkShims
- .getWindowGroupLimitExecShim(plan)
- .asInstanceOf[WindowGroupLimitExecShim]
+ if SparkShimLoader.getSparkShims.isWindowGroupLimitExec(plan) &&
+ needsPreProject(plan) =>
+ val windowGroupLimitExecShim =
+ SparkShimLoader.getSparkShims.getWindowGroupLimitExecShim(plan)
val expressionMap = new mutable.HashMap[Expression, NamedExpression]()
// Handle orderSpec.
- val newOrderSpec = getNewSortOrder(windowLimit.orderSpec, expressionMap)
+ val newOrderSpec = getNewSortOrder(windowGroupLimitExecShim.orderSpec,
expressionMap)
// Handle partitionSpec.
val newPartitionSpec =
-
windowLimit.partitionSpec.toIndexedSeq.map(replaceExpressionWithAttribute(_,
expressionMap))
+ windowGroupLimitExecShim.partitionSpec.toIndexedSeq.map(
+ replaceExpressionWithAttribute(_, expressionMap))
val newChild = ProjectExec(
- eliminateProjectList(windowLimit.child.outputSet,
expressionMap.values.toSeq),
- windowLimit.child)
- windowLimit.child.logicalLink.foreach(newChild.setLogicalLink)
+ eliminateProjectList(windowGroupLimitExecShim.child.outputSet,
expressionMap.values.toSeq),
+ windowGroupLimitExecShim.child)
+
windowGroupLimitExecShim.child.logicalLink.foreach(newChild.setLogicalLink)
- val newWindowLimitShim = windowLimit.copy(
+ val newWindowGroupLimitExecShim = windowGroupLimitExecShim.copy(
orderSpec = newOrderSpec,
partitionSpec = newPartitionSpec,
child = newChild
)
- newWindowLimitShim.copyTagsFrom(windowLimit)
+ newWindowGroupLimitExecShim.copyTagsFrom(windowGroupLimitExecShim)
- val newWindowLimit = SparkShimLoader.getSparkShims
- .getWindowGroupLimitExec(newWindowLimitShim)
- newWindowLimit.copyTagsFrom(newWindowLimitShim)
+ val newWindowGroupLimitExec =
+
SparkShimLoader.getSparkShims.getWindowGroupLimitExec(newWindowGroupLimitExecShim)
+ newWindowGroupLimitExec.copyTagsFrom(newWindowGroupLimitExecShim)
- val newProject = ProjectExec(plan.output, newWindowLimit)
- newWindowLimit.logicalLink.foreach(newProject.setLogicalLink)
+ val newProject = ProjectExec(plan.output, newWindowGroupLimitExec)
+ newWindowGroupLimitExec.logicalLink.foreach(newProject.setLogicalLink)
newProject
case expand: ExpandExec if needsPreProject(expand) =>
diff --git
a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
index 570e3cfef5..3a59135c80 100644
--- a/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/sql/shims/SparkShims.scala
@@ -43,6 +43,7 @@ import
org.apache.spark.sql.execution.datasources.parquet.ParquetFilters
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec,
DataSourceV2ScanExecBase}
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike,
ShuffleExchangeLike}
+import org.apache.spark.sql.execution.window.WindowGroupLimitExecShim
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DecimalType, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -151,9 +152,9 @@ trait SparkShims {
def isWindowGroupLimitExec(plan: SparkPlan): Boolean = false
- def getWindowGroupLimitExecShim(plan: SparkPlan): SparkPlan = null
+ def getWindowGroupLimitExecShim(plan: SparkPlan): WindowGroupLimitExecShim =
null
- def getWindowGroupLimitExec(windowGroupLimitPlan: SparkPlan): SparkPlan =
null
+ def getWindowGroupLimitExec(windowGroupLimitExecShim:
WindowGroupLimitExecShim): SparkPlan = null
def getLimitAndOffsetFromGlobalLimit(plan: GlobalLimitExec): (Int, Int) =
(plan.limit, 0)
diff --git
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
b/shims/common/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
similarity index 89%
rename from
shims/spark32/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
rename to
shims/common/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
index 6aa2e5fb81..662a32251c 100644
---
a/shims/spark32/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++
b/shims/common/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
@@ -21,18 +21,18 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
SortOrder}
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-sealed trait WindowGroupLimitMode
+sealed trait GlutenWindowGroupLimitMode
-case object Partial extends WindowGroupLimitMode
+case object GlutenPartial extends GlutenWindowGroupLimitMode
-case object Final extends WindowGroupLimitMode
+case object GlutenFinal extends GlutenWindowGroupLimitMode
case class WindowGroupLimitExecShim(
partitionSpec: Seq[Expression],
orderSpec: Seq[SortOrder],
rankLikeFunction: Expression,
limit: Int,
- mode: WindowGroupLimitMode,
+ mode: GlutenWindowGroupLimitMode,
child: SparkPlan)
extends UnaryExecNode {
override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
diff --git
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
b/shims/spark33/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
deleted file mode 100644
index 6aa2e5fb81..0000000000
---
a/shims/spark33/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.window
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
SortOrder}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-
-sealed trait WindowGroupLimitMode
-
-case object Partial extends WindowGroupLimitMode
-
-case object Final extends WindowGroupLimitMode
-
-case class WindowGroupLimitExecShim(
- partitionSpec: Seq[Expression],
- orderSpec: Seq[SortOrder],
- rankLikeFunction: Expression,
- limit: Int,
- mode: WindowGroupLimitMode,
- child: SparkPlan)
- extends UnaryExecNode {
- override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
- copy(child = newChild)
-
- override protected def doExecute(): RDD[InternalRow] = {
- throw new UnsupportedOperationException(
- s"${this.getClass.getSimpleName} doesn't support doExecute")
- }
-
- override def output: Seq[Attribute] = child.output
-}
diff --git
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
b/shims/spark34/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
deleted file mode 100644
index 6aa2e5fb81..0000000000
---
a/shims/spark34/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.window
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
SortOrder}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-
-sealed trait WindowGroupLimitMode
-
-case object Partial extends WindowGroupLimitMode
-
-case object Final extends WindowGroupLimitMode
-
-case class WindowGroupLimitExecShim(
- partitionSpec: Seq[Expression],
- orderSpec: Seq[SortOrder],
- rankLikeFunction: Expression,
- limit: Int,
- mode: WindowGroupLimitMode,
- child: SparkPlan)
- extends UnaryExecNode {
- override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
- copy(child = newChild)
-
- override protected def doExecute(): RDD[InternalRow] = {
- throw new UnsupportedOperationException(
- s"${this.getClass.getSimpleName} doesn't support doExecute")
- }
-
- override def output: Seq[Attribute] = child.output
-}
diff --git
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
index e48a401722..58759949f7 100644
---
a/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
+++
b/shims/spark35/src/main/scala/org/apache/gluten/sql/shims/spark35/Spark35Shims.scala
@@ -51,7 +51,7 @@ import
org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike,
ShuffleExchangeLike}
-import org.apache.spark.sql.execution.window.{WindowGroupLimitExec,
WindowGroupLimitExecShim}
+import org.apache.spark.sql.execution.window.{Final, GlutenFinal,
GlutenPartial, Partial, WindowGroupLimitExec, WindowGroupLimitExecShim}
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType,
StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -304,24 +304,32 @@ class Spark35Shims extends SparkShims {
override def getWindowGroupLimitExecShim(plan: SparkPlan):
WindowGroupLimitExecShim = {
val windowGroupLimitPlan = plan.asInstanceOf[WindowGroupLimitExec]
+ val mode = windowGroupLimitPlan.mode match {
+ case Partial => GlutenPartial
+ case Final => GlutenFinal
+ }
WindowGroupLimitExecShim(
windowGroupLimitPlan.partitionSpec,
windowGroupLimitPlan.orderSpec,
windowGroupLimitPlan.rankLikeFunction,
windowGroupLimitPlan.limit,
- windowGroupLimitPlan.mode,
+ mode,
windowGroupLimitPlan.child
)
}
- override def getWindowGroupLimitExec(windowGroupLimitPlan: SparkPlan):
SparkPlan = {
- val windowGroupLimitExecShim =
windowGroupLimitPlan.asInstanceOf[WindowGroupLimitExecShim]
+ override def getWindowGroupLimitExec(
+ windowGroupLimitExecShim: WindowGroupLimitExecShim): SparkPlan = {
+ val mode = windowGroupLimitExecShim.mode match {
+ case GlutenPartial => Partial
+ case GlutenFinal => Final
+ }
WindowGroupLimitExec(
windowGroupLimitExecShim.partitionSpec,
windowGroupLimitExecShim.orderSpec,
windowGroupLimitExecShim.rankLikeFunction,
windowGroupLimitExecShim.limit,
- windowGroupLimitExecShim.mode,
+ mode,
windowGroupLimitExecShim.child
)
}
diff --git
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
b/shims/spark35/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
deleted file mode 100644
index 16166e817e..0000000000
---
a/shims/spark35/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.window
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
SortOrder}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-
-case class WindowGroupLimitExecShim(
- partitionSpec: Seq[Expression],
- orderSpec: Seq[SortOrder],
- rankLikeFunction: Expression,
- limit: Int,
- mode: WindowGroupLimitMode,
- child: SparkPlan)
- extends UnaryExecNode {
- override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
- copy(child = newChild)
-
- override protected def doExecute(): RDD[InternalRow] = {
- throw new UnsupportedOperationException(
- s"${this.getClass.getSimpleName} doesn't support doExecute")
- }
-
- override def output: Seq[Attribute] = child.output
-}
diff --git
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
index f24d489929..11d04198ab 100644
---
a/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
+++
b/shims/spark40/src/main/scala/org/apache/gluten/sql/shims/spark40/Spark40Shims.scala
@@ -51,7 +51,7 @@ import
org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, DataSourceV
import org.apache.spark.sql.execution.datasources.v2.text.TextScan
import org.apache.spark.sql.execution.datasources.v2.utils.CatalogUtil
import org.apache.spark.sql.execution.exchange.{BroadcastExchangeLike,
ShuffleExchangeLike}
-import org.apache.spark.sql.execution.window.{WindowGroupLimitExec,
WindowGroupLimitExecShim}
+import org.apache.spark.sql.execution.window.{Final, GlutenFinal,
GlutenPartial, Partial, WindowGroupLimitExec, WindowGroupLimitExecShim}
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
import org.apache.spark.sql.types.{DecimalType, IntegerType, LongType,
StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -303,24 +303,32 @@ class Spark40Shims extends SparkShims {
override def getWindowGroupLimitExecShim(plan: SparkPlan):
WindowGroupLimitExecShim = {
val windowGroupLimitPlan = plan.asInstanceOf[WindowGroupLimitExec]
+ val mode = windowGroupLimitPlan.mode match {
+ case Partial => GlutenPartial
+ case Final => GlutenFinal
+ }
WindowGroupLimitExecShim(
windowGroupLimitPlan.partitionSpec,
windowGroupLimitPlan.orderSpec,
windowGroupLimitPlan.rankLikeFunction,
windowGroupLimitPlan.limit,
- windowGroupLimitPlan.mode,
+ mode,
windowGroupLimitPlan.child
)
}
- override def getWindowGroupLimitExec(windowGroupLimitPlan: SparkPlan):
SparkPlan = {
- val windowGroupLimitExecShim =
windowGroupLimitPlan.asInstanceOf[WindowGroupLimitExecShim]
+ override def getWindowGroupLimitExec(
+ windowGroupLimitExecShim: WindowGroupLimitExecShim): SparkPlan = {
+ val mode = windowGroupLimitExecShim.mode match {
+ case GlutenPartial => Partial
+ case GlutenFinal => Final
+ }
WindowGroupLimitExec(
windowGroupLimitExecShim.partitionSpec,
windowGroupLimitExecShim.orderSpec,
windowGroupLimitExecShim.rankLikeFunction,
windowGroupLimitExecShim.limit,
- windowGroupLimitExecShim.mode,
+ mode,
windowGroupLimitExecShim.child
)
}
diff --git
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
b/shims/spark40/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
deleted file mode 100644
index 16166e817e..0000000000
---
a/shims/spark40/src/main/scala/org/apache/spark/sql/execution/window/WindowGroupLimitExecShim.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.execution.window
-
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression,
SortOrder}
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
-
-case class WindowGroupLimitExecShim(
- partitionSpec: Seq[Expression],
- orderSpec: Seq[SortOrder],
- rankLikeFunction: Expression,
- limit: Int,
- mode: WindowGroupLimitMode,
- child: SparkPlan)
- extends UnaryExecNode {
- override protected def withNewChildInternal(newChild: SparkPlan): SparkPlan =
- copy(child = newChild)
-
- override protected def doExecute(): RDD[InternalRow] = {
- throw new UnsupportedOperationException(
- s"${this.getClass.getSimpleName} doesn't support doExecute")
- }
-
- override def output: Seq[Attribute] = child.output
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]