This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d171ad8446 [spark] Fix rescale procedure to only read real buckets for
postpone bucket table (#7184)
d171ad8446 is described below
commit d171ad8446003314fa22c0541fdcc1678e7405b5
Author: Juntao Zhang <[email protected]>
AuthorDate: Tue Feb 3 10:35:31 2026 +0800
[spark] Fix rescale procedure to only read real buckets for postpone bucket
table (#7184)
---
docs/content/spark/procedures.md | 13 +++++++++++++
.../paimon/spark/procedure/RescaleProcedure.java | 2 +-
.../spark/procedure/RescaleProcedureTest.scala | 20 ++++++++++++++++++++
3 files changed, 34 insertions(+), 1 deletion(-)
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index c0abd0bae2..1b8959d6bf 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -476,5 +476,18 @@ This section introduces all available spark procedures
about paimon.
CALL sys.copy(source_table => "t1", target_table => "t1_copy", where
=> "day = '2025-08-17'")<br/>
</td>
</tr>
+ <tr>
+ <td>rescale</td>
+ <td>
+ Rescale partitions of a table by changing the bucket number.
Arguments:
+ <li>table: the target table identifier. Cannot be empty.</li>
+ <li>bucket_num: resulting bucket number after rescale. The default
value is the current bucket number of the table. Cannot be empty for postpone
bucket tables.</li>
+ <li>partitions: partition filter. Left empty for all partitions.
(Can't be used together with "where")</li>
+ <li>where: partition predicate. Left empty for all partitions.
(Can't be used together with "partitions")</li>
+ </td>
+ <td>
+ CALL sys.rescale(table => 'default.T', bucket_num => 16, partitions
=> 'dt=20250217,hh=08;dt=20250217,hh=09')<br/>
+ </td>
+ </tr>
</tbody>
</table>
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
index 8f8a8874be..94e1619806 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RescaleProcedure.java
@@ -162,7 +162,7 @@ public class RescaleProcedure extends BaseProcedure {
if (partitionPredicate != null) {
snapshotReader =
snapshotReader.withPartitionFilter(partitionPredicate);
}
- List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
+ List<DataSplit> dataSplits =
snapshotReader.onlyReadRealBuckets().read().dataSplits();
if (dataSplits.isEmpty()) {
LOG.info("No data splits found for the specified partition. No
need to rescale.");
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala
index b2d9af14ad..986b1f9b6b 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RescaleProcedureTest.scala
@@ -305,6 +305,26 @@ class RescaleProcedureTest extends PaimonSparkTestBase {
spark.sql("CALL sys.rescale(table => 'T4', bucket_num => 4, where =>
'id = 1')")
}.getMessage.contains("Only partition predicate is supported"))
}
+
+ }
+
+ test("Paimon Procedure: rescale postpone bucket table") {
+ // Postpone bucket table needs to compact before rescale
+ withTable("T") {
+ spark.sql(s"""
+ |CREATE TABLE T (id INT, value STRING, pt STRING)
+ |TBLPROPERTIES (
+ | 'primary-key'='id',
+ | 'bucket'='-2',
+ | 'postpone.batch-write-fixed-bucket' = 'false',
+ | 'postpone.default-bucket-num' = '1'
+ |)
+ |PARTITIONED BY (pt)
+ |""".stripMargin)
+ spark.sql(s"INSERT INTO T VALUES (1, 'a', 'p1'), (2, 'b', 'p1')")
+ spark.sql("CALL sys.rescale(table => 'T', bucket_num => 2, partitions =>
'pt=\"p1\"')")
+ checkAnswer(sql("SELECT count(*) FROM T"), Seq(Row(0)))
+ }
}
// ----------------------- Helper Methods -----------------------