This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3ab3b67a89 [flink] Add default parallelism for source and sink in
rescale procedure, also make them configurable (#5193)
3ab3b67a89 is described below
commit 3ab3b67a89933f2aa72b93c985668333f0143f03
Author: tsreaper <[email protected]>
AuthorDate: Mon Mar 10 16:19:51 2025 +0800
[flink] Add default parallelism for source and sink in rescale procedure,
also make them configurable (#5193)
---
docs/content/flink/procedures.md | 4 ++-
.../apache/paimon/flink/action/RescaleAction.java | 35 +++++++++++++++++++++-
.../paimon/flink/action/RescaleActionFactory.java | 18 +++++++++--
.../paimon/flink/procedure/RescaleProcedure.java | 23 ++++++++++++--
4 files changed, 73 insertions(+), 7 deletions(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 495a96b074..1d99414195 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -746,13 +746,15 @@ All available procedures are listed below.
<tr>
<td>rescale</td>
<td>
- CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition')
+ CALL [catalog.]sys.rescale(`table` => 'identifier', `bucket_num` => bucket_num, `partition` => 'partition', `source.parallelism` => 'source.parallelism', `sink.parallelism` => 'sink.parallelism')
</td>
<td>
Rescale one partition of a table. Arguments:
<li>identifier: The target table identifier. Cannot be empty.</li>
<li>bucket_num: Resulting bucket number after rescale. The default value of argument bucket_num is the current bucket number of the table. Cannot be empty for postpone bucket tables.</li>
<li>partition: What partition to rescale. For partitioned table this argument cannot be empty.</li>
+ <li>source.parallelism: Parallelism of source operator. The default value is the current bucket number of the partition.</li>
+ <li>sink.parallelism: Parallelism of sink operator. The default value is equal to bucket_num.</li>
</td>
<td>
CALL sys.rescale(`table` => 'default.T', `bucket_num` => 16, `partition` => 'dt=20250217,hh=08')
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index 64a205aee0..552020eb14 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.action;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
+import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.BucketMode;
@@ -38,6 +39,7 @@ import org.apache.flink.table.data.RowData;
import javax.annotation.Nullable;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
/** Action to rescale one partition of a table. */
@@ -45,6 +47,8 @@ public class RescaleAction extends TableActionBase {
private @Nullable Integer bucketNum;
private Map<String, String> partition = new HashMap<>();
+ private @Nullable Integer sourceParallelism;
+ private @Nullable Integer sinkParallelism;
public RescaleAction(String databaseName, String tableName, Map<String,
String> catalogConfig) {
super(databaseName, tableName, catalogConfig);
@@ -60,6 +64,16 @@ public class RescaleAction extends TableActionBase {
return this;
}
+ public RescaleAction withSourceParallelism(int sourceParallelism) {
+ this.sourceParallelism = sourceParallelism;
+ return this;
+ }
+
+ public RescaleAction withSinkParallelism(int sinkParallelism) {
+ this.sinkParallelism = sinkParallelism;
+ return this;
+ }
+
@Override
public void build() throws Exception {
Configuration flinkConf = new Configuration();
@@ -79,6 +93,8 @@ public class RescaleAction extends TableActionBase {
new FlinkSourceBuilder(fileStoreTable)
.env(env)
.sourceBounded(true)
+ .sourceParallelism(
+ sourceParallelism == null ? currentBucketNum()
: sourceParallelism)
.predicate(partitionPredicate)
.build();
@@ -92,7 +108,11 @@ public class RescaleAction extends TableActionBase {
}
FileStoreTable rescaledTable =
fileStoreTable.copy(fileStoreTable.schema().copy(bucketOptions));
- new
FlinkSinkBuilder(rescaledTable).overwrite(partition).forRowData(source).build();
+ new FlinkSinkBuilder(rescaledTable)
+ .overwrite(partition)
+ .parallelism(sinkParallelism == null ? bucketNum :
sinkParallelism)
+ .forRowData(source)
+ .build();
}
@Override
@@ -100,4 +120,17 @@ public class RescaleAction extends TableActionBase {
build();
env.execute("Rescale Postpone Bucket : " + table.fullName());
}
+
+ private int currentBucketNum() {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ Iterator<ManifestEntry> it =
+ fileStoreTable
+ .newSnapshotReader()
+ .withPartitionFilter(partition)
+ .readFileIterator();
+ Preconditions.checkArgument(
+ it.hasNext(),
+ "The specified partition does not have any data files. No need to rescale.");
+ return it.next().totalBuckets();
+ }
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
index 8a5465ff41..c7b96e5000 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleActionFactory.java
@@ -26,6 +26,8 @@ public class RescaleActionFactory implements ActionFactory {
public static final String IDENTIFIER = "rescale";
private static final String BUCKET_NUM = "bucket_num";
private static final String PARTITION = "partition";
+ private static final String SOURCE_PARALLELISM = "source.parallelism";
+ private static final String SINK_PARALLELISM = "sink.parallelism";
@Override
public String identifier() {
@@ -43,10 +45,15 @@ public class RescaleActionFactory implements ActionFactory {
if (params.has(BUCKET_NUM)) {
action.withBucketNum(Integer.parseInt(params.get(BUCKET_NUM)));
}
-
if (params.has(PARTITION)) {
action.withPartition(getPartitions(params).get(0));
}
+ if (params.has(SOURCE_PARALLELISM)) {
+ action.withSourceParallelism(Integer.parseInt(params.get(SOURCE_PARALLELISM)));
+ }
+ if (params.has(SINK_PARALLELISM)) {
+ action.withSinkParallelism(Integer.parseInt(params.get(SINK_PARALLELISM)));
+ }
return Optional.of(action);
}
@@ -60,11 +67,16 @@ public class RescaleActionFactory implements ActionFactory {
System.out.println(
" rescale --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> [--bucket_num <bucket_num>] "
- + "[--partition <partition>]");
+ + "[--partition <partition>] "
+ + "[--source.parallelism <source.parallelism>] [--sink.parallelism <sink.parallelism>]");
System.out.println(
- "The default value of argument bucket_num is the current bucket number of the table. "
+ "The default value of argument bucket_num is the value of 'bucket' option of the table. "
+ "For postpone bucket tables, this argument must be specified.");
System.out.println(
"Argument partition must be specified if the table is a partitioned table.");
+ System.out.println(
+ "The default value of argument source.parallelism is the current bucket number of the partition.");
+ System.out.println(
+ "The default value of argument sink.parallelism is equal to bucket_num.");
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
index c7d2a1f2e7..df8cba982b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RescaleProcedure.java
@@ -38,13 +38,26 @@ public class RescaleProcedure extends ProcedureBase {
argument = {
@ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
@ArgumentHint(name = "bucket_num", type =
@DataTypeHint("INT"), isOptional = true),
- @ArgumentHint(name = "partition", type =
@DataTypeHint("STRING"), isOptional = true)
+ @ArgumentHint(
+ name = "partition",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "source.parallelism",
+ type = @DataTypeHint("INT"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "sink.parallelism",
+ type = @DataTypeHint("INT"),
+ isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String tableId,
@Nullable Integer bucketNum,
- @Nullable String partition)
+ @Nullable String partition,
+ @Nullable Integer sourceParallelism,
+ @Nullable Integer sinkParallelism)
throws Exception {
Identifier identifier = Identifier.fromString(tableId);
String databaseName = identifier.getDatabaseName();
@@ -57,6 +70,12 @@ public class RescaleProcedure extends ProcedureBase {
if (partition != null) {
action.withPartition(ParameterUtils.getPartitions(partition).get(0));
}
+ if (sourceParallelism != null) {
+ action.withSourceParallelism(sourceParallelism);
+ }
+ if (sinkParallelism != null) {
+ action.withSinkParallelism(sinkParallelism);
+ }
return execute(
procedureContext, action, "Rescale Postpone Bucket : " +
identifier.getFullName());