This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 74f53ebf45 [Flink] Do not mark done partition if job failover from
checkpoint. (#5581)
74f53ebf45 is described below
commit 74f53ebf453eee491067ee129e8e3b28e1486732
Author: wangwj <[email protected]>
AuthorDate: Wed May 14 12:05:34 2025 +0800
[Flink] Do not mark done partition if job failover from checkpoint. (#5581)
---
.../org/apache/paimon/flink/sink/Committer.java | 10 +++++++---
.../paimon/flink/sink/CommitterOperator.java | 2 +-
.../RestoreAndFailCommittableStateManager.java | 2 +-
.../apache/paimon/flink/sink/StoreCommitter.java | 7 +++++--
.../paimon/flink/sink/StoreMultiCommitter.java | 7 ++++---
.../flink/sink/partition/PartitionListener.java | 2 +-
.../flink/sink/partition/PartitionListeners.java | 9 ++++++++-
.../flink/sink/partition/PartitionMarkDone.java | 6 +++++-
.../sink/partition/ReportPartStatsListener.java | 3 ++-
.../sink/partition/PartitionMarkDoneTest.java | 23 ++++++++++++++++++----
10 files changed, 53 insertions(+), 18 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 23c6c7faeb..4679318b7e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -53,11 +53,15 @@ public interface Committer<CommitT, GlobalCommitT> extends
AutoCloseable {
* Filter out all {@link GlobalCommitT} which have committed, and commit
the remaining {@link
* GlobalCommitT}.
*/
- int filterAndCommit(List<GlobalCommitT> globalCommittables, boolean
checkAppendFiles)
+ int filterAndCommit(
+ List<GlobalCommitT> globalCommittables,
+ boolean checkAppendFiles,
+ boolean recoveryFromState)
throws IOException;
- default int filterAndCommit(List<GlobalCommitT> globalCommittables) throws
IOException {
- return filterAndCommit(globalCommittables, true);
+ default int filterAndCommitFromState(List<GlobalCommitT>
globalCommittables)
+ throws IOException {
+ return filterAndCommit(globalCommittables, true, true);
}
Map<Long, List<CommitT>> groupByCheckpoint(Collection<CommitT>
committables);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index 6354c0ecd0..b5c73f17f9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -210,7 +210,7 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
// So when `endInput` is called, we must check if the
corresponding snapshot exists.
// However, if the snapshot does not exist, then append files must
be new files. So
// there is no need to check for duplicated append files.
- committer.filterAndCommit(committables, false);
+ committer.filterAndCommit(committables, false, false);
} else {
committer.commit(committables);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
index be795fbab6..dda1abf90d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java
@@ -79,7 +79,7 @@ public class
RestoreAndFailCommittableStateManager<GlobalCommitT>
private void recover(List<GlobalCommitT> committables, Committer<?,
GlobalCommitT> committer)
throws Exception {
- int numCommitted = committer.filterAndCommit(committables);
+ int numCommitted = committer.filterAndCommitFromState(committables);
if (numCommitted > 0) {
throw new RuntimeException(
"This exception is intentionally thrown "
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index 4908b99317..d3d6f206cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -115,9 +115,12 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
@Override
public int filterAndCommit(
- List<ManifestCommittable> globalCommittables, boolean
checkAppendFiles) {
+ List<ManifestCommittable> globalCommittables,
+ boolean checkAppendFiles,
+ boolean recoveryFromState) {
int committed = commit.filterAndCommitMultiple(globalCommittables,
checkAppendFiles);
- partitionListeners.notifyCommittable(globalCommittables);
+ partitionListeners.notifyCommittable(globalCommittables,
recoveryFromState);
+
return committed;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 8ad3e4fb08..886378ef7d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -178,14 +178,15 @@ public class StoreMultiCommitter
@Override
public int filterAndCommit(
- List<WrappedManifestCommittable> globalCommittables, boolean
checkAppendFiles)
- throws IOException {
+ List<WrappedManifestCommittable> globalCommittables,
+ boolean checkAppendFiles,
+ boolean recoveryFromState) {
int result = 0;
for (Map.Entry<Identifier, List<ManifestCommittable>> entry :
groupByTable(globalCommittables).entrySet()) {
result +=
getStoreCommitter(entry.getKey())
- .filterAndCommit(entry.getValue(),
checkAppendFiles);
+ .filterAndCommit(entry.getValue(),
checkAppendFiles, recoveryFromState);
}
return result;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
index 65d25fbc02..f01df4b870 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListener.java
@@ -26,7 +26,7 @@ import java.util.List;
/** The partition listener. */
public interface PartitionListener extends Closeable {
- void notifyCommittable(List<ManifestCommittable> committables);
+ void notifyCommittable(List<ManifestCommittable> committables, boolean
recoverFromState);
void snapshotState() throws Exception;
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index cbf14da456..4e14e772a7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -39,7 +39,14 @@ public class PartitionListeners implements Closeable {
public void notifyCommittable(List<ManifestCommittable> committables) {
for (PartitionListener trigger : listeners) {
- trigger.notifyCommittable(committables);
+ trigger.notifyCommittable(committables, false);
+ }
+ }
+
+ public void notifyCommittable(
+ List<ManifestCommittable> committables, boolean recoveryFromState)
{
+ for (PartitionListener trigger : listeners) {
+ trigger.notifyCommittable(committables, recoveryFromState);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 7e7aa57495..fad23c6085 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -133,7 +133,11 @@ public class PartitionMarkDone implements
PartitionListener {
}
@Override
- public void notifyCommittable(List<ManifestCommittable> committables) {
+ public void notifyCommittable(
+ List<ManifestCommittable> committables, boolean recoverFromState) {
+ if (recoverFromState) {
+ return;
+ }
if (partitionMarkDoneActionMode ==
PartitionMarkDoneActionMode.WATERMARK) {
markDoneByWatermark(committables);
} else {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
index 45e461405c..effb63c87e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/ReportPartStatsListener.java
@@ -85,7 +85,8 @@ public class ReportPartStatsListener implements
PartitionListener {
this.idleTime = idleTime;
}
- public void notifyCommittable(List<ManifestCommittable> committables) {
+ public void notifyCommittable(
+ List<ManifestCommittable> committables, boolean
recoverFromCheckpoint) {
Set<String> partition = new HashSet<>();
boolean endInput = false;
for (ManifestCommittable committable : committables) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index 9d71f94a36..349d56c71c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -58,15 +58,20 @@ class PartitionMarkDoneTest extends TableTestBase {
@Test
public void testTriggerByCompaction() throws Exception {
- innerTest(true);
+ innerTest(true, false);
}
@Test
public void testNotTriggerByCompaction() throws Exception {
- innerTest(false);
+ innerTest(false, false);
}
- private void innerTest(boolean deletionVectors) throws Exception {
+ @Test
+ public void testTriggerWhenRecoveryFromState() throws Exception {
+ innerTest(false, true);
+ }
+
+ private void innerTest(boolean deletionVectors, boolean recoveryFromState)
throws Exception {
Identifier identifier = identifier("T");
Schema schema =
Schema.newBuilder()
@@ -94,6 +99,11 @@ class PartitionMarkDoneTest extends TableTestBase {
table)
.get();
+ if (recoveryFromState) {
+ notifyCommits(markDone, false, true);
+ assertThat(table.fileIO().exists(successFile)).isEqualTo(false);
+ }
+
notifyCommits(markDone, true);
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
@@ -104,6 +114,11 @@ class PartitionMarkDoneTest extends TableTestBase {
}
public static void notifyCommits(PartitionMarkDone markDone, boolean
isCompact) {
+ notifyCommits(markDone, isCompact, false);
+ }
+
+ private static void notifyCommits(
+ PartitionMarkDone markDone, boolean isCompact, boolean
recoveryFromState) {
ManifestCommittable committable = new
ManifestCommittable(Long.MAX_VALUE);
DataFileMeta file = DataFileTestUtils.newFile();
CommitMessageImpl compactMessage;
@@ -127,7 +142,7 @@ class PartitionMarkDoneTest extends TableTestBase {
new IndexIncrement(emptyList()));
}
committable.addFileCommittable(compactMessage);
- markDone.notifyCommittable(singletonList(committable));
+ markDone.notifyCommittable(singletonList(committable),
recoveryFromState);
}
public static class MockOperatorStateStore implements OperatorStateStore {