This is an automated email from the ASF dual-hosted git repository.

changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new 89d467a6a1 [GLUTEN-11400][CORE]  Implement partitioning-aware union for ColumnarUnionExec (#11455)
89d467a6a1 is described below

commit 89d467a6a105dd396b079639e7eb5033f7b32fdb
Author: Chang Chen <[email protected]>
AuthorDate: Fri Jan 23 19:40:13 2026 +0800

    [GLUTEN-11400][CORE]  Implement partitioning-aware union for ColumnarUnionExec (#11455)
    
    * Refactor doExecute method signature in BasicPhysicalOperatorTransformer
    * Spark: Implement partitioning-aware union for ColumnarUnionExec (sketched below)
    * Revert "[4.1.0] Add `spark.sql.unionOutputPartitioning=false` to Maven test args in workflows"
    * Spark: Add Gluten tests for SPARK-52921 union partitioning
    * Spark: Add test for broadcast exchange metrics reset behavior in GlutenBroadcastExchangeSuite
    * Add spark.sql.unionOutputPartitioning=false when running with RAS
    * Spark: Propagate outputOrdering and outputPartitioning to ColumnarRangeExec
    * Spark: Refactor RangeExec transformers to pass LogicalRange instead of individual fields
    * Fix unit tests
    * Fix ClickHouse build
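
    For illustration, a minimal sketch of the behavior this change enables,
    assuming a Spark 4.1 build where spark.sql.unionOutputPartitioning is
    available (the DataFrames below are illustrative; the authoritative
    checks are the SPARK-52921 tests added in this diff):

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().master("local[2]").getOrCreate()
        import spark.implicits._

        // Two children shuffled by the same key share a hash partitioning.
        val df1 = Seq((1, 2), (2, 3)).toDF("a", "b").repartition($"a")
        val df2 = Seq((3, 4), (4, 5)).toDF("a", "b").repartition($"a")
        val union = df1.union(df2)

        // With spark.sql.unionOutputPartitioning=true, the columnar union
        // reports the children's hash partitioning instead of
        // UnknownPartitioning, so this repartition adds no extra shuffle.
        union.repartition($"a").collect()
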
---
 .github/workflows/velox_backend_x86.yml            |  12 ++-
 .../clickhouse/CHSparkPlanExecApi.scala            |  11 +-
 .../gluten/execution/CHRangeExecTransformer.scala  |  20 ++--
 .../backendsapi/velox/VeloxSparkPlanExecApi.scala  |  11 +-
 .../gluten/execution/ColumnarRangeExec.scala       |  55 +++++-----
 .../gluten/backendsapi/SparkPlanExecApi.scala      |   9 +-
 .../BasicPhysicalOperatorTransformer.scala         |  40 +++++--
 .../gluten/execution/JoinExecTransformer.scala     |   7 ++
 .../columnar/offload/OffloadSingleNodeRules.scala  |  13 +--
 .../sql}/execution/RangeExecBaseTransformer.scala  |  52 ++++-----
 .../sql/execution/GlutenQueryExecutionSuite.scala  |   2 +-
 .../sql/execution/GlutenQueryExecutionSuite.scala  |   2 +-
 .../sql/execution/GlutenQueryExecutionSuite.scala  |   2 +-
 .../sql/execution/GlutenQueryExecutionSuite.scala  |   2 +-
 .../gluten/utils/velox/VeloxTestSettings.scala     |   6 +-
 .../sql/GlutenDataFrameSetOperationsSuite.scala    | 116 +++++++++++++++++++++
 .../execution/GlutenBroadcastExchangeSuite.scala   |  21 +++-
 .../sql/execution/GlutenQueryExecutionSuite.scala  |   2 +-
 .../sql/execution/GlutenSQLRangeExecSuite.scala    |   1 -
 .../scala/org/apache/spark/SparkContextUtils.scala |  13 ++-
 .../scala/org/apache/spark/SparkContextUtils.scala |  10 ++
 .../scala/org/apache/spark/SparkContextUtils.scala |  10 ++
 .../scala/org/apache/spark/SparkContextUtils.scala |  10 ++
 .../scala/org/apache/spark/SparkContextUtils.scala |   9 ++
 .../org/apache/gluten/integration/Suite.scala      |   1 -
 25 files changed, 299 insertions(+), 138 deletions(-)

diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml
index 14fb70e6dd..f324ab5596 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -286,12 +286,16 @@ jobs:
         run: |
           echo "JAVA_HOME: $JAVA_HOME"
           cd $GITHUB_WORKSPACE/tools/gluten-it
+          SPARK41_CONF=""
+          if [ "${{ matrix.spark }}" = "spark-4.1" ]; then
+            SPARK41_CONF="--extra-conf=spark.sql.unionOutputPartitioning=false"
+          fi
           GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
            --local --preset=velox --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
-            --extra-conf=spark.gluten.ras.enabled=true \
+            --extra-conf=spark.gluten.ras.enabled=true $SPARK41_CONF \
           && GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries-compare \
            --local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
-            --extra-conf=spark.gluten.ras.enabled=true 
+            --extra-conf=spark.gluten.ras.enabled=true $SPARK41_CONF 
 
   tpc-test-centos7:
     needs: build-native-lib-centos-7
@@ -1440,7 +1444,7 @@ jobs:
           export PATH=$JAVA_HOME/bin:$PATH
           java -version
          $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
-          -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ -Dspark.sql.unionOutputPartitioning=false" \
+          -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
           -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
       - name: Upload test report
         if: always()
@@ -1489,7 +1493,7 @@ jobs:
           export PATH=$JAVA_HOME/bin:$PATH
           java -version
          $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Pspark-ut \
-          -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ -Dspark.sql.unionOutputPartitioning=false" \
+          -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
           -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
       - name: Upload test report
         if: always()
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 68ada074e0..9208b48740 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -978,15 +978,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
   override def genColumnarTailExec(limit: Int, child: SparkPlan): ColumnarCollectTailBaseExec =
     CHColumnarCollectTailExec(limit, child)
 
-  override def genColumnarRangeExec(
-      start: Long,
-      end: Long,
-      step: Long,
-      numSlices: Int,
-      numElements: BigInt,
-      outputAttributes: Seq[Attribute],
-      child: Seq[SparkPlan]): ColumnarRangeBaseExec =
-    CHRangeExecTransformer(start, end, step, numSlices, numElements, outputAttributes, child)
+  override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec =
+    CHRangeExecTransformer(rangeExec.range)
 
   override def expressionFlattenSupported(expr: Expression): Boolean = expr match {
     case ca: FlattenedAnd => CHFlattenedExpression.supported(ca.name)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
index bd8ef3f6f4..6b3b7da72f 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
@@ -26,8 +26,8 @@ import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.Partition
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Range => LogicalRange}
+import org.apache.spark.sql.execution.{ColumnarRangeBaseExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 
@@ -36,17 +36,14 @@ import io.substrait.proto.NamedStruct
 
 import scala.collection.JavaConverters;
 
-case class CHRangeExecTransformer(
-    start: Long,
-    end: Long,
-    step: Long,
-    numSlices: Int,
-    numElements: BigInt,
-    outputAttributes: Seq[Attribute],
-    child: Seq[SparkPlan])
-  extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements, outputAttributes, child)
+case class CHRangeExecTransformer(range: LogicalRange)
+  extends ColumnarRangeBaseExec
   with LeafTransformSupport {
 
+  override def doCanonicalize(): SparkPlan = {
+    CHRangeExecTransformer(range.canonicalized.asInstanceOf[LogicalRange])
+  }
+
   override def getSplitInfos: Seq[SplitInfo] = {
     (0 until numSlices).map {
       sliceIndex =>
@@ -83,7 +80,6 @@ case class CHRangeExecTransformer(
   override protected def doValidateInternal(): ValidationResult = ValidationResult.succeeded
 
   override def doTransform(context: SubstraitContext): TransformContext = {
-    val output = outputAttributes
     val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
     val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
     val columnTypeNodes = JavaConverters
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 10a7f6f3ea..69419deb1a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -1028,15 +1028,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
       offset: Int): ColumnarCollectLimitBaseExec =
     ColumnarCollectLimitExec(limit, child, offset)
 
-  override def genColumnarRangeExec(
-      start: Long,
-      end: Long,
-      step: Long,
-      numSlices: Int,
-      numElements: BigInt,
-      outputAttributes: Seq[Attribute],
-      child: Seq[SparkPlan]): ColumnarRangeBaseExec =
-    ColumnarRangeExec(start, end, step, numSlices, numElements, outputAttributes, child)
+  override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec =
+    ColumnarRangeExec(rangeExec.range)
 
   override def genColumnarTailExec(limit: Int, child: SparkPlan): ColumnarCollectTailBaseExec =
     ColumnarCollectTailExec(limit, child)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
index 58a64d891f..cbdf8b924b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
@@ -22,8 +22,10 @@ import org.apache.gluten.iterator.Iterators
 import org.apache.gluten.vectorized.ArrowWritableColumnVector
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.plans.logical.{Range => LogicalRange}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}
+import org.apache.spark.sql.execution.{ColumnarRangeBaseExec, SparkPlan}
 import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
 
 /**
@@ -31,37 +33,35 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
  * operation and supports columnar processing. It generates columnar batches for the specified
  * range.
  *
- * @param start
- *   Starting value of the range.
- * @param end
- *   Ending value of the range.
- * @param step
- *   Step size for the range.
- * @param numSlices
- *   Number of slices for partitioning the range.
- * @param numElements
- *   Total number of elements in the range.
- * @param outputAttributes
- *   Attributes defining the output schema of the operator.
- * @param child
- *   Child SparkPlan nodes for this operator, if any.
+ * @param range
+ *   The logical Range plan containing start, end, step, and numSlices information.
  */
-case class ColumnarRangeExec(
-    start: Long,
-    end: Long,
-    step: Long,
-    numSlices: Int,
-    numElements: BigInt,
-    outputAttributes: Seq[Attribute],
-    child: Seq[SparkPlan]
-) extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements, outputAttributes, child) {
+case class ColumnarRangeExec(range: LogicalRange) extends ColumnarRangeBaseExec {
+
+  override def outputOrdering: Seq[SortOrder] = range.outputOrdering
+
+  override def outputPartitioning: Partitioning = {
+    if (numElements > 0) {
+      if (numSlices == 1) {
+        SinglePartition
+      } else {
+        RangePartitioning(outputOrdering, numSlices)
+      }
+    } else {
+      UnknownPartitioning(0)
+    }
+  }
+
+  override def doCanonicalize(): SparkPlan = {
+    ColumnarRangeExec(range.canonicalized.asInstanceOf[LogicalRange])
+  }
 
   override def batchType(): Convention.BatchType = {
     ArrowJavaBatchType
   }
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    if (start == end || (start < end ^ 0 < step)) {
+    if (isEmptyRange) {
       sparkContext.emptyRDD[ColumnarBatch]
     } else {
       sparkContext
@@ -77,9 +77,6 @@ case class ColumnarRangeExec(
               else if (value > 0) Long.MaxValue
               else Long.MinValue
 
-            val partitionStart = getSafeMargin(safePartitionStart)
-            val partitionEnd = getSafeMargin(safePartitionEnd)
-
             /**
             * Generates the columnar batches for the specified range. Each batch contains a subset
              * of the range values, managed using Arrow column vectors.
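
For reference, a small sketch of how the outputPartitioning override added above maps onto spark.range calls (illustrative; it simply restates the branches of the new code):

    spark.range(0, 10, 1, 1)  // numElements > 0, one slice  -> SinglePartition
    spark.range(0, 10, 1, 2)  // numElements > 0, two slices -> RangePartitioning(id ASC, 2)
    spark.range(0, 0)         // empty range                 -> UnknownPartitioning(0)
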
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 980c04e99e..002c8dad7b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -726,14 +726,7 @@ trait SparkPlanExecApi {
       plan: SparkPlan,
       offset: Int): ColumnarCollectLimitBaseExec
 
-  def genColumnarRangeExec(
-      start: Long,
-      end: Long,
-      step: Long,
-      numSlices: Int,
-      numElements: BigInt,
-      outputAttributes: Seq[Attribute],
-      child: Seq[SparkPlan]): ColumnarRangeBaseExec
+  def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec
 
   def genColumnarTailExec(limit: Int, plan: SparkPlan): ColumnarCollectTailBaseExec
 
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 93d6201c73..57e7373fc3 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -24,9 +24,12 @@ import org.apache.gluten.metrics.MetricsUpdater
 import org.apache.gluten.substrait.SubstraitContext
 import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
 
+import org.apache.spark.SparkContextUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.utils.StructTypeFWD
@@ -233,7 +236,9 @@ abstract class ProjectExecTransformerBase(val list: Seq[NamedExpression], val in
 }
 
 // An alternative for UnionExec.
-case class ColumnarUnionExec(children: Seq[SparkPlan]) extends ValidatablePlan {
+case class ColumnarUnionExec(children: Seq[SparkPlan], partitioning: Partitioning)
+  extends ValidatablePlan {
+  require(children.nonEmpty, "ColumnarUnionExec requires at least one child.")
   children.foreach {
     case w: WholeStageTransformer =>
       // FIXME: Avoid such practice for plan immutability.
@@ -245,6 +250,8 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends ValidatablePlan {
 
   override def rowType0(): Convention.RowType = Convention.RowType.None
 
+  override def outputPartitioning: Partitioning = partitioning
+
   override def output: Seq[Attribute] = {
     children.map(_.output).transpose.map {
       attrs =>
@@ -265,19 +272,32 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends ValidatablePlan {
       newChildren: IndexedSeq[SparkPlan]): ColumnarUnionExec =
     copy(children = newChildren)
 
-  def columnarInputRDD: RDD[ColumnarBatch] = {
-    if (children.isEmpty) {
-      throw new IllegalArgumentException(s"Empty children")
-    }
-    sparkContext.union(children.map(c => c.executeColumnar()))
+  override protected def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
   }
 
-  override protected def doExecute()
-      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
-    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    if (outputPartitioning.isInstanceOf[UnknownPartitioning]) {
+      sparkContext.union(children.map(c => c.executeColumnar()))
+    } else {
+      // This union has a known partitioning, i.e., its children share the same
+      // partitioning in semantics, so the union can preserve that partitioning by
+      // using a custom partitioning-aware union RDD.
+      val nonEmptyRdds = children.map(_.executeColumnar()).filter(!_.partitions.isEmpty)
+      SparkContextUtils.createPartitioningAwareUnionRDD(
+        sparkContext,
+        nonEmptyRdds,
+        outputPartitioning.numPartitions)
+    }
   }
+}
 
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = columnarInputRDD
+object ColumnarUnionExec {
+  def from(union: UnionExec): ColumnarUnionExec = {
+    val children = union.children
+    val outputPartitioning: Partitioning = union.outputPartitioning
+    ColumnarUnionExec(children, outputPartitioning)
+  }
 }
 
 /**
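
A hedged reading of the partitioning-aware branch above, with the partition layout inferred from how hash partitioning must be preserved (the Spark 4.1 SQLPartitioningAwareUnionRDD wired up in the shims later in this diff is the actual implementation):

    // Sketch, not the commit's code: with both children partitioned into N
    // partitions by the same keys, the union keeps N output partitions, and
    // partition i of the result concatenates partition i of each non-empty
    // child, so rows with equal keys stay co-located. Empty child RDDs are
    // filtered out first because they contribute no partitions.
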
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
index 49af447a35..4634d7c1fb 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
@@ -375,4 +375,11 @@ abstract class BroadcastHashJoinExecTransformerBase(
   override def genJoinParametersInternal(): (Int, Int, String) = {
     (1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId)
   }
+
+  override def resetMetrics(): Unit = {
+    // see https://github.com/apache/spark/pull/51673
+    // no-op
+  // A materialized BroadcastExchangeExec won't be materialized again, so we should
+  // not reset the metrics. Otherwise, we would lose the metrics collected during
+  // the broadcast job.
+  }
 }
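
A minimal sketch of the contract this no-op preserves (illustrative; bhj stands for an instance of the transformer above after the join has run):

    // SparkPlan.resetMetrics() normally zeroes a node's SQLMetrics. Since a
    // materialized broadcast is cached and never re-executed, the no-op
    // override keeps the collected values intact:
    val rows = bhj.metrics("numOutputRows")
    val before = rows.value
    bhj.resetMetrics()            // no-op per the override above
    assert(rows.value == before)  // metrics survive the reset
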
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 93ee65c255..df32527b78 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -225,9 +225,8 @@ object OffloadOthers {
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
           HashAggregateExecBaseTransformer.from(plan)
         case plan: UnionExec =>
-          val children = plan.children
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
-          ColumnarUnionExec(children)
+          ColumnarUnionExec.from(plan)
         case plan: ExpandExec =>
           val child = plan.child
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
@@ -322,15 +321,7 @@ object OffloadOthers {
           }
         case plan: RangeExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
-          BackendsApiManager.getSparkPlanExecApiInstance.genColumnarRangeExec(
-            plan.start,
-            plan.end,
-            plan.step,
-            plan.numSlices,
-            plan.numElements,
-            plan.output,
-            plan.children
-          )
+          ColumnarRangeBaseExec.from(plan)
         case plan: SampleExec =>
           logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
           val child = plan.child
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RangeExecBaseTransformer.scala
similarity index 57%
rename from gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
rename to gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RangeExecBaseTransformer.scala
index 4e3888a25c..653b93883a 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RangeExecBaseTransformer.scala
@@ -14,37 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.gluten.execution
+package org.apache.spark.sql.execution
 
 import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.ValidatablePlan
 import org.apache.gluten.extension.columnar.transition.Convention
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{LeafExecNode, RangeExec, SparkPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Range => LogicalRange}
 
-/**
- * Base class for RangeExec transformation, can be implemented by the by supported backends.
- * Currently velox is supported.
- */
-abstract class ColumnarRangeBaseExec(
-    start: Long,
-    end: Long,
-    step: Long,
-    numSlices: Int,
-    numElements: BigInt,
-    outputAttributes: Seq[Attribute],
-    child: Seq[SparkPlan])
-  extends LeafExecNode
-  with ValidatablePlan {
+/** Base class for [[RangeExec]] transformation that can be implemented by supported backends. */
+abstract class ColumnarRangeBaseExec extends LeafExecNode with ValidatablePlan {
+
+  def range: LogicalRange
+
+  val start: Long = range.start
+  val end: Long = range.end
+  val step: Long = range.step
+  val numElements: BigInt = range.numElements
+  val numSlices: Int = range.numSlices.getOrElse(session.leafNodeDefaultParallelism)
+  val isEmptyRange: Boolean = start == end || (start < end ^ 0 < step)
+
+  override def output: Seq[Attribute] = range.output
 
-  override def output: Seq[Attribute] = {
-    outputAttributes
+  override def simpleString(maxFields: Int): String = {
+    s"ColumnarRange $start, $end, $step, $numSlices, $numElements, " +
+      s"${output.mkString("[", ", ", "]")}"
   }
 
   override def rowType0(): Convention.RowType = Convention.RowType.None
 
-  override protected def doExecute()
-      : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+  override protected def doExecute(): RDD[InternalRow] = {
     throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
   }
 }
@@ -56,14 +58,6 @@ abstract class ColumnarRangeBaseExec(
 object ColumnarRangeBaseExec {
   def from(rangeExec: RangeExec): ColumnarRangeBaseExec = {
     BackendsApiManager.getSparkPlanExecApiInstance
-      .genColumnarRangeExec(
-        rangeExec.start,
-        rangeExec.end,
-        rangeExec.step,
-        rangeExec.numSlices,
-        rangeExec.numElements,
-        rangeExec.output,
-        rangeExec.children
-      )
+      .genColumnarRangeExec(rangeExec)
   }
 }
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
             "",
             "(1) ColumnarRange",
             "Output [1]: [id#xL]",
-            "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+            "Arguments: Range (0, 10, step=1, splits=Some(2))",
             "",
             "(2) ColumnarToRow [codegen id : 1]",
             "Input [1]: [id#xL]",
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
             "",
             "(1) ColumnarRange",
             "Output [1]: [id#xL]",
-            "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+            "Arguments: Range (0, 10, step=1, splits=Some(2))",
             "",
             "(2) ColumnarToRow [codegen id : 1]",
             "Input [1]: [id#xL]",
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
             "",
             "(1) ColumnarRange",
             "Output [1]: [id#xL]",
-            "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+            "Arguments: Range (0, 10, step=1, splits=Some(2))",
             "",
             "(2) ColumnarToRow [codegen id : 1]",
             "Input [1]: [id#xL]",
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
             "",
             "(1) ColumnarRange",
             "Output [1]: [id#xL]",
-            "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+            "Arguments: Range (0, 10, step=1, splits=Some(2))",
             "",
             "(2) ColumnarToRow [codegen id : 1]",
             "Input [1]: [id#xL]",
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 050ee84f02..9cf46bf1fa 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -604,8 +604,7 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenOuterJoinSuiteForceShjOff]
   enableSuite[GlutenFallbackStrategiesSuite]
   enableSuite[GlutenBroadcastExchangeSuite]
-    // TODO: fix on Spark-4.1 introduced by see https://github.com/apache/spark/pull/51623
-    .exclude("SPARK-52962: broadcast exchange should not reset metrics")
+    .exclude("SPARK-52962: broadcast exchange should not reset metrics") // Add Gluten test
   enableSuite[GlutenLocalBroadcastExchangeSuite]
   enableSuite[GlutenCoalesceShufflePartitionsSuite]
     // Rewrite for Gluten. Change details are in the inline comments in individual tests.
@@ -751,8 +750,7 @@ class VeloxTestSettings extends BackendTestSettings {
     // Result depends on the implementation for nondeterministic expression rand.
     // Not really an issue.
     .exclude("SPARK-10740: handle nondeterministic expressions correctly for 
set operations")
-    // TODO: fix on Spark-4.1
-    .excludeByPrefix("SPARK-52921") // see 
https://github.com/apache/spark/pull/51623
+    .excludeByPrefix("SPARK-52921") // Add Gluten test
   enableSuite[GlutenDataFrameStatSuite]
   enableSuite[GlutenDataFrameSuite]
     // Rewrite these tests because it checks Spark's physical operators.
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameSetOperationsSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameSetOperationsSuite.scala
index fe7958b677..51ac0a15e7 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameSetOperationsSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameSetOperationsSuite.scala
@@ -16,14 +16,130 @@
  */
 package org.apache.spark.sql
 
+import org.apache.gluten.execution.ColumnarUnionExec
+
 import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+import org.apache.spark.sql.execution.ColumnarShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.sql.internal.SQLConf
 
 class GlutenDataFrameSetOperationsSuite
   extends DataFrameSetOperationsSuite
   with GlutenSQLTestsTrait {
+
   override def sparkConf: SparkConf =
     super.sparkConf
       .set("spark.gluten.sql.columnar.backend.ch.enable.coalesce.project.union", "false")
       .set("spark.gluten.sql.columnar.backend.ch.enable.coalesce.aggregation.union", "false")
+      .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
+
+  import testImplicits._
+
+  testGluten("SPARK-52921: union partitioning - reused shuffle") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+      val df2 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+
+      val union = df1.repartition($"a").union(df2.repartition($"a"))
+      val unionExec = union.queryExecution.executedPlan.collect { case u: ColumnarUnionExec => u }
+      assert(unionExec.size == 1)
+
+      val shuffle = df1.repartition($"a").queryExecution.executedPlan.collect {
+        case s: ColumnarShuffleExchangeExec => s
+      }
+      assert(shuffle.size == 1)
+
+      val reuseShuffle = union.queryExecution.executedPlan.collect {
+        case r: ReusedExchangeExec => r
+      }
+      assert(reuseShuffle.size == 1)
+
+      val childPartitioning = shuffle.head.outputPartitioning
+      val partitioning = unionExec.head.outputPartitioning
+      assert(partitioning == childPartitioning)
+    }
+  }
+
+  testGluten("SPARK-52921: union partitioning - semantic equality") {
+    val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+    val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f")
+
+    val correctResult = withSQLConf(SQLConf.UNION_OUTPUT_PARTITIONING.key -> "false") {
+      df1.repartition($"a").union(df2.repartition($"d")).collect()
+    }
+
+    Seq(true, false).foreach {
+      enabled =>
+        withSQLConf(
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+          SQLConf.UNION_OUTPUT_PARTITIONING.key -> enabled.toString) {
+
+          val union = df1.repartition($"a").union(df2.repartition($"d"))
+          val unionExec = union.queryExecution.executedPlan.collect {
+            case u: ColumnarUnionExec => u
+          }
+          assert(unionExec.size == 1)
+
+          val shuffle = df1.repartition($"a").queryExecution.executedPlan.collect {
+            case s: ColumnarShuffleExchangeExec => s
+          }
+          assert(shuffle.size == 1)
+
+          val childPartitioning = shuffle.head.outputPartitioning
+          val partitioning = unionExec.head.outputPartitioning
+          if (enabled) {
+            assert(partitioning == childPartitioning)
+          }
+
+          checkAnswer(union, correctResult)
+
+          // Avoid unnecessary shuffle if union output partitioning is enabled
+          val shuffledUnion = union.repartition($"a")
+          val shuffleNumBefore = union.queryExecution.executedPlan.collect {
+            case s: ColumnarShuffleExchangeExec => s
+          }
+          val shuffleNumAfter = shuffledUnion.queryExecution.executedPlan.collect {
+            case s: ColumnarShuffleExchangeExec => s
+          }
+
+          if (enabled) {
+            assert(shuffleNumBefore.size == shuffleNumAfter.size)
+          } else {
+            assert(shuffleNumBefore.size + 1 == shuffleNumAfter.size)
+          }
+          checkAnswer(union, shuffledUnion)
+        }
+    }
+  }
+
+  testGluten("SPARK-52921: union partitioning - range partitioning") {
+    val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+    val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f")
+
+    val correctResult = withSQLConf(SQLConf.UNION_OUTPUT_PARTITIONING.key -> "false") {
+      df1.repartitionByRange($"a").union(df2.repartitionByRange($"d")).collect()
+    }
+
+    Seq(true, false).foreach {
+      enabled =>
+        withSQLConf(
+          SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+          SQLConf.UNION_OUTPUT_PARTITIONING.key -> enabled.toString) {
+
+          val union = df1.repartitionByRange($"a").union(df2.repartitionByRange($"d"))
+          val unionExec = union.queryExecution.executedPlan.collect {
+            case u: ColumnarUnionExec => u
+          }
+          assert(unionExec.size == 1)
+
+          // For range partitioning, even if the children have the same partitioning,
+          val partitioning = unionExec.head.outputPartitioning
+          assert(partitioning.isInstanceOf[UnknownPartitioning])
 
+          checkAnswer(union, correctResult)
+        }
+    }
+  }
 }
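
A hedged note on the range-partitioning case above: two independent repartitionByRange shuffles sample their inputs separately, so their partition bounds generally differ even when the sort expressions match:

    // Illustrative only: same expressions, different sampled bounds.
    df1.repartitionByRange($"a")  // bounds derived from a sample of df1
    df2.repartitionByRange($"d")  // bounds derived from a sample of df2
    // Partition i of the two children need not cover the same key range,
    // so the union conservatively reports UnknownPartitioning.
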
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
index e689c2a3c1..c3f7f80283 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution
 
+import org.apache.gluten.execution.BroadcastHashJoinExecTransformer
+
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.broadcast.TorrentBroadcast
 import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, GlutenTestsBaseTrait}
@@ -23,7 +25,24 @@ import org.apache.spark.sql.classic.SparkSession
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.functions.broadcast
 
-class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {}
+class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {
+
+  testGluten("SPARK-52962: broadcast exchange should not reset metrics") {
+    val df = spark.range(1).toDF()
+    val joinDF = df.join(broadcast(df), "id")
+    joinDF.collect()
+    val broadcastExchangeExec = collect(joinDF.queryExecution.executedPlan) {
+      case p: BroadcastHashJoinExecTransformer => p
+    }
+    assert(broadcastExchangeExec.size == 1, "one and only BroadcastHashJoinExecTransformer")
+
+    val broadcastExchangeNode = broadcastExchangeExec.head
+    val metrics = broadcastExchangeNode.metrics
+    assert(metrics("numOutputRows").value == 1)
+    broadcastExchangeNode.resetMetrics()
+    assert(metrics("numOutputRows").value == 1)
+  }
+}
 
 // Additional tests run in 'local-cluster' mode.
 class GlutenLocalBroadcastExchangeSuite
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
             "",
             "(1) ColumnarRange",
             "Output [1]: [id#xL]",
-            "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+            "Arguments: Range (0, 10, step=1, splits=Some(2))",
             "",
             "(2) ColumnarToRow [codegen id : 1]",
             "Input [1]: [id#xL]",
diff --git a/gluten-ut/test/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala b/gluten-ut/test/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
index 03663bc156..cd05692ab3 100644
--- a/gluten-ut/test/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
+++ b/gluten-ut/test/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution
 
 import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.ColumnarRangeBaseExec
 import org.apache.gluten.utils.BackendTestUtils
 
 import org.apache.spark.SparkConf
diff --git a/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark33/src/main/scala/org/apache/spark/SparkContextUtils.scala
similarity index 71%
copy from shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
copy to shims/spark33/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..4e9e308dca 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -16,12 +16,15 @@
  */
 package org.apache.spark
 
-import org.apache.spark.broadcast.Broadcast
-
-import scala.reflect.ClassTag
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 object SparkContextUtils {
-  def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
-    sc.broadcastInternal(value, serializedOnly = true)
+  def createPartitioningAwareUnionRDD(
+      sc: SparkContext,
+      rdds: Seq[RDD[ColumnarBatch]],
+      numPartitions: Int): RDD[ColumnarBatch] = {
+    throw new UnsupportedOperationException(
+      "SQLPartitioningAwareUnionRDD is only available in Spark 4.1+")
   }
 }
diff --git a/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..ec26d934c9 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -17,6 +17,8 @@
 package org.apache.spark
 
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import scala.reflect.ClassTag
 
@@ -24,4 +26,12 @@ object SparkContextUtils {
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
     sc.broadcastInternal(value, serializedOnly = true)
   }
+
+  def createPartitioningAwareUnionRDD(
+      sc: SparkContext,
+      rdds: Seq[RDD[ColumnarBatch]],
+      numPartitions: Int): RDD[ColumnarBatch] = {
+    throw new UnsupportedOperationException(
+      "SQLPartitioningAwareUnionRDD is only available in Spark 4.1+")
+  }
 }
diff --git a/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..ec26d934c9 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -17,6 +17,8 @@
 package org.apache.spark
 
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import scala.reflect.ClassTag
 
@@ -24,4 +26,12 @@ object SparkContextUtils {
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
     sc.broadcastInternal(value, serializedOnly = true)
   }
+
+  def createPartitioningAwareUnionRDD(
+      sc: SparkContext,
+      rdds: Seq[RDD[ColumnarBatch]],
+      numPartitions: Int): RDD[ColumnarBatch] = {
+    throw new UnsupportedOperationException(
+      "SQLPartitioningAwareUnionRDD is only available in Spark 4.1+")
+  }
 }
diff --git a/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..ec26d934c9 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -17,6 +17,8 @@
 package org.apache.spark
 
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import scala.reflect.ClassTag
 
@@ -24,4 +26,12 @@ object SparkContextUtils {
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
     sc.broadcastInternal(value, serializedOnly = true)
   }
+
+  def createPartitioningAwareUnionRDD(
+      sc: SparkContext,
+      rdds: Seq[RDD[ColumnarBatch]],
+      numPartitions: Int): RDD[ColumnarBatch] = {
+    throw new UnsupportedOperationException(
+      "SQLPartitioningAwareUnionRDD is only available in Spark 4.1+")
+  }
 }
diff --git a/shims/spark41/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark41/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..41e09c6466 100644
--- a/shims/spark41/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark41/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -17,6 +17,8 @@
 package org.apache.spark
 
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.{RDD, SQLPartitioningAwareUnionRDD}
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 import scala.reflect.ClassTag
 
@@ -24,4 +26,11 @@ object SparkContextUtils {
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
     sc.broadcastInternal(value, serializedOnly = true)
   }
+
+  def createPartitioningAwareUnionRDD(
+      sc: SparkContext,
+      rdds: Seq[RDD[ColumnarBatch]],
+      numPartitions: Int): RDD[ColumnarBatch] = {
+    new SQLPartitioningAwareUnionRDD(sc, rdds, numPartitions)
+  }
 }
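
A short, hedged note on the shim layering above: every Spark shim exposes the same SparkContextUtils.createPartitioningAwareUnionRDD signature, but only the spark41 shim links against SQLPartitioningAwareUnionRDD. The throwing stubs in the older shims should be unreachable, because on those Spark versions UnionExec.outputPartitioning is always UnknownPartitioning and ColumnarUnionExec then takes the plain union path:

    // Call-site guard (from ColumnarUnionExec.doExecuteColumnar earlier in
    // this diff); pre-4.1 plans always land in the first branch.
    if (outputPartitioning.isInstanceOf[UnknownPartitioning]) {
      sparkContext.union(children.map(c => c.executeColumnar()))
    } else {
      val nonEmptyRdds = children.map(_.executeColumnar()).filter(!_.partitions.isEmpty)
      SparkContextUtils.createPartitioningAwareUnionRDD(
        sparkContext, nonEmptyRdds, outputPartitioning.numPartitions)
    }
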
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index 53280a72cb..2ea814df27 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -69,7 +69,6 @@ abstract class Suite(
   sessionSwitcher.addDefaultConf("spark.sql.broadcastTimeout", "1800")
   sessionSwitcher.addDefaultConf("spark.network.io.preferDirectBufs", "false")
   sessionSwitcher.addDefaultConf("spark.unsafe.exceptionOnMemoryLeak", 
s"$errorOnMemLeak")
-  sessionSwitcher.addDefaultConf("spark.sql.unionOutputPartitioning", "false")
 
   if (dataSource() == "delta") {
     sessionSwitcher.addDefaultConf(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

