This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 94ca735  [FLINK-13941][fs-connector] Do not delete partial part files 
from S3 upon restore.
94ca735 is described below

commit 94ca735b9d5a3861ccd7c5deb54c7ca6d67300c3
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Sep 2 14:35:57 2019 +0200

    [FLINK-13941][fs-connector] Do not delete partial part files from S3 upon 
restore.
---
 .../api/functions/sink/filesystem/Bucket.java      | 19 +++++++++++--------
 .../api/functions/sink/filesystem/BucketTest.java  | 22 ----------------------
 2 files changed, 11 insertions(+), 30 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 3252d9c..cc6726b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -146,10 +146,6 @@ public class Bucket<IN, BucketID> {
 
                        
fsWriter.recoverForCommit(resumable).commitAfterRecovery();
                }
-
-               if (fsWriter.requiresCleanupOfRecoverableState()) {
-                       fsWriter.cleanupRecoverableState(resumable);
-               }
        }
 
        private void commitRecoveredPendingFiles(final BucketState<BucketID> 
state) throws IOException {
@@ -312,12 +308,19 @@ public class Bucket<IN, BucketID> {
 
                while (it.hasNext()) {
                        final ResumeRecoverable recoverable = 
it.next().getValue();
-                       final boolean successfullyDeleted = 
fsWriter.cleanupRecoverableState(recoverable);
-                       it.remove();
 
-                       if (LOG.isDebugEnabled() && successfullyDeleted) {
-                               LOG.debug("Subtask {} successfully deleted 
incomplete part for bucket id={}.", subtaskIndex, bucketId);
+                       // this check is redundant, as we only put entries in 
the resumablesPerCheckpoint map
+                       // list when the requiresCleanupOfRecoverableState() 
returns true, but having it makes
+                       // the code more readable.
+
+                       if (fsWriter.requiresCleanupOfRecoverableState()) {
+                               final boolean successfullyDeleted = 
fsWriter.cleanupRecoverableState(recoverable);
+
+                               if (LOG.isDebugEnabled() && 
successfullyDeleted) {
+                                       LOG.debug("Subtask {} successfully 
deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
+                               }
                        }
+                       it.remove();
                }
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 308bc31..a2e4582 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -99,28 +99,6 @@ public class BucketTest {
        }
 
        @Test
-       public void shouldCleanupResumableAfterRestoring() throws Exception {
-               final File outDir = TEMP_FOLDER.newFolder();
-               final Path path = new Path(outDir.toURI());
-
-               final TestRecoverableWriter recoverableWriter = 
getRecoverableWriter(path);
-               final Bucket<String, String> bucketUnderTest =
-                               createBucket(recoverableWriter, path, 0, 0);
-
-               bucketUnderTest.write("test-element", 0L);
-
-               final BucketState<String> state = 
bucketUnderTest.onReceptionOfCheckpoint(0L);
-               assertThat(state, hasActiveInProgressFile());
-
-               bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
-
-               final TestRecoverableWriter newRecoverableWriter = 
getRecoverableWriter(path);
-               restoreBucket(newRecoverableWriter, 0, 1, state);
-
-               assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that 
is for checkpoints 0 and 1
-       }
-
-       @Test
        public void shouldNotCallCleanupWithoutInProgressPartFiles() throws 
Exception {
                final File outDir = TEMP_FOLDER.newFolder();
                final Path path = new Path(outDir.toURI());

Reply via email to