This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch release-1.15 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push: new 34034a6 [FLINK-26151]Avoid inprogressfileRecoverable not be clean up after restoring the bucket 34034a6 is described below commit 34034a6c54f3b23825bc318806c683b3495f72d1 Author: lovewin99 <wangxiaoyon...@qq.com> AuthorDate: Tue Mar 1 10:18:05 2022 +0800 [FLINK-26151]Avoid inprogressfileRecoverable not be clean up after restoring the bucket This closes #18776. --- .../api/functions/sink/filesystem/Bucket.java | 1 + .../api/functions/sink/filesystem/BucketTest.java | 27 ++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java index f1f34ba..8b5d2ab 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java @@ -142,6 +142,7 @@ public class Bucket<IN, BucketID> { bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime()); + inProgressFileRecoverablesPerCheckpoint.put(Long.MIN_VALUE, inProgressFileRecoverable); } else { // if the writer does not support resume, then we close the // in-progress part and commit it, as done in the case of pending files. diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java index 9fc09cd..245e895 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java @@ -117,6 +117,33 @@ public class BucketTest { assertThat(recoverableWriter, hasCalledDiscard(0)); // we have no in-progress file. } + @Test + public void shouldCleanupOutdatedResumablesAfterResumed() throws Exception { + final File outDir = TEMP_FOLDER.newFolder(); + final Path path = new Path(outDir.toURI()); + + final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path); + final Bucket<String, String> bucketUnderTest = + createBucket(recoverableWriter, path, 0, 0, OutputFileConfig.builder().build()); + + bucketUnderTest.write("test-element", 0L); + final BucketState<String> state0 = bucketUnderTest.onReceptionOfCheckpoint(0L); + assertThat(state0, hasActiveInProgressFile()); + bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L); + assertThat(recoverableWriter, hasCalledDiscard(0)); + + final File newOutDir = TEMP_FOLDER.newFolder(); + final Path newPath = new Path(newOutDir.toURI()); + final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(newPath); + final Bucket<String, String> bucketAfterResume = + restoreBucket( + newRecoverableWriter, 0, 0, state0, OutputFileConfig.builder().build()); + final BucketState<String> state1 = bucketAfterResume.onReceptionOfCheckpoint(1L); + assertThat(state1, hasActiveInProgressFile()); + bucketAfterResume.onSuccessfulCompletionOfCheckpoint(1L); + assertThat(newRecoverableWriter, hasCalledDiscard(1)); + } + // --------------------------- Checking Restore --------------------------- @Test