This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
new 7af96688f7 [KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
7af96688f7 is described below
commit 7af96688f705060be47658dcfafad50168a4c8a9
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jul 18 11:46:36 2025 +0800
[KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
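The Spark extension rules did not recognize `RebalancePartitions` as an existing shuffle. These are the repartition/rebalance-before-writing rules and the Z-order insertion rule. A `RebalancePartitions` node, such as one produced by a `/*+ REBALANCE */` hint, could therefore get another repartition inserted on top of it. The rules now treat `RebalancePartitions` the same way as `RepartitionOperation`.
Unit tests were modified, including a new case that checks a `REBALANCE` hint yields exactly one `RebalancePartitions` whether or not the insert-repartition option is enabled.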
No.
Closes #7139 from pan3793/rebalance.
Closes #7139
edb070afd [Cheng Pan] fix
4d3984a92 [Cheng Pan] Fix Spark extension rules to support RebalancePartitions
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
(cherry picked from commit 5f4b1f0de513ad1085dcea8ab3d4911a82733b9e)
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala | 9 ---------
.../kyuubi/sql/RepartitionBeforeWritingBase.scala | 5 +++--
.../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala | 6 +++---
.../apache/spark/sql/RebalanceBeforeWritingSuite.scala | 17 ++++++++++++++---
.../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala | 9 ---------
.../kyuubi/sql/RepartitionBeforeWritingBase.scala | 5 +++--
.../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala | 6 +++---
.../apache/spark/sql/RebalanceBeforeWritingSuite.scala | 17 ++++++++++++++---
.../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala | 9 ---------
.../kyuubi/sql/RepartitionBeforeWritingBase.scala | 5 +++--
.../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala | 6 +++---
.../apache/spark/sql/RebalanceBeforeWritingSuite.scala | 17 ++++++++++++++---
12 files changed, 60 insertions(+), 51 deletions(-)
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
}
}
}
-
- override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
- super.canInsertRepartitionByExpression(plan) && {
- plan match {
- case _: RebalancePartitions => false
- case _ => true
- }
- }
- }
}
/**
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 95f3529e29..5b825cc6ce 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -118,6 +118,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case _: Window => true
case s: Sort if s.global => true
case _: RepartitionOperation => true
+ case _: RebalancePartitions => true
case _: GlobalLimit => true
case _ => false
}.isDefined
@@ -131,8 +132,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case SubqueryAlias(_, child) => canInsert(child)
case Limit(_, _) => false
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 4b2494bc84..e03e49b7d8 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -50,12 +50,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
def canInsertZorder(query: LogicalPlan): Boolean = query match {
case Project(_, child) => canInsertZorder(child)
- case _: RepartitionByExpression | _: Repartition
+ case _: RepartitionOperation | _: RebalancePartitions
if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
// TODO: actually, we can force zorder even if existed some shuffle
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index c1295ca04a..0826d385d3 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -28,18 +28,21 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check rebalance exists") {
- def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+ def check(
+ df: => DataFrame,
+ expectedRebalanceNumEnabled: Int = 1,
+ expectedRebalanceNumDisabled: Int = 0): Unit = {
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"true") {
assert(
df.queryExecution.analyzed.collect {
case r: RebalancePartitions => r
- }.size == expectedRebalanceNum)
+ }.size == expectedRebalanceNumEnabled)
}
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"false") {
assert(
df.queryExecution.analyzed.collect {
case r: RebalancePartitions => r
- }.isEmpty)
+ }.size == expectedRebalanceNumDisabled)
}
}
@@ -69,6 +72,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS
t(c1)"))
}
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ check(
+ sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM
VALUES(1),(2),(3) AS t(c1)"),
+ 1,
+ 1)
+ }
+
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
}
}
}
-
- override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
- super.canInsertRepartitionByExpression(plan) && {
- plan match {
- case _: RebalancePartitions => false
- case _ => true
- }
- }
- }
}
/**
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 3ebb9740f5..e15c5a4580 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -99,6 +99,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case _: Window => true
case s: Sort if s.global => true
case _: RepartitionOperation => true
+ case _: RebalancePartitions => true
case _: GlobalLimit => true
case _ => false
}.isDefined
@@ -112,8 +113,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case SubqueryAlias(_, child) => canInsert(child)
case Limit(_, _) => false
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 003ba6b68a..d96b546b94 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -49,12 +49,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
def canInsertZorder(query: LogicalPlan): Boolean = query match {
case Project(_, child) => canInsertZorder(child)
- case _: RepartitionByExpression | _: Repartition
+ case _: RepartitionOperation | _: RebalancePartitions
if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
// TODO: actually, we can force zorder even if existed some shuffle
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index f739634958..761fec37d9 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -29,19 +29,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check rebalance exists") {
- def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+ def check(
+ df: => DataFrame,
+ expectedRebalanceNumEnabled: Int = 1,
+ expectedRebalanceNumDisabled: Int = 0): Unit = {
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"true") {
withListener(df) { write =>
assert(write.collect {
case r: RebalancePartitions => r
- }.size == expectedRebalanceNum)
+ }.size == expectedRebalanceNumEnabled)
}
}
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"false") {
withListener(df) { write =>
assert(write.collect {
case r: RebalancePartitions => r
- }.isEmpty)
+ }.size == expectedRebalanceNumDisabled)
}
}
}
@@ -72,6 +75,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS
t(c1)"))
}
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ check(
+ sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM
VALUES(1),(2),(3) AS t(c1)"),
+ 1,
+ 1)
+ }
+
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
}
}
}
-
- override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
- super.canInsertRepartitionByExpression(plan) && {
- plan match {
- case _: RebalancePartitions => false
- case _ => true
- }
- }
- }
}
/**
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index 3ebb9740f5..e15c5a4580 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -99,6 +99,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case _: Window => true
case s: Sort if s.global => true
case _: RepartitionOperation => true
+ case _: RebalancePartitions => true
case _: GlobalLimit => true
case _ => false
}.isDefined
@@ -112,8 +113,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case SubqueryAlias(_, child) => canInsert(child)
case Limit(_, _) => false
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 003ba6b68a..d96b546b94 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -49,12 +49,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
def canInsertZorder(query: LogicalPlan): Boolean = query match {
case Project(_, child) => canInsertZorder(child)
- case _: RepartitionByExpression | _: Repartition
+ case _: RepartitionOperation | _: RebalancePartitions
if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
// TODO: actually, we can force zorder even if existed some shuffle
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 46ba272011..d63e79996b 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -29,19 +29,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check rebalance exists") {
- def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+ def check(
+ df: => DataFrame,
+ expectedRebalanceNumEnabled: Int = 1,
+ expectedRebalanceNumDisabled: Int = 0): Unit = {
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"true") {
withListener(df) { write =>
assert(write.collect {
case r: RebalancePartitions => r
- }.size == expectedRebalanceNum)
+ }.size == expectedRebalanceNumEnabled)
}
}
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"false") {
withListener(df) { write =>
assert(write.collect {
case r: RebalancePartitions => r
- }.isEmpty)
+ }.size == expectedRebalanceNumDisabled)
}
}
}
@@ -72,6 +75,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS
t(c1)"))
}
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ check(
+ sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM
VALUES(1),(2),(3) AS t(c1)"),
+ 1,
+ 1)
+ }
+
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
sql(s"CREATE TABLE tmp2 (c1 int) $storage")