This is an automated email from the ASF dual-hosted git repository.
changchen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
     new 89d467a6a1 [GLUTEN-11400][CORE] Implement partitioning-aware union for ColumnarUnionExec (#11455)
89d467a6a1 is described below
commit 89d467a6a105dd396b079639e7eb5033f7b32fdb
Author: Chang Chen <[email protected]>
AuthorDate: Fri Jan 23 19:40:13 2026 +0800
    [GLUTEN-11400][CORE] Implement partitioning-aware union for ColumnarUnionExec (#11455)
    * Refactor doExecute method signature in BasicPhysicalOperatorTransformer
    * Spark: Implement partitioning-aware union for ColumnarUnionExec
    * Revert "[4.1.0] Add `spark.sql.unionOutputPartitioning=false` to Maven test args in workflows"
    * Spark: Add Gluten tests for SPARK-52921 union partitioning
    * Spark: Add test for broadcast exchange metrics reset behavior in GlutenBroadcastExchangeSuite
    * Add spark.sql.unionOutputPartitioning=false when running with RAS.
    * Spark: Propagate outputOrdering and outputPartitioning to ColumnarRangeExec
    * Spark: Refactor RangeExec transformers to pass LogicalRange instead of individual fields
    * fix ut
    * fix clickhouse build
---
.github/workflows/velox_backend_x86.yml | 12 ++-
.../clickhouse/CHSparkPlanExecApi.scala | 11 +-
.../gluten/execution/CHRangeExecTransformer.scala | 20 ++--
.../backendsapi/velox/VeloxSparkPlanExecApi.scala | 11 +-
.../gluten/execution/ColumnarRangeExec.scala | 55 +++++-----
.../gluten/backendsapi/SparkPlanExecApi.scala | 9 +-
.../BasicPhysicalOperatorTransformer.scala | 40 +++++--
.../gluten/execution/JoinExecTransformer.scala | 7 ++
.../columnar/offload/OffloadSingleNodeRules.scala | 13 +--
.../sql}/execution/RangeExecBaseTransformer.scala | 52 ++++-----
.../sql/execution/GlutenQueryExecutionSuite.scala | 2 +-
.../sql/execution/GlutenQueryExecutionSuite.scala | 2 +-
.../sql/execution/GlutenQueryExecutionSuite.scala | 2 +-
.../sql/execution/GlutenQueryExecutionSuite.scala | 2 +-
.../gluten/utils/velox/VeloxTestSettings.scala | 6 +-
.../sql/GlutenDataFrameSetOperationsSuite.scala | 116 +++++++++++++++++++++
.../execution/GlutenBroadcastExchangeSuite.scala | 21 +++-
.../sql/execution/GlutenQueryExecutionSuite.scala | 2 +-
.../sql/execution/GlutenSQLRangeExecSuite.scala | 1 -
.../scala/org/apache/spark/SparkContextUtils.scala | 13 ++-
.../scala/org/apache/spark/SparkContextUtils.scala | 10 ++
.../scala/org/apache/spark/SparkContextUtils.scala | 10 ++
.../scala/org/apache/spark/SparkContextUtils.scala | 10 ++
.../scala/org/apache/spark/SparkContextUtils.scala | 9 ++
.../org/apache/gluten/integration/Suite.scala | 1 -
25 files changed, 299 insertions(+), 138 deletions(-)
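
Context for the diff below: with SPARK-52921 (Spark 4.1), UnionExec reports a concrete outputPartitioning when all of its children share a semantically equal one, and this commit makes ColumnarUnionExec capture and honor that partitioning. A minimal, illustrative sketch of the user-visible effect, assuming a Spark 4.1 session with spark.sql.unionOutputPartitioning=true and AQE disabled (mirroring the tests added in GlutenDataFrameSetOperationsSuite; not taken verbatim from this commit):

    import spark.implicits._

    val df1 = Seq((1, 2), (2, 3)).toDF("a", "b")
    val df2 = Seq((3, 4), (4, 5)).toDF("a", "b")

    // Both children are hash-partitioned on "a", so the union keeps that partitioning.
    val union = df1.repartition($"a").union(df2.repartition($"a"))

    // Repartitioning the union on "a" again should then add no extra
    // ColumnarShuffleExchangeExec to the executed plan.
    union.repartition($"a").explain()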
diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml
index 14fb70e6dd..f324ab5596 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -286,12 +286,16 @@ jobs:
run: |
echo "JAVA_HOME: $JAVA_HOME"
cd $GITHUB_WORKSPACE/tools/gluten-it
+ SPARK41_CONF=""
+ if [ "${{ matrix.spark }}" = "spark-4.1" ]; then
+ SPARK41_CONF="--extra-conf=spark.sql.unionOutputPartitioning=false"
+ fi
          GLUTEN_IT_JVM_ARGS=-Xmx5G sbin/gluten-it.sh queries-compare \
            --local --preset=velox --benchmark-type=h --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
-           --extra-conf=spark.gluten.ras.enabled=true \
+           --extra-conf=spark.gluten.ras.enabled=true $SPARK41_CONF \
            && GLUTEN_IT_JVM_ARGS=-Xmx6G sbin/gluten-it.sh queries-compare \
            --local --preset=velox --benchmark-type=ds --error-on-memleak --off-heap-size=10g -s=1.0 --threads=16 --iterations=1 \
-           --extra-conf=spark.gluten.ras.enabled=true
+           --extra-conf=spark.gluten.ras.enabled=true $SPARK41_CONF
tpc-test-centos7:
needs: build-native-lib-centos-7
@@ -1440,7 +1444,7 @@ jobs:
export PATH=$JAVA_HOME/bin:$PATH
java -version
          $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
-           -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ -Dspark.sql.unionOutputPartitioning=false" \
+           -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
            -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
if: always()
@@ -1489,7 +1493,7 @@ jobs:
export PATH=$JAVA_HOME/bin:$PATH
java -version
          $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox -Pspark-ut \
-           -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/ -Dspark.sql.unionOutputPartitioning=false" \
+           -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
            -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest
- name: Upload test report
if: always()
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
index 68ada074e0..9208b48740 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala
@@ -978,15 +978,8 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging {
   override def genColumnarTailExec(limit: Int, child: SparkPlan): ColumnarCollectTailBaseExec =
     CHColumnarCollectTailExec(limit, child)
- override def genColumnarRangeExec(
- start: Long,
- end: Long,
- step: Long,
- numSlices: Int,
- numElements: BigInt,
- outputAttributes: Seq[Attribute],
- child: Seq[SparkPlan]): ColumnarRangeBaseExec =
-    CHRangeExecTransformer(start, end, step, numSlices, numElements, outputAttributes, child)
+  override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec =
+    CHRangeExecTransformer(rangeExec.range)
   override def expressionFlattenSupported(expr: Expression): Boolean = expr match {
case ca: FlattenedAnd => CHFlattenedExpression.supported(ca.name)
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
index bd8ef3f6f4..6b3b7da72f 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/execution/CHRangeExecTransformer.scala
@@ -26,8 +26,8 @@ import org.apache.gluten.substrait.rel.{RelBuilder, SplitInfo}
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.Partition
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Range => LogicalRange}
+import org.apache.spark.sql.execution.{ColumnarRangeBaseExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.clickhouse.ExtensionTableBuilder
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -36,17 +36,14 @@ import io.substrait.proto.NamedStruct
import scala.collection.JavaConverters;
-case class CHRangeExecTransformer(
- start: Long,
- end: Long,
- step: Long,
- numSlices: Int,
- numElements: BigInt,
- outputAttributes: Seq[Attribute],
- child: Seq[SparkPlan])
-  extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements, outputAttributes, child)
+case class CHRangeExecTransformer(range: LogicalRange)
+ extends ColumnarRangeBaseExec
with LeafTransformSupport {
+ override def doCanonicalize(): SparkPlan = {
+ CHRangeExecTransformer(range.canonicalized.asInstanceOf[LogicalRange])
+ }
+
override def getSplitInfos: Seq[SplitInfo] = {
(0 until numSlices).map {
sliceIndex =>
@@ -83,7 +80,6 @@ case class CHRangeExecTransformer(
override protected def doValidateInternal(): ValidationResult =
ValidationResult.succeeded
override def doTransform(context: SubstraitContext): TransformContext = {
- val output = outputAttributes
val typeNodes = ConverterUtils.collectAttributeTypeNodes(output)
val nameList = ConverterUtils.collectAttributeNamesWithoutExprId(output)
val columnTypeNodes = JavaConverters
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 10a7f6f3ea..69419deb1a 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -1028,15 +1028,8 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
offset: Int): ColumnarCollectLimitBaseExec =
ColumnarCollectLimitExec(limit, child, offset)
- override def genColumnarRangeExec(
- start: Long,
- end: Long,
- step: Long,
- numSlices: Int,
- numElements: BigInt,
- outputAttributes: Seq[Attribute],
- child: Seq[SparkPlan]): ColumnarRangeBaseExec =
-    ColumnarRangeExec(start, end, step, numSlices, numElements, outputAttributes, child)
+  override def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec =
+    ColumnarRangeExec(rangeExec.range)
   override def genColumnarTailExec(limit: Int, child: SparkPlan): ColumnarCollectTailBaseExec =
     ColumnarCollectTailExec(limit, child)
diff --git a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
index 58a64d891f..cbdf8b924b 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/execution/ColumnarRangeExec.scala
@@ -22,8 +22,10 @@ import org.apache.gluten.iterator.Iterators
import org.apache.gluten.vectorized.ArrowWritableColumnVector
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.catalyst.expressions.SortOrder
+import org.apache.spark.sql.catalyst.plans.logical.{Range => LogicalRange}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, RangePartitioning, SinglePartition, UnknownPartitioning}
+import org.apache.spark.sql.execution.{ColumnarRangeBaseExec, SparkPlan}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
/**
@@ -31,37 +33,35 @@ import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnVector}
  * operation and supports columnar processing. It generates columnar batches for the specified
  * range.
*
- * @param start
- * Starting value of the range.
- * @param end
- * Ending value of the range.
- * @param step
- * Step size for the range.
- * @param numSlices
- * Number of slices for partitioning the range.
- * @param numElements
- * Total number of elements in the range.
- * @param outputAttributes
- * Attributes defining the output schema of the operator.
- * @param child
- * Child SparkPlan nodes for this operator, if any.
+ * @param range
+ *   The logical Range plan containing start, end, step, and numSlices information.
*/
-case class ColumnarRangeExec(
- start: Long,
- end: Long,
- step: Long,
- numSlices: Int,
- numElements: BigInt,
- outputAttributes: Seq[Attribute],
- child: Seq[SparkPlan]
-) extends ColumnarRangeBaseExec(start, end, step, numSlices, numElements, outputAttributes, child) {
+case class ColumnarRangeExec(range: LogicalRange) extends ColumnarRangeBaseExec {
+
+ override def outputOrdering: Seq[SortOrder] = range.outputOrdering
+
+ override def outputPartitioning: Partitioning = {
+ if (numElements > 0) {
+ if (numSlices == 1) {
+ SinglePartition
+ } else {
+ RangePartitioning(outputOrdering, numSlices)
+ }
+ } else {
+ UnknownPartitioning(0)
+ }
+ }
+
+ override def doCanonicalize(): SparkPlan = {
+ ColumnarRangeExec(range.canonicalized.asInstanceOf[LogicalRange])
+ }
override def batchType(): Convention.BatchType = {
ArrowJavaBatchType
}
override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
- if (start == end || (start < end ^ 0 < step)) {
+ if (isEmptyRange) {
sparkContext.emptyRDD[ColumnarBatch]
} else {
sparkContext
@@ -77,9 +77,6 @@ case class ColumnarRangeExec(
else if (value > 0) Long.MaxValue
else Long.MinValue
- val partitionStart = getSafeMargin(safePartitionStart)
- val partitionEnd = getSafeMargin(safePartitionEnd)
-
/**
            * Generates the columnar batches for the specified range. Each batch contains a subset
* of the range values, managed using Arrow column vectors.
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 980c04e99e..002c8dad7b 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -726,14 +726,7 @@ trait SparkPlanExecApi {
plan: SparkPlan,
offset: Int): ColumnarCollectLimitBaseExec
- def genColumnarRangeExec(
- start: Long,
- end: Long,
- step: Long,
- numSlices: Int,
- numElements: BigInt,
- outputAttributes: Seq[Attribute],
- child: Seq[SparkPlan]): ColumnarRangeBaseExec
+ def genColumnarRangeExec(rangeExec: RangeExec): ColumnarRangeBaseExec
   def genColumnarTailExec(limit: Int, plan: SparkPlan): ColumnarCollectTailBaseExec
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
index 93d6201c73..57e7373fc3 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicPhysicalOperatorTransformer.scala
@@ -24,9 +24,12 @@ import org.apache.gluten.metrics.MetricsUpdater
import org.apache.gluten.substrait.SubstraitContext
import org.apache.gluten.substrait.rel.{RelBuilder, RelNode}
+import org.apache.spark.SparkContextUtils
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
import org.apache.spark.sql.utils.StructTypeFWD
@@ -233,7 +236,9 @@ abstract class ProjectExecTransformerBase(val list: Seq[NamedExpression], val in
}
// An alternative for UnionExec.
-case class ColumnarUnionExec(children: Seq[SparkPlan]) extends ValidatablePlan {
+case class ColumnarUnionExec(children: Seq[SparkPlan], partitioning: Partitioning)
+  extends ValidatablePlan {
+ require(children.nonEmpty, "ColumnarUnionExec requires at least one child.")
children.foreach {
case w: WholeStageTransformer =>
// FIXME: Avoid such practice for plan immutability.
@@ -245,6 +250,8 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends ValidatablePlan {
override def rowType0(): Convention.RowType = Convention.RowType.None
+ override def outputPartitioning: Partitioning = partitioning
+
override def output: Seq[Attribute] = {
children.map(_.output).transpose.map {
attrs =>
@@ -265,19 +272,32 @@ case class ColumnarUnionExec(children: Seq[SparkPlan]) extends ValidatablePlan {
newChildren: IndexedSeq[SparkPlan]): ColumnarUnionExec =
copy(children = newChildren)
- def columnarInputRDD: RDD[ColumnarBatch] = {
- if (children.isEmpty) {
- throw new IllegalArgumentException(s"Empty children")
- }
- sparkContext.union(children.map(c => c.executeColumnar()))
+ override protected def doExecute(): RDD[InternalRow] = {
+ throw new UnsupportedOperationException(s"This operator doesn't support
doExecute().")
}
- override protected def doExecute()
- : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
- throw new UnsupportedOperationException(s"This operator doesn't support
doExecute().")
+ override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+ if (outputPartitioning.isInstanceOf[UnknownPartitioning]) {
+ sparkContext.union(children.map(c => c.executeColumnar()))
+ } else {
+      // This union has a known partitioning, i.e., its children share a semantically equal
+      // partitioning, so the union can preserve it by using a custom partitioning-aware
+      // union RDD instead of falling back to a plain union.
+      val nonEmptyRdds = children.map(_.executeColumnar()).filter(!_.partitions.isEmpty)
+ SparkContextUtils.createPartitioningAwareUnionRDD(
+ sparkContext,
+ nonEmptyRdds,
+ outputPartitioning.numPartitions)
+ }
}
+}
-  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = columnarInputRDD
+object ColumnarUnionExec {
+ def from(union: UnionExec): ColumnarUnionExec = {
+ val children = union.children
+ val outputPartitioning: Partitioning = union.outputPartitioning
+ ColumnarUnionExec(children, outputPartitioning)
+ }
}
/**
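
A note on the hunk above: a plain sparkContext.union concatenates partitions (the output partition count is the sum over all children), which destroys any hash partitioning. A partitioning-aware union instead merges partition i of every child into output partition i. The following is a hypothetical simplification of that idea, not Spark 4.1's actual SQLPartitioningAwareUnionRDD (which additionally handles preferred locations and partitioner propagation); it assumes every input RDD has exactly numParts partitions:

    import scala.reflect.ClassTag

    import org.apache.spark.{OneToOneDependency, Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // Output partition i of the union is partition i of each input, concatenated,
    // so the partition count (and any hash partitioning) is preserved.
    case class IndexPartition(index: Int) extends Partition

    class NaivePartitioningAwareUnionRDD[T: ClassTag](
        sc: SparkContext,
        rdds: Seq[RDD[T]],
        numParts: Int)
      extends RDD[T](sc, rdds.map(new OneToOneDependency(_))) {

      override protected def getPartitions: Array[Partition] =
        Array.tabulate[Partition](numParts)(i => IndexPartition(i))

      override def compute(split: Partition, context: TaskContext): Iterator[T] =
        // Concatenate partition `split.index` of every parent, lazily.
        rdds.iterator.flatMap(rdd => rdd.iterator(rdd.partitions(split.index), context))
    }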
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
index 49af447a35..4634d7c1fb 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/JoinExecTransformer.scala
@@ -375,4 +375,11 @@ abstract class BroadcastHashJoinExecTransformerBase(
override def genJoinParametersInternal(): (Int, Int, String) = {
(1, if (isNullAwareAntiJoin) 1 else 0, buildHashTableId)
}
+
+ override def resetMetrics(): Unit = {
+ // see https://github.com/apache/spark/pull/51673
+ // no-op
+    // A BroadcastExchangeExec that has been materialized won't be materialized again, so we
+    // should not reset the metrics; otherwise we would lose the metrics collected in the
+    // broadcast job.
+ }
}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index 93ee65c255..df32527b78 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -225,9 +225,8 @@ object OffloadOthers {
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
HashAggregateExecBaseTransformer.from(plan)
case plan: UnionExec =>
- val children = plan.children
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
- ColumnarUnionExec(children)
+ ColumnarUnionExec.from(plan)
case plan: ExpandExec =>
val child = plan.child
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
@@ -322,15 +321,7 @@ object OffloadOthers {
}
case plan: RangeExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
- BackendsApiManager.getSparkPlanExecApiInstance.genColumnarRangeExec(
- plan.start,
- plan.end,
- plan.step,
- plan.numSlices,
- plan.numElements,
- plan.output,
- plan.children
- )
+ ColumnarRangeBaseExec.from(plan)
case plan: SampleExec =>
logDebug(s"Columnar Processing for ${plan.getClass} is currently
supported.")
val child = plan.child
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RangeExecBaseTransformer.scala
similarity index 57%
rename from gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
rename to gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RangeExecBaseTransformer.scala
index 4e3888a25c..653b93883a 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/RangeExecBaseTransformer.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/RangeExecBaseTransformer.scala
@@ -14,37 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.gluten.execution
+package org.apache.spark.sql.execution
import org.apache.gluten.backendsapi.BackendsApiManager
+import org.apache.gluten.execution.ValidatablePlan
import org.apache.gluten.extension.columnar.transition.Convention
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.execution.{LeafExecNode, RangeExec, SparkPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Range => LogicalRange}
-/**
- * Base class for RangeExec transformation, can be implemented by the by supported backends.
- * Currently velox is supported.
- */
-abstract class ColumnarRangeBaseExec(
- start: Long,
- end: Long,
- step: Long,
- numSlices: Int,
- numElements: BigInt,
- outputAttributes: Seq[Attribute],
- child: Seq[SparkPlan])
- extends LeafExecNode
- with ValidatablePlan {
+/** Base class for [[RangeExec]] transformation that can be implemented by supported backends. */
+abstract class ColumnarRangeBaseExec extends LeafExecNode with ValidatablePlan {
+
+ def range: LogicalRange
+
+ val start: Long = range.start
+ val end: Long = range.end
+ val step: Long = range.step
+ val numElements: BigInt = range.numElements
+  val numSlices: Int = range.numSlices.getOrElse(session.leafNodeDefaultParallelism)
+ val isEmptyRange: Boolean = start == end || (start < end ^ 0 < step)
+
+ override def output: Seq[Attribute] = range.output
- override def output: Seq[Attribute] = {
- outputAttributes
+ override def simpleString(maxFields: Int): String = {
+ s"ColumnarRange $start, $end, $step, $numSlices, $numElements, " +
+ s"${output.mkString("[", ", ", "]")}"
}
override def rowType0(): Convention.RowType = Convention.RowType.None
- override protected def doExecute()
- : org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow] = {
+ override protected def doExecute(): RDD[InternalRow] = {
     throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
}
}
@@ -56,14 +58,6 @@ abstract class ColumnarRangeBaseExec(
object ColumnarRangeBaseExec {
def from(rangeExec: RangeExec): ColumnarRangeBaseExec = {
BackendsApiManager.getSparkPlanExecApiInstance
- .genColumnarRangeExec(
- rangeExec.start,
- rangeExec.end,
- rangeExec.step,
- rangeExec.numSlices,
- rangeExec.numElements,
- rangeExec.output,
- rangeExec.children
- )
+ .genColumnarRangeExec(rangeExec)
}
}
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
"",
"(1) ColumnarRange",
"Output [1]: [id#xL]",
- "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+ "Arguments: Range (0, 10, step=1, splits=Some(2))",
"",
"(2) ColumnarToRow [codegen id : 1]",
"Input [1]: [id#xL]",
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
"",
"(1) ColumnarRange",
"Output [1]: [id#xL]",
- "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+ "Arguments: Range (0, 10, step=1, splits=Some(2))",
"",
"(2) ColumnarToRow [codegen id : 1]",
"Input [1]: [id#xL]",
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
"",
"(1) ColumnarRange",
"Output [1]: [id#xL]",
- "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+ "Arguments: Range (0, 10, step=1, splits=Some(2))",
"",
"(2) ColumnarToRow [codegen id : 1]",
"Input [1]: [id#xL]",
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
"",
"(1) ColumnarRange",
"Output [1]: [id#xL]",
- "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+ "Arguments: Range (0, 10, step=1, splits=Some(2))",
"",
"(2) ColumnarToRow [codegen id : 1]",
"Input [1]: [id#xL]",
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 050ee84f02..9cf46bf1fa 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -604,8 +604,7 @@ class VeloxTestSettings extends BackendTestSettings {
enableSuite[GlutenOuterJoinSuiteForceShjOff]
enableSuite[GlutenFallbackStrategiesSuite]
enableSuite[GlutenBroadcastExchangeSuite]
-    // TODO: fix on Spark-4.1 introduced by see https://github.com/apache/spark/pull/51623
-    .exclude("SPARK-52962: broadcast exchange should not reset metrics")
+    .exclude("SPARK-52962: broadcast exchange should not reset metrics") // Add Gluten test
enableSuite[GlutenLocalBroadcastExchangeSuite]
enableSuite[GlutenCoalesceShufflePartitionsSuite]
   // Rewrite for Gluten. Change details are in the inline comments in individual tests.
@@ -751,8 +750,7 @@ class VeloxTestSettings extends BackendTestSettings {
     // Result depends on the implementation for nondeterministic expression rand.
     // Not really an issue.
     .exclude("SPARK-10740: handle nondeterministic expressions correctly for set operations")
-    // TODO: fix on Spark-4.1
-    .excludeByPrefix("SPARK-52921") // see https://github.com/apache/spark/pull/51623
+    .excludeByPrefix("SPARK-52921") // Add Gluten test
enableSuite[GlutenDataFrameStatSuite]
enableSuite[GlutenDataFrameSuite]
// Rewrite these tests because it checks Spark's physical operators.
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameSetOperationsSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameSetOperationsSuite.scala
index fe7958b677..51ac0a15e7 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameSetOperationsSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/GlutenDataFrameSetOperationsSuite.scala
@@ -16,14 +16,130 @@
*/
package org.apache.spark.sql
+import org.apache.gluten.execution.ColumnarUnionExec
+
import org.apache.spark.SparkConf
+import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
+import org.apache.spark.sql.execution.ColumnarShuffleExchangeExec
+import org.apache.spark.sql.execution.exchange.ReusedExchangeExec
+import org.apache.spark.sql.internal.SQLConf
class GlutenDataFrameSetOperationsSuite
extends DataFrameSetOperationsSuite
with GlutenSQLTestsTrait {
+
override def sparkConf: SparkConf =
super.sparkConf
.set("spark.gluten.sql.columnar.backend.ch.enable.coalesce.project.union",
"false")
.set("spark.gluten.sql.columnar.backend.ch.enable.coalesce.aggregation.union",
"false")
+ .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
+
+ import testImplicits._
+
+ testGluten("SPARK-52921: union partitioning - reused shuffle") {
+ withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
+      val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+      val df2 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+
+ val union = df1.repartition($"a").union(df2.repartition($"a"))
+      val unionExec = union.queryExecution.executedPlan.collect { case u: ColumnarUnionExec => u }
+ assert(unionExec.size == 1)
+
+ val shuffle = df1.repartition($"a").queryExecution.executedPlan.collect {
+ case s: ColumnarShuffleExchangeExec => s
+ }
+ assert(shuffle.size == 1)
+
+ val reuseShuffle = union.queryExecution.executedPlan.collect {
+ case r: ReusedExchangeExec => r
+ }
+ assert(reuseShuffle.size == 1)
+
+ val childPartitioning = shuffle.head.outputPartitioning
+ val partitioning = unionExec.head.outputPartitioning
+ assert(partitioning == childPartitioning)
+ }
+ }
+
+ testGluten("SPARK-52921: union partitioning - semantic equality") {
+    val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+    val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f")
+
+    val correctResult = withSQLConf(SQLConf.UNION_OUTPUT_PARTITIONING.key -> "false") {
+ df1.repartition($"a").union(df2.repartition($"d")).collect()
+ }
+
+ Seq(true, false).foreach {
+ enabled =>
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+ SQLConf.UNION_OUTPUT_PARTITIONING.key -> enabled.toString) {
+
+ val union = df1.repartition($"a").union(df2.repartition($"d"))
+ val unionExec = union.queryExecution.executedPlan.collect {
+ case u: ColumnarUnionExec => u
+ }
+ assert(unionExec.size == 1)
+
+          val shuffle = df1.repartition($"a").queryExecution.executedPlan.collect {
+ case s: ColumnarShuffleExchangeExec => s
+ }
+ assert(shuffle.size == 1)
+
+ val childPartitioning = shuffle.head.outputPartitioning
+ val partitioning = unionExec.head.outputPartitioning
+ if (enabled) {
+ assert(partitioning == childPartitioning)
+ }
+
+ checkAnswer(union, correctResult)
+
+ // Avoid unnecessary shuffle if union output partitioning is enabled
+ val shuffledUnion = union.repartition($"a")
+ val shuffleNumBefore = union.queryExecution.executedPlan.collect {
+ case s: ColumnarShuffleExchangeExec => s
+ }
+          val shuffleNumAfter = shuffledUnion.queryExecution.executedPlan.collect {
+ case s: ColumnarShuffleExchangeExec => s
+ }
+
+ if (enabled) {
+ assert(shuffleNumBefore.size == shuffleNumAfter.size)
+ } else {
+ assert(shuffleNumBefore.size + 1 == shuffleNumAfter.size)
+ }
+ checkAnswer(union, shuffledUnion)
+ }
+ }
+ }
+
+ testGluten("SPARK-52921: union partitioning - range partitioning") {
+    val df1 = Seq((1, 2, 4), (1, 3, 5), (2, 2, 3), (2, 4, 5)).toDF("a", "b", "c")
+    val df2 = Seq((4, 1, 5), (2, 4, 6), (1, 4, 2), (3, 5, 1)).toDF("d", "e", "f")
+
+    val correctResult = withSQLConf(SQLConf.UNION_OUTPUT_PARTITIONING.key -> "false") {
+      df1.repartitionByRange($"a").union(df2.repartitionByRange($"d")).collect()
+ }
+
+ Seq(true, false).foreach {
+ enabled =>
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+ SQLConf.UNION_OUTPUT_PARTITIONING.key -> enabled.toString) {
+
+        val union = df1.repartitionByRange($"a").union(df2.repartitionByRange($"d"))
+ val unionExec = union.queryExecution.executedPlan.collect {
+ case u: ColumnarUnionExec => u
+ }
+ assert(unionExec.size == 1)
+
+        // For range partitioning, even when the children have the same partitioning,
+        // the union output partitioning is still UnknownPartitioning.
+ val partitioning = unionExec.head.outputPartitioning
+ assert(partitioning.isInstanceOf[UnknownPartitioning])
+ checkAnswer(union, correctResult)
+ }
+ }
+ }
}
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
index e689c2a3c1..c3f7f80283 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenBroadcastExchangeSuite.scala
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution
+import org.apache.gluten.execution.BroadcastHashJoinExecTransformer
+
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.broadcast.TorrentBroadcast
import org.apache.spark.sql.{GlutenSQLTestsBaseTrait, GlutenTestsBaseTrait}
@@ -23,7 +25,24 @@ import org.apache.spark.sql.classic.SparkSession
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.functions.broadcast
-class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {}
+class GlutenBroadcastExchangeSuite extends BroadcastExchangeSuite with GlutenSQLTestsBaseTrait {
+
+ testGluten("SPARK-52962: broadcast exchange should not reset metrics") {
+ val df = spark.range(1).toDF()
+ val joinDF = df.join(broadcast(df), "id")
+ joinDF.collect()
+ val broadcastExchangeExec = collect(joinDF.queryExecution.executedPlan) {
+ case p: BroadcastHashJoinExecTransformer => p
+ }
+    assert(broadcastExchangeExec.size == 1, "one and only BroadcastHashJoinExecTransformer")
+
+ val broadcastExchangeNode = broadcastExchangeExec.head
+ val metrics = broadcastExchangeNode.metrics
+ assert(metrics("numOutputRows").value == 1)
+ broadcastExchangeNode.resetMetrics()
+ assert(metrics("numOutputRows").value == 1)
+ }
+}
// Additional tests run in 'local-cluster' mode.
class GlutenLocalBroadcastExchangeSuite
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
index 2819584558..005386634b 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/execution/GlutenQueryExecutionSuite.scala
@@ -69,7 +69,7 @@ class GlutenQueryExecutionSuite extends QueryExecutionSuite with GlutenSQLTestsB
"",
"(1) ColumnarRange",
"Output [1]: [id#xL]",
- "Arguments: 0, 10, 1, 2, 10, [id#xL]",
+ "Arguments: Range (0, 10, step=1, splits=Some(2))",
"",
"(2) ColumnarToRow [codegen id : 1]",
"Input [1]: [id#xL]",
diff --git a/gluten-ut/test/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala b/gluten-ut/test/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
index 03663bc156..cd05692ab3 100644
--- a/gluten-ut/test/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
+++ b/gluten-ut/test/src/test/scala/org/apache/spark/sql/execution/GlutenSQLRangeExecSuite.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.execution
import org.apache.gluten.config.GlutenConfig
-import org.apache.gluten.execution.ColumnarRangeBaseExec
import org.apache.gluten.utils.BackendTestUtils
import org.apache.spark.SparkConf
diff --git a/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark33/src/main/scala/org/apache/spark/SparkContextUtils.scala
similarity index 71%
copy from shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
copy to shims/spark33/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..4e9e308dca 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark33/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -16,12 +16,15 @@
*/
package org.apache.spark
-import org.apache.spark.broadcast.Broadcast
-
-import scala.reflect.ClassTag
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.vectorized.ColumnarBatch
object SparkContextUtils {
-  def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
- sc.broadcastInternal(value, serializedOnly = true)
+ def createPartitioningAwareUnionRDD(
+ sc: SparkContext,
+ rdds: Seq[RDD[ColumnarBatch]],
+ numPartitions: Int): RDD[ColumnarBatch] = {
+ throw new UnsupportedOperationException(
+ "SQLPartitioningAwareUnionRDD is only available in Spark 4.1+")
}
}
diff --git a/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..ec26d934c9 100644
--- a/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark34/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.vectorized.ColumnarBatch
import scala.reflect.ClassTag
@@ -24,4 +26,12 @@ object SparkContextUtils {
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
sc.broadcastInternal(value, serializedOnly = true)
}
+
+ def createPartitioningAwareUnionRDD(
+ sc: SparkContext,
+ rdds: Seq[RDD[ColumnarBatch]],
+ numPartitions: Int): RDD[ColumnarBatch] = {
+ throw new UnsupportedOperationException(
+ "SQLPartitioningAwareUnionRDD is only available in Spark 4.1+")
+ }
}
diff --git a/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..ec26d934c9 100644
--- a/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark35/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.vectorized.ColumnarBatch
import scala.reflect.ClassTag
@@ -24,4 +26,12 @@ object SparkContextUtils {
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
sc.broadcastInternal(value, serializedOnly = true)
}
+
+ def createPartitioningAwareUnionRDD(
+ sc: SparkContext,
+ rdds: Seq[RDD[ColumnarBatch]],
+ numPartitions: Int): RDD[ColumnarBatch] = {
+ throw new UnsupportedOperationException(
+ "SQLPartitioningAwareUnionRDD is only available in Spark 4.1+")
+ }
}
diff --git a/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..ec26d934c9 100644
--- a/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark40/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.vectorized.ColumnarBatch
import scala.reflect.ClassTag
@@ -24,4 +26,12 @@ object SparkContextUtils {
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
sc.broadcastInternal(value, serializedOnly = true)
}
+
+ def createPartitioningAwareUnionRDD(
+ sc: SparkContext,
+ rdds: Seq[RDD[ColumnarBatch]],
+ numPartitions: Int): RDD[ColumnarBatch] = {
+ throw new UnsupportedOperationException(
+ "SQLPartitioningAwareUnionRDD is only available in Spark 4.1+")
+ }
}
diff --git a/shims/spark41/src/main/scala/org/apache/spark/SparkContextUtils.scala b/shims/spark41/src/main/scala/org/apache/spark/SparkContextUtils.scala
index 3cbf2b602d..41e09c6466 100644
--- a/shims/spark41/src/main/scala/org/apache/spark/SparkContextUtils.scala
+++ b/shims/spark41/src/main/scala/org/apache/spark/SparkContextUtils.scala
@@ -17,6 +17,8 @@
package org.apache.spark
import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.{RDD, SQLPartitioningAwareUnionRDD}
+import org.apache.spark.sql.vectorized.ColumnarBatch
import scala.reflect.ClassTag
@@ -24,4 +26,11 @@ object SparkContextUtils {
   def broadcastInternal[T: ClassTag](sc: SparkContext, value: T): Broadcast[T] = {
sc.broadcastInternal(value, serializedOnly = true)
}
+
+ def createPartitioningAwareUnionRDD(
+ sc: SparkContext,
+ rdds: Seq[RDD[ColumnarBatch]],
+ numPartitions: Int): RDD[ColumnarBatch] = {
+ new SQLPartitioningAwareUnionRDD(sc, rdds, numPartitions)
+ }
}
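
Tying the shims together: only the spark41 SparkContextUtils above can construct SQLPartitioningAwareUnionRDD; the pre-4.1 copies throw. That throwing path should be unreachable in practice, since on Spark < 4.1 UnionExec always reports UnknownPartitioning, which routes ColumnarUnionExec to the plain union branch. A hedged sketch of the guarded call site, condensed from the ColumnarUnionExec hunk earlier in this diff:

    // Condensed from ColumnarUnionExec.doExecuteColumnar(); `children`,
    // `outputPartitioning`, and `sparkContext` come from the surrounding plan node.
    import org.apache.spark.SparkContextUtils
    import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning

    val unioned =
      if (outputPartitioning.isInstanceOf[UnknownPartitioning]) {
        sparkContext.union(children.map(_.executeColumnar()))
      } else {
        // Known common partitioning: preserve it via the version-specific shim.
        val nonEmptyRdds = children.map(_.executeColumnar()).filter(!_.partitions.isEmpty)
        SparkContextUtils.createPartitioningAwareUnionRDD(
          sparkContext, nonEmptyRdds, outputPartitioning.numPartitions)
      }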
diff --git a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
index 53280a72cb..2ea814df27 100644
--- a/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
+++ b/tools/gluten-it/common/src/main/scala/org/apache/gluten/integration/Suite.scala
@@ -69,7 +69,6 @@ abstract class Suite(
sessionSwitcher.addDefaultConf("spark.sql.broadcastTimeout", "1800")
sessionSwitcher.addDefaultConf("spark.network.io.preferDirectBufs", "false")
sessionSwitcher.addDefaultConf("spark.unsafe.exceptionOnMemoryLeak",
s"$errorOnMemLeak")
- sessionSwitcher.addDefaultConf("spark.sql.unionOutputPartitioning", "false")
if (dataSource() == "delta") {
sessionSwitcher.addDefaultConf(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]