This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
     new 0147cf6  [FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.
0147cf6 is described below

commit 0147cf601701c87dd330898f68b47939f2ef1226
Author: Kostas Kloudas <kklou...@gmail.com>
AuthorDate: Mon Sep 2 14:35:57 2019 +0200

    [FLINK-13941][fs-connector] Do not delete partial part files from S3 upon restore.
---
 .../api/functions/sink/filesystem/Bucket.java      | 19 +++++++++++--------
 .../api/functions/sink/filesystem/BucketTest.java  | 22 ----------------------
 2 files changed, 11 insertions(+), 30 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index 5fe535d..4a996e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -150,10 +150,6 @@ public class Bucket<IN, BucketID> {
 
 			fsWriter.recoverForCommit(resumable).commitAfterRecovery();
 		}
-
-		if (fsWriter.requiresCleanupOfRecoverableState()) {
-			fsWriter.cleanupRecoverableState(resumable);
-		}
 	}
 
 	private void commitRecoveredPendingFiles(final BucketState<BucketID> state) throws IOException {
@@ -316,12 +312,19 @@ public class Bucket<IN, BucketID> {
 
 		while (it.hasNext()) {
 			final ResumeRecoverable recoverable = it.next().getValue();
-			final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
-			it.remove();
 
-			if (LOG.isDebugEnabled() && successfullyDeleted) {
-				LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
+			// this check is redundant, as we only put entries in the resumablesPerCheckpoint map
+			// list when the requiresCleanupOfRecoverableState() returns true, but having it makes
+			// the code more readable.
+
+			if (fsWriter.requiresCleanupOfRecoverableState()) {
+				final boolean successfullyDeleted = fsWriter.cleanupRecoverableState(recoverable);
+
+				if (LOG.isDebugEnabled() && successfullyDeleted) {
+					LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", subtaskIndex, bucketId);
+				}
 			}
+			it.remove();
 		}
 	}
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
index 546a08c..583bacf 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/filesystem/BucketTest.java
@@ -99,28 +99,6 @@ public class BucketTest {
 	}
 
 	@Test
-	public void shouldCleanupResumableAfterRestoring() throws Exception {
-		final File outDir = TEMP_FOLDER.newFolder();
-		final Path path = new Path(outDir.toURI());
-
-		final TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
-		final Bucket<String, String> bucketUnderTest =
-				createBucket(recoverableWriter, path, 0, 0, new PartFileConfig());
-
-		bucketUnderTest.write("test-element", 0L);
-
-		final BucketState<String> state = bucketUnderTest.onReceptionOfCheckpoint(0L);
-		assertThat(state, hasActiveInProgressFile());
-
-		bucketUnderTest.onSuccessfulCompletionOfCheckpoint(0L);
-
-		final TestRecoverableWriter newRecoverableWriter = getRecoverableWriter(path);
-		restoreBucket(newRecoverableWriter, 0, 1, state, new PartFileConfig());
-
-		assertThat(newRecoverableWriter, hasCalledDiscard(1)); // that is for checkpoints 0 and 1
-	}
-
-	@Test
 	public void shouldNotCallCleanupWithoutInProgressPartFiles() throws Exception {
 		final File outDir = TEMP_FOLDER.newFolder();
 		final Path path = new Path(outDir.toURI());