This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d171ad8446 [spark] Fix rescale procedure to only read real buckets for postpone bucket table (#7184)
d171ad8446 is described below

commit d171ad8446003314fa22c0541fdcc1678e7405b5
Author: Juntao Zhang <[email protected]>
AuthorDate: Tue Feb 3 10:35:31 2026 +0800

    [spark] Fix rescale procedure to only read real buckets for postpone bucket table (#7184)
---
 docs/content/spark/procedures.md                     | 13 +++++++++++++
 .../paimon/spark/procedure/RescaleProcedure.java     |  2 +-
 .../spark/procedure/RescaleProcedureTest.scala       | 20 ++++++++++++++++++++
 3 files changed, 34 insertions(+), 1 deletion(-)

diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index c0abd0bae2..1b8959d6bf 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -476,5 +476,18 @@ This section introduce all available spark procedures about paimon.
         CALL sys.copy(source_table => "t1", target_table => "t1_copy", where => "day = '2025-08-17'")<br/>
       </td>
    </tr>
+   <tr>
+      <td>rescale</td>
+      <td>
+         Rescale partitions of a table by changing the bucket number. Arguments:
+            <li>table: the target table identifier. Cannot be empty.</li>
+            <li>bucket_num: resulting bucket number after rescale. The default value is the current bucket number of the table. Cannot be empty for postpone bucket tables.</li>
+            <li>partitions: partition filter. Left empty for all partitions. (Can't be used together with "where")</li>
+            <li>where: partition predicate. Left empty for all partitions. (Can't be used together with "partitions")</li>
+      </td>
+      <td>
+         CALL sys.rescale(table => 'default.T', bucket_num => 16, partitions => 'dt=20250217,hh=08;dt=20250217,hh=09')<br/>
+      </td>
+   </tr>
    </tbody>
 </table>
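
As a quick illustration of the documented arguments, a call that selects the
partition with a "where" predicate instead of "partitions" might look like
the sketch below (the table name and predicate are illustrative, not taken
from this commit):

    // Illustrative sketch: rescale one partition of a hypothetical table
    // default.T to 16 buckets, selecting the partition via a predicate.
    spark.sql(
      "CALL sys.rescale(table => 'default.T', bucket_num => 16, " +
        "where => \"dt = '20250217'\")")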
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
index 8f8a8874be..94e1619806 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
@@ -162,7 +162,7 @@ public class RescaleProcedure extends BaseProcedure {
         if (partitionPredicate != null) {
             snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate);
         }
-        List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
+        List<DataSplit> dataSplits = snapshotReader.onlyReadRealBuckets().read().dataSplits();
 
         if (dataSplits.isEmpty()) {
             LOG.info("No data splits found for the specified partition. No need to rescale.");
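
The one-line change above is the heart of the fix: a postpone bucket table
stages new writes in a virtual "postpone" bucket until compaction assigns
them to real buckets, so the rescale read must skip that virtual bucket. A
minimal sketch of the fixed read path, assuming the reader is obtained from
the table via newSnapshotReader() (that accessor is an assumption; the
chained calls are the ones shown in the diff):

    // Sketch only: collect the data splits rescale will rewrite,
    // restricted to real buckets.
    var reader = table.newSnapshotReader()
    if (partitionPredicate != null) {
      reader = reader.withPartitionFilter(partitionPredicate)
    }
    // Files still sitting in the virtual postpone bucket are excluded.
    val dataSplits = reader.onlyReadRealBuckets().read().dataSplits()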
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala
index b2d9af14ad..986b1f9b6b 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala
@@ -305,6 +305,26 @@ class RescaleProcedureTest extends PaimonSparkTestBase {
         spark.sql("CALL sys.rescale(table => 'T4', bucket_num => 4, where => 
'id = 1')")
       }.getMessage.contains("Only partition predicate is supported"))
     }
+
+  }
+
+  test("Paimon Procedure: rescale postpone bucket table") {
+    // Postpone bucket table needs to compact before rescale
+    withTable("T") {
+      spark.sql(s"""
+                   |CREATE TABLE T (id INT, value STRING, pt STRING)
+                   |TBLPROPERTIES (
+                   |  'primary-key'='id',
+                   |  'bucket'='-2',
+                   |  'postpone.batch-write-fixed-bucket' = 'false',
+                   |  'postpone.default-bucket-num' = '1'
+                   |)
+                   |PARTITIONED BY (pt)
+                   |""".stripMargin)
+      spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1')")
+      spark.sql("CALL sys.rescale(table => 'T', bucket_num => 2, partitions => 
'pt=\"p1\"')")
+      checkAnswer(sql("SELECT count(*) FROM T"), Seq(Row(0)))
+    }
   }
 
   // ----------------------- Helper Methods -----------------------
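
A note on the expectation in the new test: with
'postpone.batch-write-fixed-bucket' = 'false', the inserted rows stay in the
virtual postpone bucket and are not yet visible to reads, which is why the
test asserts count(*) = 0 even after a successful rescale; before this fix,
rescale read those staged files as well. A hypothetical follow-up (not part
of this commit) that assigns real buckets and makes the rows readable might
look like:

    // Assumption: compacting the partition moves staged files out of the
    // virtual postpone bucket into real buckets, making the rows visible.
    spark.sql("CALL sys.compact(table => 'T', partitions => 'pt=\"p1\"')")
    checkAnswer(sql("SELECT count(*) FROM T"), Seq(Row(2)))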
