This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c4c0d6aad1 [flink] support infer parallelism for incremental clustering (#6624)
c4c0d6aad1 is described below
commit c4c0d6aad1d677990cecf62e90651941c21858cc
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Nov 18 14:18:30 2025 +0800
[flink] support infer parallelism for incremental clustering (#6624)
---
.../shortcodes/generated/core_configuration.html | 6 ++
.../main/java/org/apache/paimon/CoreOptions.java | 10 +++
.../apache/paimon/flink/action/CompactAction.java | 35 +++++++--
.../action/IncrementalClusterActionITCase.java | 88 +++++++++++++++++-----
tools/maven/suppressions.xml | 2 +
5 files changed, 118 insertions(+), 23 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c3251aa979..d1c46f2ff6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -182,6 +182,12 @@ under the License.
<td>Boolean</td>
<td>Whether enable incremental clustering.</td>
</tr>
+ <tr>
+ <td><h5>clustering.per-subtask.data-size</h5></td>
+ <td style="word-wrap: break-word;">1 gb</td>
+ <td>MemorySize</td>
+ <td>The data size processed by single parallelism.</td>
+ </tr>
<tr>
<td><h5>clustering.strategy</h5></td>
<td style="word-wrap: break-word;">"auto"</td>
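
For reference, a minimal sketch (not part of this commit) of setting the new option through dynamic table options; the option keys come from this commit and the surrounding table, the values are illustrative:

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative values only; "clustering.per-subtask.data-size" is the
    // option added by this commit and accepts MemorySize strings such as
    // "50kb" or the default "1 gb".
    Map<String, String> clusteringOptions = new HashMap<>();
    clusteringOptions.put("clustering.columns", "a,b");
    clusteringOptions.put("clustering.incremental", "true");
    clusteringOptions.put("clustering.per-subtask.data-size", "512 mb");
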
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ccb490ec41..e702aceace 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1967,6 +1967,12 @@ public class CoreOptions implements Serializable {
.defaultValue(false)
.withDescription("Whether enable incremental clustering.");
+ public static final ConfigOption<MemorySize> CLUSTERING_PER_TASK_DATA_SIZE =
+ key("clustering.per-subtask.data-size")
+ .memoryType()
+ .defaultValue(MemorySize.ofMebiBytes(1024))
+ .withDescription("The data size processed by single parallelism.");
+
public static final ConfigOption<Integer> CLUSTERING_HISTORY_PARTITION_LIMIT =
key("clustering.history-partition.limit")
.intType()
@@ -3149,6 +3155,10 @@ public class CoreOptions implements Serializable {
return options.get(CLUSTERING_INCREMENTAL);
}
+ public MemorySize clusteringPerTaskDataSize() {
+ return options.get(CLUSTERING_PER_TASK_DATA_SIZE);
+ }
+
public Duration clusteringHistoryPartitionIdleTime() {
return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
}
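
The accessor follows the other clustering getters. A tiny sketch of the default's byte value, using the same MemorySize calls as the option definition above (the import path is assumed, not confirmed by this diff):

    import org.apache.paimon.options.MemorySize;

    // Assumed import path; the calls mirror the diff above.
    // The default MemorySize.ofMebiBytes(1024) resolves to 1 GiB:
    long defaultBytes = MemorySize.ofMebiBytes(1024).getBytes(); // 1073741824
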
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 69ca2a5f45..3e3018a2a2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -41,6 +41,7 @@ import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
@@ -79,6 +80,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.paimon.CoreOptions.CLUSTERING_PER_TASK_DATA_SIZE;
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -242,6 +244,9 @@ public class CompactAction extends TableActionBase {
table.partitionKeys().toArray(new String[0]),
table.coreOptions().legacyPartitionName());
+ long perSubtaskDataSize = table.coreOptions().clusteringPerTaskDataSize().getBytes();
+ LOGGER.info("{} is {} bytes.", CLUSTERING_PER_TASK_DATA_SIZE, perSubtaskDataSize);
+
// 1. pick cluster files for each partition
Map<BinaryRow, CompactUnit> compactUnits =
incrementalClusterManager.prepareForCluster(fullCompaction);
@@ -275,14 +280,32 @@ public class CompactAction extends TableActionBase {
partitionComputer.generatePartValues(partition);
// 2.1 generate source for current partition
+ long partitionFileSize = 0L;
+ int partitionFileCount = 0;
+ for (DataSplit split : splits) {
+ for (DataFileMeta fileMeta : split.dataFiles()) {
+ partitionFileSize += fileMeta.fileSize();
+ partitionFileCount++;
+ }
+ }
+ int inferParallelism =
+ Math.min(
+ partitionFileCount,
+ Math.max(1, (int) (partitionFileSize / perSubtaskDataSize)));
+ LOGGER.info(
+ "For partition {}, the total data size is {} bytes, total
file count is {}, infer parallelism is {}.",
+ partitionSpec,
+ partitionFileSize,
+ partitionFileCount,
+ inferParallelism);
+
+ Integer scanParallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
+ if (scanParallelism == null) {
+ scanParallelism = inferParallelism;
+ }
Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
IncrementalClusterSplitSource.buildSource(
- env,
- table,
- partitionSpec,
- splits,
- dvCommitMessage,
- options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
+ env, table, partitionSpec, splits, dvCommitMessage, scanParallelism);
// 2.2 cluster in partition
Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
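
The inference rule added above is size-based parallelism clamped to [1, fileCount], applied only when scan.parallelism is unset. A self-contained sketch (class and parameter names are mine; the logic is copied from the hunk) with the numbers from the new IT case:

    public class InferParallelismSketch {
        // min(fileCount, max(1, totalBytes / perSubtaskBytes))
        static int inferParallelism(long totalBytes, int fileCount, long perSubtaskBytes) {
            return Math.min(fileCount, Math.max(1, (int) (totalBytes / perSubtaskBytes)));
        }

        public static void main(String[] args) {
            // testClusterWithInferParallelism: ~173 KB across 100 files with a
            // 50 KB per-subtask size -> min(100, max(1, 3)) = 3 subtasks.
            System.out.println(inferParallelism(173_000L, 100, 50_000L)); // 3
        }
    }
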
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index bf7087e77e..1abc5e43bb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -67,7 +67,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterUnpartitionedTable() throws Exception {
- FileStoreTable table = createTable(null, 1);
+ FileStoreTable table = createTable(null);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -204,7 +204,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterPartitionedTable() throws Exception {
- FileStoreTable table = createTable("pt", 1);
+ FileStoreTable table = createTable("pt");
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -336,7 +336,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterSpecifyPartition() throws Exception {
- FileStoreTable table = createTable("pt", 1);
+ FileStoreTable table = createTable("pt");
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -378,9 +378,9 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterHistoryPartition() throws Exception {
- Map<String, String> options = new HashMap<>();
+ Map<String, String> options = commonOptions();
options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), "3s");
- FileStoreTable table = createTable("pt", 1, options);
+ FileStoreTable table = createTable("pt", options);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -530,13 +530,15 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterOnEmptyData() throws Exception {
- createTable("pt", 1);
+ createTable("pt");
assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException();
}
@Test
public void testMultiParallelism() throws Exception {
- FileStoreTable table = createTable(null, 2);
+ Map<String, String> options = commonOptions();
+ options.put("scan.parallelism", "2");
+ FileStoreTable table = createTable(null, options);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -577,9 +579,9 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
@Test
public void testClusterWithDeletionVector() throws Exception {
- Map<String, String> dynamicOptions = new HashMap<>();
+ Map<String, String> dynamicOptions = commonOptions();
dynamicOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
- FileStoreTable table = createTable(null, 1, dynamicOptions);
+ FileStoreTable table = createTable(null, dynamicOptions);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -681,16 +683,43 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
assertThat(splits.get(0).deletionFiles().get().get(0)).isNull();
}
- protected FileStoreTable createTable(String partitionKeys, int sinkParallelism)
- throws Exception {
- return createTable(partitionKeys, sinkParallelism, Collections.emptyMap());
+ @Test
+ public void testClusterWithInferParallelism() throws Exception {
+ Map<String, String> options = commonOptions();
+ options.remove("scan.parallelism");
+ options.remove("sink.parallelism");
+ options.put(CoreOptions.CLUSTERING_PER_TASK_DATA_SIZE.key(), "50kb");
+ FileStoreTable table = createTable(null, options);
+
+ BinaryString randomStr = BinaryString.fromString(randomString(150));
+ List<CommitMessage> messages = new ArrayList<>();
+
+ // first write, generate 100 files, total size 173kb
+ for (int i = 0; i < 10; i++) {
+ for (int j = 0; j < 10; j++) {
+ messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
+ }
+ }
+ commit(messages);
+ ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1});
+
+ // first cluster
+ runAction(Collections.emptyList());
+ checkSnapshot(table);
+ List<Split> splits = readBuilder.newScan().plan().splits();
+ assertThat(splits.size()).isEqualTo(1);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(3);
+ assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+ }
+
+ protected FileStoreTable createTable(String partitionKeys) throws Exception {
+ return createTable(partitionKeys, commonOptions());
}
- protected FileStoreTable createTable(
- String partitionKeys, int sinkParallelism, Map<String, String> options)
+ protected FileStoreTable createTable(String partitionKeys, Map<String, String> options)
throws Exception {
catalog.createDatabase(database, true);
- catalog.createTable(identifier(), schema(partitionKeys, sinkParallelism, options), true);
+ catalog.createTable(identifier(), buildSchema(partitionKeys, options), true);
return (FileStoreTable) catalog.getTable(identifier());
}
@@ -718,8 +747,33 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
commit.close();
}
- private static Schema schema(String partitionKeys, int sinkParallelism) {
- return schema(partitionKeys, sinkParallelism, Collections.emptyMap());
+ private static Schema buildSchema(String partitionKeys, Map<String, String> options) {
+ Schema.Builder schemaBuilder = Schema.newBuilder();
+ schemaBuilder.column("a", DataTypes.INT());
+ schemaBuilder.column("b", DataTypes.INT());
+ schemaBuilder.column("c", DataTypes.STRING());
+ schemaBuilder.column("pt", DataTypes.INT());
+ for (String key : options.keySet()) {
+ schemaBuilder.option(key, options.get(key));
+ }
+ if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) {
+ schemaBuilder.partitionKeys(partitionKeys);
+ }
+ return schemaBuilder.build();
+ }
+
+ private static Map<String, String> commonOptions() {
+ Map<String, String> options = new HashMap<>();
+ options.put("bucket", "-1");
+ options.put("num-levels", "6");
+ options.put("num-sorted-run.compaction-trigger", "2");
+ options.put("scan.plan-sort-partition", "true");
+ options.put("clustering.columns", "a,b");
+ options.put("clustering.strategy", "zorder");
+ options.put("clustering.incremental", "true");
+ options.put("scan.parallelism", "1");
+ options.put("sink.parallelism", "1");
+ return options;
}
private static Schema schema(
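
The new test exercises that path end to end: with scan.parallelism and sink.parallelism removed from commonOptions() and a 50 KB per-subtask size, the 100 small files (~173 KB) infer a parallelism of 3, and the clustered output is asserted to be 3 files on the top level (level 5, given num-levels = 6). A hypothetical variant in the same style, for a partitioned table:

    // Hypothetical test-style usage of the refactored helpers.
    Map<String, String> options = commonOptions();
    options.remove("scan.parallelism");  // let size-based inference decide
    options.remove("sink.parallelism");
    options.put(CoreOptions.CLUSTERING_PER_TASK_DATA_SIZE.key(), "50kb");
    FileStoreTable table = createTable("pt", options);
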
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 42525cd46d..12490f9c20 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -25,6 +25,8 @@ under the License.
<suppressions>
<suppress files="DataTypes.java" checks="MethodNameCheck"/>
+ <suppress files="CoreOptions.java" checks="FileLength"/>
+
<!-- target directory is not relevant for checkstyle -->
<suppress
files="[\\/]target[\\/]"