This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new f778c6534c [flink] Refactor parallelism options for RescaleProcedure
f778c6534c is described below
commit f778c6534cb3d536acb75c1ab1c248965e1dfd64
Author: JingsongLi <[email protected]>
AuthorDate: Mon Mar 10 16:23:09 2025 +0800
[flink] Refactor parallelism options for RescaleProcedure
---
docs/content/flink/procedures.md | 6 +++---
.../java/org/apache/paimon/flink/action/RescaleAction.java | 8 ++++----
.../apache/paimon/flink/action/RescaleActionFactory.java | 14 +++++++-------
.../apache/paimon/flink/procedure/RescaleProcedure.java | 10 +++++-----
4 files changed, 19 insertions(+), 19 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 1d99414195..45d96fef20 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -746,15 +746,15 @@ All available procedures are listed below.
<tr>
<td>rescale</td>
<td>
- CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `source.parallelism` => 'source.parallelism', `sink.parallelism` => 'sink.parallelism')
+ CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `scan_parallelism` => 'scan_parallelism', `sink_parallelism` => 'sink_parallelism')
</td>
<td>
Rescale one partition of a table. Arguments:
<li>identifier: The target table identifier. Cannot be empty.</li>
<li>bucket_num: Resulting bucket number after rescale. The default value of argument bucket_num is the current bucket number of the table. Cannot be empty for postpone bucket tables.</li>
<li>partition: What partition to rescale. For partitioned table this argument cannot be empty.</li>
- <li>source.parallelism: Parallelism of source operator. The default value is the current bucket number of the partition.</li>
- <li>sink.parallelism: Parallelism of sink operator. The default value is equal to bucket_num.</li>
+ <li>scan_parallelism: Parallelism of source operator. The default value is the current bucket number of the partition.</li>
+ <li>sink_parallelism: Parallelism of sink operator. The default value is equal to bucket_num.</li>
</td>
<td>
CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, `partition` => 'dt=20250217,hh=08')
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index 552020eb14..1e02f6c100 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -47,7 +47,7 @@ public class RescaleAction extends TableActionBase {
private @Nullable Integer bucketNum;
private Map<String, String> partition = new HashMap<>();
- private @Nullable Integer sourceParallelism;
+ private @Nullable Integer scanParallelism;
private @Nullable Integer sinkParallelism;
public RescaleAction(String databaseName, String tableName, Map<String, String> catalogConfig) {
@@ -64,8 +64,8 @@ public class RescaleAction extends TableActionBase {
return this;
}
- public RescaleAction withSourceParallelism(int sourceParallelism) {
- this.sourceParallelism = sourceParallelism;
+ public RescaleAction withScanParallelism(int scanParallelism) {
+ this.scanParallelism = scanParallelism;
return this;
}
@@ -94,7 +94,7 @@ public class RescaleAction extends TableActionBase {
.env(env)
.sourceBounded(true)
.sourceParallelism(
- sourceParallelism == null ? currentBucketNum() : sourceParallelism)
+ scanParallelism == null ? currentBucketNum() : scanParallelism)
.predicate(partitionPredicate)
.build();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
index c7b96e5000..8209393efa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
@@ -26,8 +26,8 @@ public class RescaleActionFactory implements ActionFactory {
public static final String IDENTIFIER = "rescale";
private static final String BUCKET_NUM = "bucket_num";
private static final String PARTITION = "partition";
- private static final String SOURCE_PARALLELISM = "source.parallelism";
- private static final String SINK_PARALLELISM = "sink.parallelism";
+ private static final String SCAN_PARALLELISM = "scan_parallelism";
+ private static final String SINK_PARALLELISM = "sink_parallelism";
@Override
public String identifier() {
@@ -48,8 +48,8 @@ public class RescaleActionFactory implements ActionFactory {
if (params.has(PARTITION)) {
action.withPartition(getPartitions(params).get(0));
}
- if (params.has(SOURCE_PARALLELISM)) {
- action.withSourceParallelism(Integer.parseInt(params.get(SOURCE_PARALLELISM)));
+ if (params.has(SCAN_PARALLELISM)) {
+ action.withScanParallelism(Integer.parseInt(params.get(SCAN_PARALLELISM)));
}
if (params.has(SINK_PARALLELISM)) {
action.withSinkParallelism(Integer.parseInt(params.get(SINK_PARALLELISM)));
@@ -68,15 +68,15 @@ public class RescaleActionFactory implements ActionFactory {
" rescale --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> [--bucket_num <bucket_num>] "
+ "[--partition <partition>] "
- + "[--source.parallelism <source.parallelism>] [--sink.parallelism <sink.parallelism>]");
+ + "[--scan_parallelism <scan_parallelism>] [--sink_parallelism <sink_parallelism>]");
System.out.println(
"The default value of argument bucket_num is the value of 'bucket' option of the table. "
+ "For postpone bucket tables, this argument must be specified.");
System.out.println(
"Argument partition must be specified if the table is a partitioned table.");
System.out.println(
- "The default value of argument source.parallelism is the current bucket number of the partition.");
+ "The default value of argument scan_parallelism is the current bucket number of the partition.");
System.out.println(
- "The default value of argument sink.parallelism is equal to bucket_num.");
+ "The default value of argument sink_parallelism is equal to bucket_num.");
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
index df8cba982b..f253ec736f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
@@ -43,11 +43,11 @@ public class RescaleProcedure extends ProcedureBase {
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(
- name = "source.parallelism",
+ name = "scan_parallelism",
type = @DataTypeHint("INT"),
isOptional = true),
@ArgumentHint(
- name = "sink.parallelism",
+ name = "sink_parallelism",
type = @DataTypeHint("INT"),
isOptional = true)
})
@@ -56,7 +56,7 @@ public class RescaleProcedure extends ProcedureBase {
String tableId,
@Nullable Integer bucketNum,
@Nullable String partition,
- @Nullable Integer sourceParallelism,
+ @Nullable Integer scanParallelism,
@Nullable Integer sinkParallelism)
throws Exception {
Identifier identifier = Identifier.fromString(tableId);
@@ -70,8 +70,8 @@ public class RescaleProcedure extends ProcedureBase {
if (partition != null) {
action.withPartition(ParameterUtils.getPartitions(partition).get(0));
}
- if (sourceParallelism != null) {
- action.withSourceParallelism(sourceParallelism);
+ if (scanParallelism != null) {
+ action.withScanParallelism(scanParallelism);
}
if (sinkParallelism != null) {
action.withSinkParallelism(sinkParallelism);