This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c95dcec412 [flink] slightly refactor IncrementalClusterActionITCase
(#6639)
c95dcec412 is described below
commit c95dcec412c1f48569ce03ad9478e06cdbc9392f
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Nov 20 15:31:40 2025 +0800
[flink] slightly refactor IncrementalClusterActionITCase (#6639)
---
.../action/IncrementalClusterActionITCase.java | 60 +++++++++++-----------
tools/maven/suppressions.xml | 2 +
2 files changed, 32 insertions(+), 30 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
index bf7087e77e..6c80070528 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/IncrementalClusterActionITCase.java
@@ -67,7 +67,7 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
@Test
public void testClusterUnpartitionedTable() throws Exception {
- FileStoreTable table = createTable(null, 1);
+ FileStoreTable table = createTable(null);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -204,7 +204,7 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
@Test
public void testClusterPartitionedTable() throws Exception {
- FileStoreTable table = createTable("pt", 1);
+ FileStoreTable table = createTable("pt");
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -336,7 +336,7 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
@Test
public void testClusterSpecifyPartition() throws Exception {
- FileStoreTable table = createTable("pt", 1);
+ FileStoreTable table = createTable("pt");
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -378,9 +378,9 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
@Test
public void testClusterHistoryPartition() throws Exception {
- Map<String, String> options = new HashMap<>();
+ Map<String, String> options = commonOptions();
options.put(CoreOptions.CLUSTERING_HISTORY_PARTITION_IDLE_TIME.key(),
"3s");
- FileStoreTable table = createTable("pt", 1, options);
+ FileStoreTable table = createTable("pt", options);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -530,13 +530,15 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
@Test
public void testClusterOnEmptyData() throws Exception {
- createTable("pt", 1);
+ createTable("pt");
assertThatCode(() ->
runAction(Collections.emptyList())).doesNotThrowAnyException();
}
@Test
public void testMultiParallelism() throws Exception {
- FileStoreTable table = createTable(null, 2);
+ Map<String, String> options = commonOptions();
+ options.put("scan.parallelism", "2");
+ FileStoreTable table = createTable(null, options);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -577,9 +579,9 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
@Test
public void testClusterWithDeletionVector() throws Exception {
- Map<String, String> dynamicOptions = new HashMap<>();
+ Map<String, String> dynamicOptions = commonOptions();
dynamicOptions.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
- FileStoreTable table = createTable(null, 1, dynamicOptions);
+ FileStoreTable table = createTable(null, dynamicOptions);
BinaryString randomStr = BinaryString.fromString(randomString(150));
List<CommitMessage> messages = new ArrayList<>();
@@ -681,16 +683,14 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
assertThat(splits.get(0).deletionFiles().get().get(0)).isNull();
}
- protected FileStoreTable createTable(String partitionKeys, int
sinkParallelism)
- throws Exception {
- return createTable(partitionKeys, sinkParallelism,
Collections.emptyMap());
+ protected FileStoreTable createTable(String partitionKeys) throws
Exception {
+ return createTable(partitionKeys, commonOptions());
}
- protected FileStoreTable createTable(
- String partitionKeys, int sinkParallelism, Map<String, String>
options)
+ protected FileStoreTable createTable(String partitionKeys, Map<String,
String> options)
throws Exception {
catalog.createDatabase(database, true);
- catalog.createTable(identifier(), schema(partitionKeys,
sinkParallelism, options), true);
+ catalog.createTable(identifier(), buildSchema(partitionKeys, options),
true);
return (FileStoreTable) catalog.getTable(identifier());
}
@@ -718,26 +718,12 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
commit.close();
}
- private static Schema schema(String partitionKeys, int sinkParallelism) {
- return schema(partitionKeys, sinkParallelism, Collections.emptyMap());
- }
-
- private static Schema schema(
- String partitionKeys, int sinkParallelism, Map<String, String>
options) {
+ private static Schema buildSchema(String partitionKeys, Map<String,
String> options) {
Schema.Builder schemaBuilder = Schema.newBuilder();
schemaBuilder.column("a", DataTypes.INT());
schemaBuilder.column("b", DataTypes.INT());
schemaBuilder.column("c", DataTypes.STRING());
schemaBuilder.column("pt", DataTypes.INT());
- schemaBuilder.option("bucket", "-1");
- schemaBuilder.option("num-levels", "6");
- schemaBuilder.option("num-sorted-run.compaction-trigger", "2");
- schemaBuilder.option("scan.plan-sort-partition", "true");
- schemaBuilder.option("clustering.columns", "a,b");
- schemaBuilder.option("clustering.strategy", "zorder");
- schemaBuilder.option("clustering.incremental", "true");
- schemaBuilder.option("scan.parallelism", "1");
- schemaBuilder.option("sink.parallelism",
String.valueOf(sinkParallelism));
for (String key : options.keySet()) {
schemaBuilder.option(key, options.get(key));
}
@@ -747,6 +733,20 @@ public class IncrementalClusterActionITCase extends
ActionITCaseBase {
return schemaBuilder.build();
}
+ private static Map<String, String> commonOptions() {
+ Map<String, String> options = new HashMap<>();
+ options.put("bucket", "-1");
+ options.put("num-levels", "6");
+ options.put("num-sorted-run.compaction-trigger", "2");
+ options.put("scan.plan-sort-partition", "true");
+ options.put("clustering.columns", "a,b");
+ options.put("clustering.strategy", "zorder");
+ options.put("clustering.incremental", "true");
+ options.put("scan.parallelism", "1");
+ options.put("sink.parallelism", "1");
+ return options;
+ }
+
private static String randomString(int length) {
String chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
StringBuilder sb = new StringBuilder(length);
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 42525cd46d..12490f9c20 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -25,6 +25,8 @@ under the License.
<suppressions>
<suppress files="DataTypes.java" checks="MethodNameCheck"/>
+ <suppress files="CoreOptions.java" checks="FileLength"/>
+
<!-- target directory is not relevant for checkstyle -->
<suppress
files="[\\/]target[\\/]"