This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new df9a90b67a Revert "[flink] support infer parallelism for incremental clustering (#6624)" (#6637)
df9a90b67a is described below

commit df9a90b67a76d4a244a2829ffd152665f2eb0188
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Nov 20 13:12:51 2025 +0800

    Revert "[flink] support infer parallelism for incremental clustering (#6624)" (#6637)
---
 .../shortcodes/generated/core_configuration.html   |  6 --
 .../main/java/org/apache/paimon/CoreOptions.java   | 10 ---
 .../apache/paimon/flink/action/CompactAction.java  | 35 ++-------
 .../action/IncrementalClusterActionITCase.java     | 88 +++++-----------------
 tools/maven/suppressions.xml                       |  2 -
 5 files changed, 23 insertions(+), 118 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index d1c46f2ff6..c3251aa979 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -182,12 +182,6 @@ under the License.
             <td>Boolean</td>
             <td>Whether enable incremental clustering.</td>
         </tr>
-        <tr>
-            <td><h5>clustering.per-subtask.data-size</h5></td>
-            <td style="word-wrap: break-word;">1 gb</td>
-            <td>MemorySize</td>
-            <td>The data size processed by single parallelism.</td>
-        </tr>
         <tr>
             <td><h5>clustering.strategy</h5></td>
             <td style="word-wrap: break-word;">"auto"</td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e702aceace..ccb490ec41 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1967,12 +1967,6 @@ public class CoreOptions implements Serializable {
                     .defaultValue(false)
                     .withDescription("Whether enable incremental clustering.");
 
-    public static final ConfigOption<MemorySize> CLUSTERING_PER_TASK_DATA_SIZE =
-            key("clustering.per-subtask.data-size")
-                    .memoryType()
-                    .defaultValue(MemorySize.ofMebiBytes(1024))
-                    .withDescription("The data size processed by single parallelism.");
-
     public static final ConfigOption<Integer> CLUSTERING_HISTORY_PARTITION_LIMIT =
             key("clustering.history-partition.limit")
                     .intType()
@@ -3155,10 +3149,6 @@ public class CoreOptions implements Serializable {
         return options.get(CLUSTERING_INCREMENTAL);
     }
 
-    public MemorySize clusteringPerTaskDataSize() {
-        return options.get(CLUSTERING_PER_TASK_DATA_SIZE);
-    }
-
     public Duration clusteringHistoryPartitionIdleTime() {
         return options.get(CLUSTERING_HISTORY_PARTITION_IDLE_TIME);
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index e77574c7e1..694b67d705 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -41,7 +41,6 @@ import org.apache.paimon.flink.sink.RowDataChannelComputer;
 import org.apache.paimon.flink.sorter.TableSortInfo;
 import org.apache.paimon.flink.sorter.TableSorter;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
-import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.PartitionPredicate;
@@ -80,7 +79,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.CLUSTERING_PER_TASK_DATA_SIZE;
 import static 
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -245,9 +243,6 @@ public class CompactAction extends TableActionBase {
                         table.partitionKeys().toArray(new String[0]),
                         table.coreOptions().legacyPartitionName());
 
-        long perSubtaskDataSize = 
table.coreOptions().clusteringPerTaskDataSize().getBytes();
-        LOGGER.info("{} is {} bytes.", CLUSTERING_PER_TASK_DATA_SIZE.key(), 
perSubtaskDataSize);
-
         // 1. pick cluster files for each partition
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.prepareForCluster(fullCompaction);
@@ -281,32 +276,14 @@ public class CompactAction extends TableActionBase {
                     partitionComputer.generatePartValues(partition);
 
             // 2.1 generate source for current partition
-            long partitionFileSize = 0L;
-            int partitionFileCount = 0;
-            for (DataSplit split : splits) {
-                for (DataFileMeta fileMeta : split.dataFiles()) {
-                    partitionFileSize += fileMeta.fileSize();
-                    partitionFileCount++;
-                }
-            }
-            int inferParallelism =
-                    Math.min(
-                            partitionFileCount,
-                            Math.max(1, (int) (partitionFileSize / 
perSubtaskDataSize)));
-            LOGGER.info(
-                    "For partition {}, the total data size is {} bytes, total 
file count is {}, infer parallelism is {}.",
-                    partitionSpec,
-                    partitionFileSize,
-                    partitionFileCount,
-                    inferParallelism);
-
-            Integer scanParallelism = 
options.get(FlinkConnectorOptions.SCAN_PARALLELISM);
-            if (scanParallelism == null) {
-                scanParallelism = inferParallelism;
-            }
             Pair<DataStream<RowData>, DataStream<Committable>> sourcePair =
                     IncrementalClusterSplitSource.buildSource(
-                            env, table, partitionSpec, splits, 
dvCommitMessage, scanParallelism);
+                            env,
+                            table,
+                            partitionSpec,
+                            splits,
+                            dvCommitMessage,
+                            
options.get(FlinkConnectorOptions.SCAN_PARALLELISM));
 
             // 2.2 cluster in partition
             Integer sinkParallelism = 
options.get(FlinkConnectorOptions.SINK_PARALLELISM);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index 1abc5e43bb..bf7087e77e 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -67,7 +67,7 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
 
     @Test
     public void testClusterUnpartitionedTable() throws Exception {
-        FileStoreTable table = createTable(null);
+        FileStoreTable table = createTable(null, 1);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -204,7 +204,7 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
 
     @Test
     public void testClusterPartitionedTable() throws Exception {
-        FileStoreTable table = createTable("pt");
+        FileStoreTable table = createTable("pt", 1);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -336,7 +336,7 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
 
     @Test
     public void testClusterSpecifyPartition() throws Exception {
-        FileStoreTable table = createTable("pt");
+        FileStoreTable table = createTable("pt", 1);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -378,9 +378,9 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
 
     @Test
     public void testClusterHistoryPartition() throws Exception {
-        Map<String, String> options = commonOptions();
+        Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(), 
"3s");
-        FileStoreTable table = createTable("pt", options);
+        FileStoreTable table = createTable("pt", 1, options);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -530,15 +530,13 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
 
     @Test
     public void testClusterOnEmptyData() throws Exception {
-        createTable("pt");
+        createTable("pt", 1);
         assertThatCode(() -> 
runAction(Collections.emptyList())).doesNotThrowAnyException();
     }
 
     @Test
     public void testMultiParallelism() throws Exception {
-        Map<String, String> options = commonOptions();
-        options.put("scan.parallelism", "2");
-        FileStoreTable table = createTable(null, options);
+        FileStoreTable table = createTable(null, 2);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -579,9 +577,9 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
 
     @Test
     public void testClusterWithDeletionVector() throws Exception {
-        Map<String, String> dynamicOptions = commonOptions();
+        Map<String, String> dynamicOptions = new HashMap<>();
         dynamicOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
-        FileStoreTable table = createTable(null, dynamicOptions);
+        FileStoreTable table = createTable(null, 1, dynamicOptions);
 
         BinaryString randomStr = BinaryString.fromString(randomString(150));
         List<CommitMessage> messages = new ArrayList<>();
@@ -683,43 +681,16 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
         assertThat(splits.get(0).deletionFiles().get().get(0)).isNull();
     }
 
-    @Test
-    public void testClusterWithInferParallelism() throws Exception {
-        Map<String, String> options = commonOptions();
-        options.remove("scan.parallelism");
-        options.remove("sink.parallelism");
-        options.put(CoreOptions.CLUSTERING_PER_TASK_DATA_SIZE.key(), "50kb");
-        FileStoreTable table = createTable(null, options);
-
-        BinaryString randomStr = BinaryString.fromString(randomString(150));
-        List<CommitMessage> messages = new ArrayList<>();
-
-        // first write, generate 100 files, total size 173kb
-        for (int i = 0; i < 10; i++) {
-            for (int j = 0; j < 10; j++) {
-                messages.addAll(write(GenericRow.of(i, j, randomStr, 0)));
-            }
-        }
-        commit(messages);
-        ReadBuilder readBuilder = table.newReadBuilder().withProjection(new 
int[] {0, 1});
-
-        // first cluster
-        runAction(Collections.emptyList());
-        checkSnapshot(table);
-        List<Split> splits = readBuilder.newScan().plan().splits();
-        assertThat(splits.size()).isEqualTo(1);
-        assertThat(((DataSplit) 
splits.get(0)).dataFiles().size()).isEqualTo(3);
-        assertThat(((DataSplit) 
splits.get(0)).dataFiles().get(0).level()).isEqualTo(5);
-    }
-
-    protected FileStoreTable createTable(String partitionKeys) throws 
Exception {
-        return createTable(partitionKeys, commonOptions());
+    protected FileStoreTable createTable(String partitionKeys, int 
sinkParallelism)
+            throws Exception {
+        return createTable(partitionKeys, sinkParallelism, 
Collections.emptyMap());
     }
 
-    protected FileStoreTable createTable(String partitionKeys, Map<String, 
String> options)
+    protected FileStoreTable createTable(
+            String partitionKeys, int sinkParallelism, Map<String, String> 
options)
             throws Exception {
         catalog.createDatabase(database, true);
-        catalog.createTable(identifier(), buildSchema(partitionKeys, options), 
true);
+        catalog.createTable(identifier(), schema(partitionKeys, 
sinkParallelism, options), true);
         return (FileStoreTable) catalog.getTable(identifier());
     }
 
@@ -747,33 +718,8 @@ public class IncrementalClusterActionITCase extends 
ActionITCaseBase {
         commit.close();
     }
 
-    private static Schema buildSchema(String partitionKeys, Map<String, 
String> options) {
-        Schema.Builder schemaBuilder = Schema.newBuilder();
-        schemaBuilder.column("a", DataTypes.INT());
-        schemaBuilder.column("b", DataTypes.INT());
-        schemaBuilder.column("c", DataTypes.STRING());
-        schemaBuilder.column("pt", DataTypes.INT());
-        for (String key : options.keySet()) {
-            schemaBuilder.option(key, options.get(key));
-        }
-        if (!StringUtils.isNullOrWhitespaceOnly(partitionKeys)) {
-            schemaBuilder.partitionKeys(partitionKeys);
-        }
-        return schemaBuilder.build();
-    }
-
-    private static Map<String, String> commonOptions() {
-        Map<String, String> options = new HashMap<>();
-        options.put("bucket", "-1");
-        options.put("num-levels", "6");
-        options.put("num-sorted-run.compaction-trigger", "2");
-        options.put("scan.plan-sort-partition", "true");
-        options.put("clustering.columns", "a,b");
-        options.put("clustering.strategy", "zorder");
-        options.put("clustering.incremental", "true");
-        options.put("scan.parallelism", "1");
-        options.put("sink.parallelism", "1");
-        return options;
+    private static Schema schema(String partitionKeys, int sinkParallelism) {
+        return schema(partitionKeys, sinkParallelism, Collections.emptyMap());
     }
 
     private static Schema schema(
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 12490f9c20..42525cd46d 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -25,8 +25,6 @@ under the License.
 <suppressions>
                <suppress files="DataTypes.java" checks="MethodNameCheck"/>
 
-               <suppress files="CoreOptions.java" checks="FileLength"/>
-
                <!-- target directory is not relevant for checkstyle -->
                <suppress
                        files="[\\/]target[\\/]"

Reply via email to