This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f4b1f0de5 [KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
5f4b1f0de5 is described below

commit 5f4b1f0de513ad1085dcea8ab3d4911a82733b9e
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jul 18 11:46:36 2025 +0800

    [KYUUBI #7139] Fix Spark extension rules to support RebalancePartitions
    
    ### Why are the changes needed?
    
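    As the title states: the Spark extension rules now recognize an existing
    RebalancePartitions node in the same way as a RepartitionOperation.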
    
    ### How was this patch tested?
    
    The existing unit tests were modified to cover this change.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #7139 from pan3793/rebalance.
    
    Closes #7139
    
    edb070afd [Cheng Pan] fix
    4d3984a92 [Cheng Pan] Fix Spark extension rules to support RebalancePartitions
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala  |  9 ---------
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala       |  5 +++--
 .../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala   |  6 +++---
 .../apache/spark/sql/RebalanceBeforeWritingSuite.scala  | 17 ++++++++++++++---
 .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala  |  9 ---------
 .../kyuubi/sql/RepartitionBeforeWritingBase.scala       |  5 +++--
 .../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala   |  6 +++---
 .../apache/spark/sql/RebalanceBeforeWritingSuite.scala  | 17 ++++++++++++++---
 .../org/apache/kyuubi/sql/RebalanceBeforeWriting.scala  |  4 ++--
 .../kyuubi/sql/zorder/InsertZorderBeforeWriting.scala   |  6 +++---
 .../apache/spark/sql/RebalanceBeforeWritingSuite.scala  | 17 ++++++++++++++---
 11 files changed, 59 insertions(+), 42 deletions(-)

diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends 
RepartitionBuilder {
       }
     }
   }
-
-  override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
-    super.canInsertRepartitionByExpression(plan) && {
-      plan match {
-        case _: RebalancePartitions => false
-        case _ => true
-      }
-    }
-  }
 }
 
 /**
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index ace98bb260..0e2e901265 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -126,6 +126,7 @@ trait RepartitionBeforeWriteHelper extends 
Rule[LogicalPlan] {
       case _: Window => true
       case s: Sort if s.global => true
       case _: RepartitionOperation => true
+      case _: RebalancePartitions => true
       case _: GlobalLimit => true
       case _ => false
     }.isDefined
@@ -139,8 +140,8 @@ trait RepartitionBeforeWriteHelper extends 
Rule[LogicalPlan] {
       case SubqueryAlias(_, child) => canInsert(child)
       case Limit(_, _) => false
       case _: Sort => false
-      case _: RepartitionByExpression => false
-      case _: Repartition => false
+      case _: RepartitionOperation => false
+      case _: RebalancePartitions => false
       case _ => true
     }
 
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 4b2494bc84..e03e49b7d8 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -50,12 +50,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with 
ZorderBuilder {
 
   def canInsertZorder(query: LogicalPlan): Boolean = query match {
     case Project(_, child) => canInsertZorder(child)
-    case _: RepartitionByExpression | _: Repartition
+    case _: RepartitionOperation | _: RebalancePartitions
         if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
     // TODO: actually, we can force zorder even if existed some shuffle
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 2c75e476b1..1a77612346 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-3/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -28,18 +28,21 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
 
   test("check rebalance exists") {
-    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+    def check(
+        df: => DataFrame,
+        expectedRebalanceNumEnabled: Int = 1,
+        expectedRebalanceNumDisabled: Int = 0): Unit = {
       
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"true") {
         assert(
           df.queryExecution.analyzed.collect {
             case r: RebalancePartitions => r
-          }.size == expectedRebalanceNum)
+          }.size == expectedRebalanceNumEnabled)
       }
       
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"false") {
         assert(
           df.queryExecution.analyzed.collect {
             case r: RebalancePartitions => r
-          }.isEmpty)
+          }.size == expectedRebalanceNumDisabled)
       }
     }
 
@@ -69,6 +72,14 @@ class RebalanceBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
           check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS 
t(c1)"))
         }
 
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          check(
+            sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM 
VALUES(1),(2),(3) AS t(c1)"),
+            1,
+            1)
+        }
+
         withTable("tmp1", "tmp2") {
           sql(s"CREATE TABLE tmp1 (c1 int) $storage")
           sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 3cbacdd2f0..2c6980fdb6 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -45,15 +45,6 @@ trait RepartitionBuilderWithRebalance extends 
RepartitionBuilder {
       }
     }
   }
-
-  override def canInsertRepartitionByExpression(plan: LogicalPlan): Boolean = {
-    super.canInsertRepartitionByExpression(plan) && {
-      plan match {
-        case _: RebalancePartitions => false
-        case _ => true
-      }
-    }
-  }
 }
 
 /**
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
index e549d457af..f631c880cd 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/RepartitionBeforeWritingBase.scala
@@ -108,6 +108,7 @@ trait RepartitionBeforeWriteHelper extends 
Rule[LogicalPlan] {
       case _: Window => true
       case s: Sort if s.global => true
       case _: RepartitionOperation => true
+      case _: RebalancePartitions => true
       case _: GlobalLimit => true
       case _ => false
     }.isDefined
@@ -121,8 +122,8 @@ trait RepartitionBeforeWriteHelper extends 
Rule[LogicalPlan] {
       case SubqueryAlias(_, child) => canInsert(child)
       case Limit(_, _) => false
       case _: Sort => false
-      case _: RepartitionByExpression => false
-      case _: Repartition => false
+      case _: RepartitionOperation => false
+      case _: RebalancePartitions => false
       case _ => true
     }
 
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index 003ba6b68a..d96b546b94 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -49,12 +49,12 @@ trait InsertZorderHelper extends Rule[LogicalPlan] with 
ZorderBuilder {
 
   def canInsertZorder(query: LogicalPlan): Boolean = query match {
     case Project(_, child) => canInsertZorder(child)
-    case _: RepartitionByExpression | _: Repartition
+    case _: RepartitionOperation | _: RebalancePartitions
         if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
     // TODO: actually, we can force zorder even if existed some shuffle
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 9e9328fd96..9aa75f53dc 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-4/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -29,19 +29,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
 
   test("check rebalance exists") {
-    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+    def check(
+        df: => DataFrame,
+        expectedRebalanceNumEnabled: Int = 1,
+        expectedRebalanceNumDisabled: Int = 0): Unit = {
       
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"true") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.size == expectedRebalanceNum)
+          }.size == expectedRebalanceNumEnabled)
         }
       }
       
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"false") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.isEmpty)
+          }.size == expectedRebalanceNumDisabled)
         }
       }
     }
@@ -72,6 +75,14 @@ class RebalanceBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
           check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS 
t(c1)"))
         }
 
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          check(
+            sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM 
VALUES(1),(2),(3) AS t(c1)"),
+            1,
+            1)
+        }
+
         withTable("tmp1", "tmp2") {
           sql(s"CREATE TABLE tmp1 (c1 int) $storage")
           sql(s"CREATE TABLE tmp2 (c1 int) $storage")
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
index 25ec7240d1..978d86038c 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/RebalanceBeforeWriting.scala
@@ -39,6 +39,7 @@ trait RebalanceBeforeWritingBase extends Rule[LogicalPlan] {
       case _: Window => true
       case s: Sort if s.global => true
       case _: RepartitionOperation => true
+      case _: RebalancePartitions => true
       case _: GlobalLimit => true
       case _ => false
     }
@@ -53,8 +54,7 @@ trait RebalanceBeforeWritingBase extends Rule[LogicalPlan] {
       case SubqueryAlias(_, child) => canInsert(child)
       case Limit(_, _) => false
       case _: Sort => false
-      case _: RepartitionByExpression => false
-      case _: Repartition => false
+      case _: RepartitionOperation => false
       case _: RebalancePartitions => false
       case _ => true
     }
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
index fca03532c9..2b19bef716 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/main/scala/org/apache/kyuubi/sql/zorder/InsertZorderBeforeWriting.scala
@@ -57,12 +57,12 @@ trait InsertZorderBeforeWritingBase extends 
Rule[LogicalPlan] {
 
   def canInsertZorder(query: LogicalPlan): Boolean = query match {
     case Project(_, child) => canInsertZorder(child)
-    case _: RepartitionByExpression | _: Repartition
+    case _: RepartitionOperation | _: RebalancePartitions
         if !conf.getConf(KyuubiSQLConf.ZORDER_GLOBAL_SORT_ENABLED) => true
     // TODO: actually, we can force zorder even if existed some shuffle
     case _: Sort => false
-    case _: RepartitionByExpression => false
-    case _: Repartition => false
+    case _: RepartitionOperation => false
+    case _: RebalancePartitions => false
     case _ => true
   }
 
diff --git 
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
 
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
index 6fb63b0f1e..d58e9d7895 100644
--- 
a/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
+++ 
b/extensions/spark/kyuubi-extension-spark-3-5/src/test/scala/org/apache/spark/sql/RebalanceBeforeWritingSuite.scala
@@ -30,19 +30,22 @@ import org.apache.kyuubi.sql.KyuubiSQLConf
 class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
 
   test("check rebalance exists") {
-    def check(df: => DataFrame, expectedRebalanceNum: Int = 1): Unit = {
+    def check(
+        df: => DataFrame,
+        expectedRebalanceNumEnabled: Int = 1,
+        expectedRebalanceNumDisabled: Int = 0): Unit = {
       
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"true") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.size == expectedRebalanceNum)
+          }.size == expectedRebalanceNumEnabled)
         }
       }
       
withSQLConf(KyuubiSQLConf.INSERT_REPARTITION_BEFORE_WRITE_IF_NO_SHUFFLE.key -> 
"false") {
         withListener(df) { write =>
           assert(write.collect {
             case r: RebalancePartitions => r
-          }.isEmpty)
+          }.size == expectedRebalanceNumDisabled)
         }
       }
     }
@@ -73,6 +76,14 @@ class RebalanceBeforeWritingSuite extends 
KyuubiSparkSQLExtensionTest {
           check(sql("INSERT INTO TABLE tmp1 SELECT * FROM VALUES(1),(2),(3) AS 
t(c1)"))
         }
 
+        withTable("tmp1") {
+          sql(s"CREATE TABLE tmp1 (c1 int) $storage")
+          check(
+            sql("INSERT INTO TABLE tmp1 SELECT /*+ REBALANCE */ * FROM 
VALUES(1),(2),(3) AS t(c1)"),
+            1,
+            1)
+        }
+
         withTable("tmp1", "tmp2") {
           sql(s"CREATE TABLE tmp1 (c1 int) $storage")
           sql(s"CREATE TABLE tmp2 (c1 int) $storage")

Reply via email to