This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 6cb8f28cf219f79a36b90a567692fafc5f85a2a2 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Wed Jun 3 14:12:04 2020 +0200 [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started." This reverts commit 24ff415f1b76392f75dea7c3538558d24fcb7058 which introduced a race condition when task thread and netty thread compete for ChannelStateWriteResult. Instead, the following commits fix it by: 1. Map size validation error will be prevented simply by increasing the limit 2. When a checkpoint is subsumed, its write result will be removed on future completion --- .../checkpoint/channel/ChannelStateWriter.java | 4 +++- .../checkpoint/channel/ChannelStateWriterImpl.java | 1 - .../channel/ChannelStateWriterImplTest.java | 19 ++++++++----------- 3 files changed, 11 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java index af2a708..02a3a69 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java @@ -179,7 +179,9 @@ public interface ChannelStateWriter extends Closeable { @Override public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) { - return ChannelStateWriteResult.EMPTY; + return new ChannelStateWriteResult( + CompletableFuture.completedFuture(Collections.emptyList()), + CompletableFuture.completedFuture(Collections.emptyList())); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index 6158358..8996b3b 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -98,7 +98,6 @@ public class ChannelStateWriterImpl implements ChannelStateWriter { @Override public void start(long checkpointId, CheckpointOptions checkpointOptions) { - results.keySet().forEach(oldCheckpointId -> abort(oldCheckpointId, new Exception("Starting new checkpoint " + checkpointId))); LOG.debug("{} starting checkpoint {} ({})", taskName, checkpointId, checkpointOptions); ChannelStateWriteResult result = new ChannelStateWriteResult(); ChannelStateWriteResult put = results.computeIfAbsent(checkpointId, id -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java index 9d7a7ea..0dae88e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java @@ -186,19 +186,16 @@ public class ChannelStateWriterImplTest { unwrappingError(TestException.class, () -> callStart(writer)); } - @Test - public void testStartAbortsOldCheckpoints() throws Exception { - int maxCheckpoints = 10; - runWithSyncWorker((writer, worker) -> { - writer.start(0, CheckpointOptions.forCheckpointWithDefaultLocation()); - ChannelStateWriteResult writeResult = writer.getWriteResult(0); - for (int i = 1; i <= maxCheckpoints; i++) { + @Test(expected = IllegalStateException.class) + public void testLimit() throws IOException { + int maxCheckpoints = 3; + try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl(TASK_NAME, getStreamFactoryFactory(), maxCheckpoints)) { + writer.open(); + for (int i = 0; i < maxCheckpoints; i++) { writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation()); - worker.processAllRequests(); - assertTrue(writeResult.isDone()); - writeResult = writer.getWriteResult(i); } - }); + writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation()); + } } @Test(expected = IllegalStateException.class)