This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 13cd198fd [FLINK-39363][test] Fix flaky test PaimonSinkITCase (#4366)
13cd198fd is described below
commit 13cd198fde65cc7e383bdc3387650f11d8f5faec
Author: Jia Fan <[email protected]>
AuthorDate: Wed Apr 8 21:12:19 2026 +0800
[FLINK-39363][test] Fix flaky test PaimonSinkITCase (#4366)
---
.../cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index d3277e3aa..53764cf5a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -133,7 +133,7 @@ public class PaimonSinkITCase {
private final TableId table1 = TableId.tableId("test", "table1");
private final TableId table2 = TableId.tableId("test", "table2");
- private static int checkpointId = 1;
+ private int checkpointId = 1;
public static final String TEST_DATABASE = "test";
private static final String HADOOP_CONF_DIR =
@@ -756,18 +756,17 @@ public class PaimonSinkITCase {
.containsExactlyInAnyOrder(Row.ofKind(RowKind.INSERT, "1",
"1"));
}
- private static void commit(
- PaimonWriter<Event> writer, Committer<MultiTableCommittable>
committer)
+ private void commit(PaimonWriter<Event> writer,
Committer<MultiTableCommittable> committer)
throws IOException, InterruptedException {
Collection<Committer.CommitRequest<MultiTableCommittable>>
commitRequests =
writer.prepareCommit().stream()
- .map(PaimonSinkITCase::correctCheckpointId)
+ .map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
}
- private static void writeAndCommit(
+ private void writeAndCommit(
PaimonWriter<Event> writer, Committer<MultiTableCommittable>
committer, Event... events)
throws IOException, InterruptedException {
for (Event event : events) {
@@ -777,7 +776,7 @@ public class PaimonSinkITCase {
commit(writer, committer);
}
- private static void writeAndCommit(
+ private void writeAndCommit(
BucketAssignOperator bucketAssignOperator,
PaimonWriter<Event> writer,
Committer<MultiTableCommittable> committer,
@@ -868,7 +867,7 @@ public class PaimonSinkITCase {
writer.flush(false);
Collection<Committer.CommitRequest<MultiTableCommittable>>
commitRequests =
writer.prepareCommit().stream()
- .map(PaimonSinkITCase::correctCheckpointId)
+ .map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList());
committer.commit(commitRequests);
@@ -900,7 +899,7 @@ public class PaimonSinkITCase {
// Checkpoint id start from 1
committer.commit(
writer.prepareCommit().stream()
- .map(PaimonSinkITCase::correctCheckpointId)
+ .map(this::correctCheckpointId)
.map(MockCommitRequestImpl::new)
.collect(Collectors.toList()));
}
@@ -1039,7 +1038,7 @@ public class PaimonSinkITCase {
env.execute("runJobWithEvents").getJobExecutionResult();
}
- private static MultiTableCommittable
correctCheckpointId(MultiTableCommittable committable) {
+ private MultiTableCommittable correctCheckpointId(MultiTableCommittable
committable) {
// update the right checkpointId for MultiTableCommittable
return new MultiTableCommittable(
committable.getDatabase(),