This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ab3b67a89 [flink] Add default parallelism for source and sink in rescale procedure, also make them configurable (#5193)
3ab3b67a89 is described below

commit 3ab3b67a89933f2aa72b93c985668333f0143f03
Author: tsreaper <[email protected]>
AuthorDate: Mon Mar 10 16:19:51 2025 +0800

    [flink] Add default parallelism for source and sink in rescale procedure, also make them configurable (#5193)
---
 docs/content/flink/procedures.md                   |  4 ++-
 .../apache/paimon/flink/action/RescaleAction.java  | 35 +++++++++++++++++++++-
 .../paimon/flink/action/RescaleActionFactory.java  | 18 +++++++++--
 .../paimon/flink/procedure/RescaleProcedure.java   | 23 ++++++++++++--
 4 files changed, 73 insertions(+), 7 deletions(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 495a96b074..1d99414195 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -746,13 +746,15 @@ All available procedures are listed below.
    <tr>
       <td>rescale</td>
       <td>
-         CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition')
+         CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `source.parallelism` => source_parallelism, `sink.parallelism` => sink_parallelism)
       </td>
       <td>
          Rescale one partition of a table. Arguments:
          <li>identifier: The target table identifier. Cannot be empty.</li>
          <li>bucket_num: Resulting bucket number after rescale. The default value of argument bucket_num is the current bucket number of the table. Cannot be empty for postpone bucket tables.</li>
          <li>partition: What partition to rescale. For partitioned table this argument cannot be empty.</li>
+         <li>source.parallelism: Parallelism of the source operator. The default value is the current bucket number of the partition.</li>
+         <li>sink.parallelism: Parallelism of the sink operator. The default value is equal to bucket_num.</li>
       </td>
       <td>
          CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, `partition` => 'dt=20250217,hh=08')
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index 64a205aee0..552020eb14 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.BucketMode;
@@ -38,6 +39,7 @@ import org.apache.flink.table.data.RowData;
 import javax.annotation.Nullable;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 
 /** Action to rescale one partition of a table. */
@@ -45,6 +47,8 @@ public class RescaleAction extends TableActionBase {
 
     private @Nullable Integer bucketNum;
     private Map<String, String> partition = new HashMap<>();
+    private @Nullable Integer sourceParallelism;
+    private @Nullable Integer sinkParallelism;
 
     public RescaleAction(String databaseName, String tableName, Map<String, String> catalogConfig) {
         super(databaseName, tableName, catalogConfig);
@@ -60,6 +64,16 @@ public class RescaleAction extends TableActionBase {
         return this;
     }
 
+    public RescaleAction withSourceParallelism(int sourceParallelism) {
+        this.sourceParallelism = sourceParallelism;
+        return this;
+    }
+
+    public RescaleAction withSinkParallelism(int sinkParallelism) {
+        this.sinkParallelism = sinkParallelism;
+        return this;
+    }
+
     @Override
     public void build() throws Exception {
         Configuration flinkConf = new Configuration();
@@ -79,6 +93,8 @@ public class RescaleAction extends TableActionBase {
                 new FlinkSourceBuilder(fileStoreTable)
                         .env(env)
                         .sourceBounded(true)
+                        .sourceParallelism(
+                                sourceParallelism == null ? currentBucketNum() : sourceParallelism)
                         .predicate(partitionPredicate)
                         .build();
 
@@ -92,7 +108,11 @@ public class RescaleAction extends TableActionBase {
         }
         FileStoreTable rescaledTable =
                 fileStoreTable.copy(fileStoreTable.schema().copy(bucketOptions));
-        new FlinkSinkBuilder(rescaledTable).overwrite(partition).forRowData(source).build();
+        new FlinkSinkBuilder(rescaledTable)
+                .overwrite(partition)
+                .parallelism(sinkParallelism == null ? bucketNum : sinkParallelism)
+                .forRowData(source)
+                .build();
     }
 
     @Override
@@ -100,4 +120,17 @@ public class RescaleAction extends TableActionBase {
         build();
         env.execute("Rescale Postpone Bucket : " + table.fullName());
     }
+
+    private int currentBucketNum() {
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        Iterator<ManifestEntry> it =
+                fileStoreTable
+                        .newSnapshotReader()
+                        .withPartitionFilter(partition)
+                        .readFileIterator();
+        Preconditions.checkArgument(
+                it.hasNext(),
+                "The specified partition does not have any data files. No need to rescale.");
+        return it.next().totalBuckets();
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
index 8a5465ff41..c7b96e5000 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
@@ -26,6 +26,8 @@ public class RescaleActionFactory implements ActionFactory {
     public static final String IDENTIFIER = "rescale";
     private static final String BUCKET_NUM = "bucket_num";
     private static final String PARTITION = "partition";
+    private static final String SOURCE_PARALLELISM = "source.parallelism";
+    private static final String SINK_PARALLELISM = "sink.parallelism";
 
     @Override
     public String identifier() {
@@ -43,10 +45,15 @@ public class RescaleActionFactory implements ActionFactory {
         if (params.has(BUCKET_NUM)) {
             action.withBucketNum(Integer.parseInt(params.get(BUCKET_NUM)));
         }
-
         if (params.has(PARTITION)) {
             action.withPartition(getPartitions(params).get(0));
         }
+        if (params.has(SOURCE_PARALLELISM)) {
+            action.withSourceParallelism(Integer.parseInt(params.get(SOURCE_PARALLELISM)));
+        }
+        if (params.has(SINK_PARALLELISM)) {
+            action.withSinkParallelism(Integer.parseInt(params.get(SINK_PARALLELISM)));
+        }
 
         return Optional.of(action);
     }
@@ -60,11 +67,16 @@ public class RescaleActionFactory implements ActionFactory {
         System.out.println(
                 "  rescale --warehouse <warehouse_path> --database <database_name> "
                         + "--table <table_name> [--bucket_num <bucket_num>] "
-                        + "[--partition <partition>]");
+                        + "[--partition <partition>] "
+                        + "[--source.parallelism <source.parallelism>] [--sink.parallelism <sink.parallelism>]");
         System.out.println(
-                "The default value of argument bucket_num is the current bucket number of the table. "
+                "The default value of argument bucket_num is the value of 'bucket' option of the table. "
                         + "For postpone bucket tables, this argument must be specified.");
         System.out.println(
                 "Argument partition must be specified if the table is a partitioned table.");
+        System.out.println(
+                "The default value of argument source.parallelism is the current bucket number of the partition.");
+        System.out.println(
+                "The default value of argument sink.parallelism is equal to bucket_num.");
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
index c7d2a1f2e7..df8cba982b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
@@ -38,13 +38,26 @@ public class RescaleProcedure extends ProcedureBase {
             argument = {
                 @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
                 @ArgumentHint(name = "bucket_num", type = @DataTypeHint("INT"), isOptional = true),
-                @ArgumentHint(name = "partition", type = @DataTypeHint("STRING"), isOptional = true)
+                @ArgumentHint(
+                        name = "partition",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "source.parallelism",
+                        type = @DataTypeHint("INT"),
+                        isOptional = true),
+                @ArgumentHint(
+                        name = "sink.parallelism",
+                        type = @DataTypeHint("INT"),
+                        isOptional = true)
             })
     public String[] call(
             ProcedureContext procedureContext,
             String tableId,
             @Nullable Integer bucketNum,
-            @Nullable String partition)
+            @Nullable String partition,
+            @Nullable Integer sourceParallelism,
+            @Nullable Integer sinkParallelism)
             throws Exception {
         Identifier identifier = Identifier.fromString(tableId);
         String databaseName = identifier.getDatabaseName();
@@ -57,6 +70,12 @@ public class RescaleProcedure extends ProcedureBase {
         if (partition != null) {
             action.withPartition(ParameterUtils.getPartitions(partition).get(0));
         }
+        if (sourceParallelism != null) {
+            action.withSourceParallelism(sourceParallelism);
+        }
+        if (sinkParallelism != null) {
+            action.withSinkParallelism(sinkParallelism);
+        }
 
         return execute(
                 procedureContext, action, "Rescale Postpone Bucket : " + identifier.getFullName());

Reply via email to