This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c4c0d6aad1 [flink] support infer parallelism for incremental clustering (#6624)
c4c0d6aad1 is described below

commit c4c0d6aad1d677990cecf62e90651941c21858cc
Author: LsomeYeah <[email protected]>
AuthorDate: Tue Nov 18 14:18:30 2025 +0800

    [flink] support infer parallelism for incremental clustering (#6624)
---
 .../shortcodes/generated/core_configuration.html   |  6 ++
 .../main/java/org/apache/paimon/CoreOptions.java   | 10 +++
 .../apache/paimon/flink/action/CompactAction.java  | 35 +++++++--
 .../action/IncrementalClusterActionITCase.java     | 88 +++++++++++++++++-----
 tools/maven/suppressions.xml                       |  2 +
 5 files changed, 118 insertions(+), 23 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index c3251aa979..d1c46f2ff6 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -182,6 +182,12 @@ under the License.
             <td>Boolean</td>
             <td>Whether enable incremental clustering.</td>
         </tr>
+        <tr>
+            <td><h5>clustering.per-subtask.data-size</h5></td>
+            <td style="word-wrap: break-word;">1 gb</td>
+            <td>MemorySize</td>
+            <td>The data size processed by a single subtask.</td>
+        </tr>
         <tr>
             <td><h5>clustering.strategy</h5></td>
             <td style="word-wrap: break-word;">"auto"</td>
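For context on the new option above: it is an ordinary table option, so it can be set when the table is defined. A minimal sketch (not part of this commit; the columns and the "512 mb" value are illustrative), using the same Schema builder the updated test below relies on:

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    public class ClusteringOptionsSketch {
        // Enables incremental clustering and caps the data volume each
        // clustering subtask processes; the default is 1 gb, per the docs above.
        static Schema clusteredSchema() {
            return Schema.newBuilder()
                    .column("a", DataTypes.INT())
                    .column("c", DataTypes.STRING())
                    .option("clustering.columns", "a")
                    .option("clustering.incremental", "true")
                    .option("clustering.per-subtask.data-size", "512 mb")
                    .build();
        }
    }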
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index ccb490ec41..e702aceace 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1967,6 +1967,12 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether enable incremental clustering.");
 
+    public static final ConfigOption<MemorySize> CLUSTERING_PER_TASK_DATA_SIZE =
+            key("clustering.per-subtask.data-size")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(1024))
+                    .withDescription("The data size processed by a single subtask.");
+
     public static final ConfigOption<Integer> CLUSTERING_HISTORY_PARTITION_LIMIT =
             key("clustering.history-partition.limit")
                     .intType()
@@ -3149,6 +3155,10 @@ public class CoreOptions implements Serializable {
         return options.get(CLUSTERING_INCREMENTAL);
     }
 
+    public MemorySize clusteringPerTaskDataSize() {
+        return options.get(CLUSTERING_PER_TASK_DATA_SIZE);
+    }
+
     public Duration clusteringHistoryPartitionIdleTime() {
         return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
     }
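The accessor above hands back a MemorySize whose byte count CompactAction divides by (next diff). A quick sanity check of the parsing, assuming Paimon's MemorySize.parse accepts the same human-readable strings as its Flink counterpart:

    import org.apache.paimon.options.MemorySize;

    public class MemorySizeSketch {
        public static void main(String[] args) {
            // "50kb" is the value the new ITCase configures below.
            MemorySize perSubtask = MemorySize.parse("50kb");
            System.out.println(perSubtask.getBytes()); // 51200 = 50 * 1024
        }
    }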
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 69ca2a5f45..3e3018a2a2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -41,6 +41,7 @@ import org.apache.paimon.flink.sink.RowDataChannelComputer;
 import org.apache.paimon.flink.sorter.TableSortInfo;
 import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -79,6 +80,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.CLUSTERING_PER_TASK_DATA_SIZE;
 import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -242,6 +244,9 @@ public class CompactAction extends TableActionBase {
                         table.partitionKeys().toArray(new String[0]),
                         table.coreOptions().legacyPartitionName());
 
+        long perSubtaskDataSize = table.coreOptions().clusteringPerTaskDataSize().getBytes();
+        LOGGER.info("{} is {} bytes.", CLUSTERING_PER_TASK_DATA_SIZE, perSubtaskDataSize);
+
         // 1. pick cluster files for each partition
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.prepareForCluster(fullCompaction);
@@ -275,14 +280,32 @@ public class CompactAction extends TableActionBase {
                     partitionComputer.generatePartValues(partition);
 
             // 2.1 generate source for current partition
+            long partitionFileSize = 0L;
+            int partitionFileCount = 0;
+            for (DataSplit split : splits) {
+                for (DataFileMeta fileMeta : split.dataFiles()) {
+                    partitionFileSize += fileMeta.fileSize();
+                    partitionFileCount++;
+                }
+            }
+            int inferParallelism =
+                    Math.min(
+                            partitionFileCount,
+                            Math.max(1, (int) (partitionFileSize / perSubtaskDataSize)));
+            LOGGER.info(
+                    "For partition {}, the total data size is {} bytes, total file count is {}, and inferred parallelism is {}.",
+                    partitionSpec,
+                    partitionFileSize,
+                    partitionFileCount,
+                    inferParallelism);
+
+            Integer scanParallelism = options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
+            if (scanParallelism == null) {
+                scanParallelism = inferParallelism;
+            }
             Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
                     IncrementalClusterSplitSource.buildSource(
-                            env,
-                            table,
-                            partitionSpec,
-                            splits,
-                            dvCommitMessage,
-                            options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
+                            env, table, partitionSpec, splits, dvCommitMessage, scanParallelism);
 
             // 2.2 cluster in partition
             Integer sinkParallelism = options.get(FlinkConnectorOptions.SINK_PARALLELISM);
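The inference rule added here: at least one subtask, at most one subtask per file, otherwise the total partition size divided by the per-subtask budget; an explicitly configured scan.parallelism still takes precedence. A standalone sketch of the same formula (the class name is illustrative), plugged with the figures from the new test below, roughly 173 KB across 100 files against a 50 KB budget:

    public class InferParallelismSketch {
        // Mirrors the expression in CompactAction: clamp between 1 and the
        // partition's file count, otherwise size / budget (integer division).
        static int inferParallelism(long totalBytes, int fileCount, long perSubtaskBytes) {
            return Math.min(fileCount, Math.max(1, (int) (totalBytes / perSubtaskBytes)));
        }

        public static void main(String[] args) {
            long totalBytes = 173L * 1024;     // ~173 KB, per the test's comment
            long perSubtaskBytes = 50L * 1024; // clustering.per-subtask.data-size = 50kb
            System.out.println(inferParallelism(totalBytes, 100, perSubtaskBytes)); // 3
        }
    }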
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index bf7087e77e..1abc5e43bb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -67,7 +67,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
 
     @Test
     public void testClusterUnpartitionedTable() throws Exception {
-        FileStoreTable table = createTable(null, 1);
+        FileStoreTable table = createTable(null);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -204,7 +204,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
 
     @Test
     public void testClusterPartitionedTable() throws Exception {
-        FileStoreTable table = createTable("pt", 1);
+        FileStoreTable table = createTable("pt");
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -336,7 +336,7 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
 
     @Test
     public void testClusterSpecifyPartition() throws Exception {
-        FileStoreTable table = createTable("pt", 1);
+        FileStoreTable table = createTable("pt");
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -378,9 +378,9 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
 
     @Test
     public void testClusterHistoryPartition() throws Exception {
-        Map<String, String> options = new HashMap<>();
+        Map<String, String> options = commonOptions();
         options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), "3s");
-        FileStoreTable table = createTable("pt", 1, options);
+        FileStoreTable table = createTable("pt", options);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -530,13 +530,15 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
 
     @Test
     public void testClusterOnEmptyData() throws Exception {
-        createTable("pt", 1);
+        createTable("pt");
         assertThatCode(() -> runAction(Collections.emptyList())).doesNotThrowAnyException();
     }
 
     @Test
     public void testMultiParallelism() throws Exception {
-        FileStoreTable table = createTable(null, 2);
+        Map<String, String> options = commonOptions();
+        options.put("scan.parallelism", "2");
+        FileStoreTable table = createTable(null, options);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -577,9 +579,9 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
 
     @Test
     public void testClusterWithDeletionVector() throws Exception {
-        Map<String, String> dynamicOptions = new HashMap<>();
+        Map<String, String> dynamicOptions = commonOptions();
         dynamicOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
-        FileStoreTable table = createTable(null, 1, dynamicOptions);
+        FileStoreTable table = createTable(null, dynamicOptions);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -681,16 +683,43 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
         assertThat(splits.get(0).deletionFiles().get().get(0)).isNull();
     }
 
-    protected FileStoreTable createTable(String partitionKeys, int sinkParallelism)
-            throws Exception {
-        return createTable(partitionKeys, sinkParallelism, Collections.emptyMap());
+    @Test
+    public void testClusterWithInferParallelism() throws Exception {
+        Map<String, String> options = commonOptions();
+        options.remove("scan.parallelism");
+        options.remove("sink.parallelism");
+        options.put(CoreOptions.CLUSTERING_PER_TASK_DATA_SIZE.key(), "50kb");
+        FileStoreTable table = createTable(null, options);
+
+        BinaryString randomStr = BinaryString.fromString(randomString(150));
+        List<CommitMessage> messages = new ArrayList<>();
+
+        // first write, generate 100 files, total size 173kb
+        for (int i = 0; i < 10; i++) {
+            for (int j = 0; j < 10; j++) {
+                messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
+            }
+        }
+        commit(messages);
+        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new int[] {0, 1});
+
+        // first cluster
+        runAction(Collections.emptyList());
+        checkSnapshot(table);
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(3);
+        assertThat(((DataSplit) splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
+    }
+
+    protected FileStoreTable createTable(String partitionKeys) throws Exception {
+        return createTable(partitionKeys, commonOptions());
     }
 
-    protected FileStoreTable createTable(
-            String partitionKeys, int sinkParallelism, Map<String, String> options)
+    protected FileStoreTable createTable(String partitionKeys, Map<String, String> options)
             throws Exception {
         catalog.createDatabase(database, true);
-        catalog.createTable(identifier(), schema(partitionKeys, sinkParallelism, options), true);
+        catalog.createTable(identifier(), buildSchema(partitionKeys, options), true);
         return (FileStoreTable) catalog.getTable(identifier());
     }
 
@@ -718,8 +747,33 @@ public class IncrementalClusterActionITCase extends ActionITCaseBase {
         commit.close();
     }
 
-    private static Schema schema(String partitionKeys, int sinkParallelism) {
-        return schema(partitionKeys, sinkParallelism, Collections.emptyMap());
+    private static Schema buildSchema(String partitionKeys, Map<String, String> options) {
+        Schema.Builder schemaBuilder = Schema.newBuilder();
+        schemaBuilder.column("a", DataTypes.INT());
+        schemaBuilder.column("b", DataTypes.INT());
+        schemaBuilder.column("c", DataTypes.STRING());
+        schemaBuilder.column("pt", DataTypes.INT());
+        for (String key : options.keySet()) {
+            schemaBuilder.option(key, options.get(key));
+        }
+        if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) {
+            schemaBuilder.partitionKeys(partitionKeys);
+        }
+        return schemaBuilder.build();
+    }
+
+    private static Map<String, String> commonOptions() {
+        Map<String, String> options = new HashMap<>();
+        options.put("bucket", "-1");
+        options.put("num-levels", "6");
+        options.put("num-sorted-run.compaction-trigger", "2");
+        options.put("scan.plan-sort-partition", "true");
+        options.put("clustering.columns", "a,b");
+        options.put("clustering.strategy", "zorder");
+        options.put("clustering.incremental", "true");
+        options.put("scan.parallelism", "1");
+        options.put("sink.parallelism", "1");
+        return options;
     }
 
     private static Schema schema(
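A back-of-the-envelope check of the three-file assertion in testClusterWithInferParallelism (figures come from the test itself; reading one clustered output file per sort subtask is an assumption, though consistent with the assertions):

    // perSubtask   = 50 KB  (clustering.per-subtask.data-size)
    // totalSize   ~= 173 KB across 100 files
    // parallelism  = min(100, max(1, 173 / 50)) = 3
    // Three clustering subtasks, each writing clustered output, which lines up
    // with the single split containing exactly 3 data files at the top level (5).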
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 42525cd46d..12490f9c20 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -25,6 +25,8 @@ under the License.
 <suppressions>
                <suppress files="DataTypes.java" checks="MethodNameCheck"/>
 
+               <suppress files="CoreOptions.java" checks="FileLength"/>
+
                <!-- target directory is not relevant for checkstyle -->
                <suppress
                        files="[\\/]target[\\/]"
