This is an automated email from the ASF dual-hosted git repository.

gaoyunhaii pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 34034a6  [FLINK-26151]Avoid inprogressfileRecoverable not be clean up 
after restoring the bucket
34034a6 is described below

commit 34034a6c54f3b23825bc318806c683b3495f72d1
Author: lovewin99 <wangxiaoyon...@qq.com>
AuthorDate: Tue Mar 1 10:18:05 2022 +0800

    [FLINK-26151]Avoid inprogressfileRecoverable not be clean up after 
restoring the bucket
    
    This closes #18776.
---
 .../api/functions/sink/filesystem/Bucket.java      |  1 +
 .../api/functions/sink/filesystem/BucketTest.java  | 27 ++++++++++++++++++++++
 2 files changed, 28 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index f1f34ba..8b5d2ab 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -142,6 +142,7 @@ public class Bucket<IN, BucketID> {
                             bucketId,
                             inProgressFileRecoverable,
                             state.getInProgressFileCreationTime());
+            inProgressFileRecoverablesPerCheckpoint.put(Long.MIN_VALUE, 
inProgressFileRecoverable);
         } else {
             // if the writer does not support resume, then we close the
             // in-progress part and commit it, as done in the case of pending 
files.
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 9fc09cd..245e895 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -117,6 +117,33 @@ public class BucketTest {
         assertThat(recoverableWriter, hasCalledDiscard(0)); // we have no 
in-progress file.
     }
 
+    @Test
+    public void shouldCleanupOutdatedResumablesAfterResumed() throws Exception 
{
+        final File outDir = TEMP_FOLDER.newFolder();
+        final Path path = new Path(outDir.toURI());
+
+        final TestRecoverableWriter recoverableWriter = 
getRecoverableWriter(path);
+        final Bucket<String, String> bucketUnderTest =
+                createBucket(recoverableWriter, path, 0, 0, 
OutputFileConfig.builder().build());
+
+        bucketUnderTest.write("test-element", 0L);
+        final BucketState<String> state0 = 
bucketUnderTest.onReceptionOfCheckpoint(0L);
+        assertThat(state0, hasActiveInProgressFile());
+        bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
+        assertThat(recoverableWriter, hasCalledDiscard(0));
+
+        final File newOutDir = TEMP_FOLDER.newFolder();
+        final Path newPath = new Path(newOutDir.toURI());
+        final TestRecoverableWriter newRecoverableWriter = 
getRecoverableWriter(newPath);
+        final Bucket<String, String> bucketAfterResume =
+                restoreBucket(
+                        newRecoverableWriter, 0, 0, state0, 
OutputFileConfig.builder().build());
+        final BucketState<String> state1 = 
bucketAfterResume.onReceptionOfCheckpoint(1L);
+        assertThat(state1, hasActiveInProgressFile());
+        bucketAfterResume.onSuccessfulCompletionOfCheckpoint(1L);
+        assertThat(newRecoverableWriter, hasCalledDiscard(1));
+    }
+
     // --------------------------- Checking Restore ---------------------------
 
     @Test

Reply via email to