This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f778c6534c [flink] Refactor parallelism options for RescaleProcedure
f778c6534c is described below

commit f778c6534cb3d536acb75c1ab1c248965e1dfd64
Author: JingsongLi <[email protected]>
AuthorDate: Mon Mar 10 16:23:09 2025 +0800

    [flink] Refactor parallelism options for RescaleProcedure
---
 docs/content/flink/procedures.md                           |  6 +++---
 .../java/org/apache/paimon/flink/action/RescaleAction.java |  8 ++++----
 .../apache/paimon/flink/action/RescaleActionFactory.java   | 14 +++++++-------
 .../apache/paimon/flink/procedure/RescaleProcedure.java    | 10 +++++-----
 4 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 1d99414195..45d96fef20 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -746,15 +746,15 @@ All available procedures are listed below.
    <tr>
       <td>rescale</td>
       <td>
-         CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `source.parallelism` => 'source.parallelism', `sink.parallelism` => 'sink.parallelism')
+         CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `scan_parallelism` => 'scan_parallelism', `sink_parallelism` => 'sink_parallelism')
       </td>
       <td>
          Rescale one partition of a table. Arguments:
          <li>identifier: The target table identifier. Cannot be empty.</li>
          <li>bucket_num: Resulting bucket number after rescale. The default value of argument bucket_num is the current bucket number of the table. Cannot be empty for postpone bucket tables.</li>
          <li>partition: What partition to rescale. For partitioned table this argument cannot be empty.</li>
-         <li>source.parallelism: Parallelism of source operator. The default value is the current bucket number of the partition.</li>
-         <li>sink.parallelism: Parallelism of sink operator. The default value is equal to bucket_num.</li>
+         <li>scan_parallelism: Parallelism of source operator. The default value is the current bucket number of the partition.</li>
+         <li>sink_parallelism: Parallelism of sink operator. The default value is equal to bucket_num.</li>
       </td>
       <td>
          CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, `partition` => 'dt=20250217,hh=08')
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index 552020eb14..1e02f6c100 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -47,7 +47,7 @@ public class RescaleAction extends TableActionBase {
 
     private @Nullable Integer bucketNum;
     private Map<String, String> partition = new HashMap<>();
-    private @Nullable Integer sourceParallelism;
+    private @Nullable Integer scanParallelism;
     private @Nullable Integer sinkParallelism;
 
     public RescaleAction(String databaseName, String tableName, Map<String, String> catalogConfig) {
@@ -64,8 +64,8 @@ public class RescaleAction extends TableActionBase {
         return this;
     }
 
-    public RescaleAction withSourceParallelism(int sourceParallelism) {
-        this.sourceParallelism = sourceParallelism;
+    public RescaleAction withScanParallelism(int scanParallelism) {
+        this.scanParallelism = scanParallelism;
         return this;
     }
 
@@ -94,7 +94,7 @@ public class RescaleAction extends TableActionBase {
                         .env(env)
                         .sourceBounded(true)
                         .sourceParallelism(
-                                sourceParallelism == null ? currentBucketNum() : sourceParallelism)
+                                scanParallelism == null ? currentBucketNum() : scanParallelism)
                         .predicate(partitionPredicate)
                         .build();
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
index c7b96e5000..8209393efa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
@@ -26,8 +26,8 @@ public class RescaleActionFactory implements ActionFactory {
     public static final String IDENTIFIER = "rescale";
     private static final String BUCKET_NUM = "bucket_num";
     private static final String PARTITION = "partition";
-    private static final String SOURCE_PARALLELISM = "source.parallelism";
-    private static final String SINK_PARALLELISM = "sink.parallelism";
+    private static final String SCAN_PARALLELISM = "scan_parallelism";
+    private static final String SINK_PARALLELISM = "sink_parallelism";
 
     @Override
     public String identifier() {
@@ -48,8 +48,8 @@ public class RescaleActionFactory implements ActionFactory {
         if (params.has(PARTITION)) {
             action.withPartition(getPartitions(params).get(0));
         }
-        if (params.has(SOURCE_PARALLELISM)) {
-            action.withSourceParallelism(Integer.parseInt(params.get(SOURCE_PARALLELISM)));
+        if (params.has(SCAN_PARALLELISM)) {
+            action.withScanParallelism(Integer.parseInt(params.get(SCAN_PARALLELISM)));
         }
         if (params.has(SINK_PARALLELISM)) {
             action.withSinkParallelism(Integer.parseInt(params.get(SINK_PARALLELISM)));
@@ -68,15 +68,15 @@ public class RescaleActionFactory implements ActionFactory {
                 "  rescale --warehouse <warehouse_path> --database <database_name> "
                         + "--table <table_name> [--bucket_num <bucket_num>] "
                         + "[--partition <partition>] "
-                        + "[--source.parallelism <source.parallelism>] [--sink.parallelism <sink.parallelism>]");
+                        + "[--scan_parallelism <scan_parallelism>] [--sink_parallelism <sink_parallelism>]");
         System.out.println(
                 "The default value of argument bucket_num is the value of 'bucket' option of the table. "
                         + "For postpone bucket tables, this argument must be specified.");
         System.out.println(
                 "Argument partition must be specified if the table is a partitioned table.");
         System.out.println(
-                "The default value of argument source.parallelism is the current bucket number of the partition.");
+                "The default value of argument scan_parallelism is the current bucket number of the partition.");
         System.out.println(
-                "The default value of argument sink.parallelism is equal to bucket_num.");
+                "The default value of argument sink_parallelism is equal to bucket_num.");
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
index df8cba982b..f253ec736f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
@@ -43,11 +43,11 @@ public class RescaleProcedure extends ProcedureBase {
                         type = @DataTypeHint("STRING"),
                         isOptional = true),
                 @ArgumentHint(
-                        name = "source.parallelism",
+                        name = "scan_parallelism",
                         type = @DataTypeHint("INT"),
                         isOptional = true),
                 @ArgumentHint(
-                        name = "sink.parallelism",
+                        name = "sink_parallelism",
                         type = @DataTypeHint("INT"),
                         isOptional = true)
             })
@@ -56,7 +56,7 @@ public class RescaleProcedure extends ProcedureBase {
             String tableId,
             @Nullable Integer bucketNum,
             @Nullable String partition,
-            @Nullable Integer sourceParallelism,
+            @Nullable Integer scanParallelism,
             @Nullable Integer sinkParallelism)
             throws Exception {
         Identifier identifier = Identifier.fromString(tableId);
@@ -70,8 +70,8 @@ public class RescaleProcedure extends ProcedureBase {
         if (partition != null) {
             action.withPartition(ParameterUtils.getPartitions(partition).get(0));
         }
-        if (sourceParallelism != null) {
-            action.withSourceParallelism(sourceParallelism);
+        if (scanParallelism != null) {
+            action.withScanParallelism(scanParallelism);
         }
         if (sinkParallelism != null) {
             action.withSinkParallelism(sinkParallelism);

Reply via email to