This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cb8f28cf219f79a36b90a567692fafc5f85a2a2
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Wed Jun 3 14:12:04 2020 +0200

    [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] 
Ensuring that ChannelStateWriter aborts previous checkpoints before a new 
checkpoint is started."
    
    This reverts commit 24ff415f1b76392f75dea7c3538558d24fcb7058
    which introduced a race condition when the task thread and the netty
    thread compete for ChannelStateWriteResult.
    
    Instead, next commits fix it by:
    1. Map size validation error will be prevented simply by increasing the 
limit
    2. When a checkpoint is subsumed, its write result will be removed on future completion
---
 .../checkpoint/channel/ChannelStateWriter.java        |  4 +++-
 .../checkpoint/channel/ChannelStateWriterImpl.java    |  1 -
 .../channel/ChannelStateWriterImplTest.java           | 19 ++++++++-----------
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index af2a708..02a3a69 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -179,7 +179,9 @@ public interface ChannelStateWriter extends Closeable {
 
                @Override
                public ChannelStateWriteResult getAndRemoveWriteResult(long 
checkpointId) {
-                       return ChannelStateWriteResult.EMPTY;
+                       return new ChannelStateWriteResult(
+                               
CompletableFuture.completedFuture(Collections.emptyList()),
+                               
CompletableFuture.completedFuture(Collections.emptyList()));
                }
 
                @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 6158358..8996b3b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -98,7 +98,6 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
 
        @Override
        public void start(long checkpointId, CheckpointOptions 
checkpointOptions) {
-               results.keySet().forEach(oldCheckpointId -> 
abort(oldCheckpointId, new Exception("Starting new checkpoint " + 
checkpointId)));
                LOG.debug("{} starting checkpoint {} ({})", taskName, 
checkpointId, checkpointOptions);
                ChannelStateWriteResult result = new ChannelStateWriteResult();
                ChannelStateWriteResult put = 
results.computeIfAbsent(checkpointId, id -> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 9d7a7ea..0dae88e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -186,19 +186,16 @@ public class ChannelStateWriterImplTest {
                unwrappingError(TestException.class, () -> callStart(writer));
        }
 
-       @Test
-       public void testStartAbortsOldCheckpoints() throws Exception {
-               int maxCheckpoints = 10;
-               runWithSyncWorker((writer, worker) -> {
-                       writer.start(0, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-                       ChannelStateWriteResult writeResult = 
writer.getWriteResult(0);
-                       for (int i = 1; i <= maxCheckpoints; i++) {
+       @Test(expected = IllegalStateException.class)
+       public void testLimit() throws IOException {
+               int maxCheckpoints = 3;
+               try (ChannelStateWriterImpl writer = new 
ChannelStateWriterImpl(TASK_NAME, getStreamFactoryFactory(), maxCheckpoints)) {
+                       writer.open();
+                       for (int i = 0; i < maxCheckpoints; i++) {
                                writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-                               worker.processAllRequests();
-                               assertTrue(writeResult.isDone());
-                               writeResult = writer.getWriteResult(i);
                        }
-               });
+                       writer.start(maxCheckpoints, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+               }
        }
 
        @Test(expected = IllegalStateException.class)

Reply via email to