This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5f4b1f0de5 [KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
5f4b1f0de5 is described below
commit 5f4b1f0de513ad1085dcea8ab3d4911a82733b9e
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jul 18 11:46:36 2025 +0800
[KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
### Why are the changes needed?
As title.
### How was this patch tested?
UT are modified.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7139 from pan3793/rebalance.
Closes #7139
edb070afd [Cheng Pan] fix
4d3984a92 [Cheng Pan] Fix Spark extension rules to support RebalancePartitions
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala | 9 ---------
.../kyuubi/sql/RepartitionBeforeWritingBase.scala | 5 +++--
.../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala | 6 +++---
.../apache/spark/sql/RebalanceBeforeWritingSuite.scala | 17 ++++++++++++++---
.../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala | 9 ---------
.../kyuubi/sql/RepartitionBeforeWritingBase.scala | 5 +++--
.../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala | 6 +++---
.../apache/spark/sql/RebalanceBeforeWritingSuite.scala | 17 ++++++++++++++---
.../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala | 4 ++--
.../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala | 6 +++---
.../apache/spark/sql/RebalanceBeforeWritingSuite.scala | 17 ++++++++++++++---
11 files changed, 59 insertions(+), 42 deletions(-)
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
}
}
}
-
- override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
- super.canInsertRepartitionByExpression(plan) && {
- plan match {
- case _: RebalancePartitions => false
- case _ => true
- }
- }
- }
}
/**
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index ace98bb260..0e2e901265 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -126,6 +126,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case _: Window => true
case s: Sort if s.global => true
case _: RepartitionOperation => true
+ case _: RebalancePartitions => true
case _: GlobalLimit => true
case _ => false
}.isDefined
@@ -139,8 +140,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case SubqueryAlias(_, child) => canInsert(child)
case Limit(_, _) => false
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 4b2494bc84..e03e49b7d8 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -50,12 +50,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
def canInsertZorder(query: LogicalPlan): Boolean = query match {
case Project(_, child) => canInsertZorder(child)
- case _: RepartitionByExpression | _: Repartition
+ case _: RepartitionOperation | _: RebalancePartitions
if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
// TODO: actually, we can force zorder even if existed some shuffle
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 2c75e476b1..1a77612346 100644
--- a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -28,18 +28,21 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check rebalance exists") {
- def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+ def check(
+ df: => DataFrame,
+ expectedRebalanceNumEnabled: Int = 1,
+ expectedRebalanceNumDisabled: Int = 0): Unit = {
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"true") {
assert(
df.queryExecution.analyzed.collect {
case r: RebalancePartitions => r
- }.size == expectedRebalanceNum)
+ }.size == expectedRebalanceNumEnabled)
}
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"false") {
assert(
df.queryExecution.analyzed.collect {
case r: RebalancePartitions => r
- }.isEmpty)
+ }.size == expectedRebalanceNumDisabled)
}
}
@@ -69,6 +72,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
}
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ check(
+ sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
+ 1,
+ 1)
+ }
+
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends RepartitionBuilder {
}
}
}
-
- override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
- super.canInsertRepartitionByExpression(plan) && {
- plan match {
- case _: RebalancePartitions => false
- case _ => true
- }
- }
- }
}
/**
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index e549d457af..f631c880cd 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -108,6 +108,7 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case _: Window => true
case s: Sort if s.global => true
case _: RepartitionOperation => true
+ case _: RebalancePartitions => true
case _: GlobalLimit => true
case _ => false
}.isDefined
@@ -121,8 +122,8 @@ trait RepartitionBeforeWriteHelper extends Rule[LogicalPlan] {
case SubqueryAlias(_, child) => canInsert(child)
case Limit(_, _) => false
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 003ba6b68a..d96b546b94 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -49,12 +49,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with ZorderBuilder {
def canInsertZorder(query: LogicalPlan): Boolean = query match {
case Project(_, child) => canInsertZorder(child)
- case _: RepartitionByExpression | _: Repartition
+ case _: RepartitionOperation | _: RebalancePartitions
if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
// TODO: actually, we can force zorder even if existed some shuffle
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 9e9328fd96..9aa75f53dc 100644
--- a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -29,19 +29,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check rebalance exists") {
- def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+ def check(
+ df: => DataFrame,
+ expectedRebalanceNumEnabled: Int = 1,
+ expectedRebalanceNumDisabled: Int = 0): Unit = {
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"true") {
withListener(df) { write =>
assert(write.collect {
case r: RebalancePartitions => r
- }.size == expectedRebalanceNum)
+ }.size == expectedRebalanceNumEnabled)
}
}
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"false") {
withListener(df) { write =>
assert(write.collect {
case r: RebalancePartitions => r
- }.isEmpty)
+ }.size == expectedRebalanceNumDisabled)
}
}
}
@@ -72,6 +75,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
}
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ check(
+ sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
+ 1,
+ 1)
+ }
+
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 25ec7240d1..978d86038c 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -39,6 +39,7 @@ trait RebalanceBeforeWritingBase extends Rule[LogicalPlan] {
case _: Window => true
case s: Sort if s.global => true
case _: RepartitionOperation => true
+ case _: RebalancePartitions => true
case _: GlobalLimit => true
case _ => false
}
@@ -53,8 +54,7 @@ trait RebalanceBeforeWritingBase extends Rule[LogicalPlan] {
case SubqueryAlias(_, child) => canInsert(child)
case Limit(_, _) => false
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index fca03532c9..2b19bef716 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -57,12 +57,12 @@ trait InsertZorderBeforeWritingBase extends Rule[LogicalPlan] {
def canInsertZorder(query: LogicalPlan): Boolean = query match {
case Project(_, child) => canInsertZorder(child)
- case _: RepartitionByExpression | _: Repartition
+ case _: RepartitionOperation | _: RebalancePartitions
if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
// TODO: actually, we can force zorder even if existed some shuffle
case _: Sort => false
- case _: RepartitionByExpression => false
- case _: Repartition => false
+ case _: RepartitionOperation => false
+ case _: RebalancePartitions => false
case _ => true
}
diff --git a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 6fb63b0f1e..d58e9d7895 100644
--- a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -30,19 +30,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
test("check rebalance exists") {
- def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+ def check(
+ df: => DataFrame,
+ expectedRebalanceNumEnabled: Int = 1,
+ expectedRebalanceNumDisabled: Int = 0): Unit = {
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"true") {
withListener(df) { write =>
assert(write.collect {
case r: RebalancePartitions => r
- }.size == expectedRebalanceNum)
+ }.size == expectedRebalanceNumEnabled)
}
}
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key ->
"false") {
withListener(df) { write =>
assert(write.collect {
case r: RebalancePartitions => r
- }.isEmpty)
+ }.size == expectedRebalanceNumDisabled)
}
}
}
@@ -73,6 +76,14 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS t(c1)"))
}
+ withTable("tmp1") {
+ sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+ check(
+ sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM VALUES(1),(2),(3) AS t(c1)"),
+ 1,
+ 1)
+ }
+
withTable("tmp1", "tmp2") {
sql(s"CREATE TABLE tmp1 (c1 int) $storage")
sql(s"CREATE TABLE tmp2 (c1 int) $storage")