This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 36609bb5dc2aee4061a47f7a767630f1f5912d96
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Wed Jun 3 23:03:52 2020 +0200

    [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable

    OperatorSnapshotFinalizer already waits and holds this future.
    ChannelStateWriter.getWriteResult() can then be non-idempotent.
    ChannelStateWriter.stop() can then be removed.
---
 .../checkpoint/channel/ChannelStateWriter.java | 21 ++++++----------
 .../checkpoint/channel/ChannelStateWriterImpl.java | 10 ++------
 .../channel/ChannelStateWriterImplTest.java | 28 ++++++++++------------
 .../checkpoint/channel/MockChannelStateWriter.java | 6 +----
 .../channel/RecordingChannelStateWriter.java | 5 ----
 .../runtime/state/ChannelPersistenceITCase.java | 2 +-
 .../runtime/tasks/AsyncCheckpointRunnable.java | 6 -----
 .../tasks/SubtaskCheckpointCoordinatorImpl.java | 15 +-----------
 .../runtime/tasks/LocalStateForwardingTest.java | 2 --
 9 files changed, 24 insertions(+), 71 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 5dad559..af2a708 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -123,7 +123,7 @@ public interface ChannelStateWriter extends Closeable { * Finalize write of channel state data for the given checkpoint id. * Must be called after {@link #start(long, CheckpointOptions)} and all of the input data of the given checkpoint added. 
* When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained - * using {@link #getWriteResult} + * using {@link #getAndRemoveWriteResult} */ void finishInput(long checkpointId); @@ -131,24 +131,21 @@ public interface ChannelStateWriter extends Closeable { * Finalize write of channel state data for the given checkpoint id. * Must be called after {@link #start(long, CheckpointOptions)} and all of the output data of the given checkpoint added. * When both {@link #finishInput} and {@link #finishOutput} were called the results can be (eventually) obtained - * using {@link #getWriteResult} + * using {@link #getAndRemoveWriteResult} */ void finishOutput(long checkpointId); /** * Aborts the checkpoint and fails pending result for this checkpoint. + * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not supposed to be called afterwards. */ void abort(long checkpointId, Throwable cause); /** - * Must be called after {@link #start(long, CheckpointOptions)}. + * Must be called after {@link #start(long, CheckpointOptions)} once. + * @throws IllegalArgumentException if the passed checkpointId is not known. */ - ChannelStateWriteResult getWriteResult(long checkpointId); - - /** - * Cleans up the internal state for the given checkpoint. 
- */ - void stop(long checkpointId); + ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) throws IllegalArgumentException; ChannelStateWriter NO_OP = new NoOpChannelStateWriter(); @@ -181,16 +178,12 @@ public interface ChannelStateWriter extends Closeable { } @Override - public ChannelStateWriteResult getWriteResult(long checkpointId) { + public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) { return ChannelStateWriteResult.EMPTY; } @Override public void close() { } - - @Override - public void stop(long checkpointId) { - } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java index fc8655c..6158358 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java @@ -153,19 +153,13 @@ public class ChannelStateWriterImpl implements ChannelStateWriter { } @Override - public ChannelStateWriteResult getWriteResult(long checkpointId) { + public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) { LOG.debug("{} requested write result, checkpoint {}", taskName, checkpointId); - ChannelStateWriteResult result = results.get(checkpointId); + ChannelStateWriteResult result = results.remove(checkpointId); Preconditions.checkArgument(result != null, taskName + " channel state write result not found for checkpoint " + checkpointId); return result; } - @Override - public void stop(long checkpointId) { - LOG.debug("{} stopping checkpoint {}", taskName, checkpointId); - results.remove(checkpointId); - } - public void open() { executor.start(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java index 8c7d7f2..9d7a7ea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java @@ -38,7 +38,6 @@ import static org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamF import static org.apache.flink.util.CloseableIterator.ofElements; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; /** @@ -69,9 +68,7 @@ public class ChannelStateWriterImplTest { ChannelStateWriteResult result; try (ChannelStateWriterImpl writer = openWriter()) { callStart(writer); - result = writer.getWriteResult(CHECKPOINT_ID); - ChannelStateWriteResult result2 = writer.getWriteResult(CHECKPOINT_ID); - assertSame(result, result2); + result = writer.getAndRemoveWriteResult(CHECKPOINT_ID); assertFalse(result.resultSubpartitionStateHandles.isDone()); assertFalse(result.inputChannelStateHandles.isDone()); } @@ -79,22 +76,12 @@ public class ChannelStateWriterImplTest { assertTrue(result.resultSubpartitionStateHandles.isDone()); } - @Test(expected = IllegalArgumentException.class) - public void testResultCleanup() throws IOException { - try (ChannelStateWriterImpl writer = openWriter()) { - callStart(writer); - writer.getWriteResult(CHECKPOINT_ID); - writer.stop(CHECKPOINT_ID); - writer.getWriteResult(CHECKPOINT_ID); - } - } - @Test public void testAbort() throws Exception { NetworkBuffer buffer = getBuffer(); runWithSyncWorker((writer, worker) -> { callStart(writer); - ChannelStateWriteResult result = writer.getWriteResult(CHECKPOINT_ID); + ChannelStateWriteResult result = writer.getAndRemoveWriteResult(CHECKPOINT_ID); callAddInputData(writer, buffer); callAbort(writer); 
worker.processAllRequests(); @@ -108,9 +95,18 @@ public class ChannelStateWriterImplTest { NetworkBuffer buffer = getBuffer(); runWithSyncWorker((writer, worker) -> { callStart(writer); + writer.abort(CHECKPOINT_ID, new TestException(), true); + writer.getAndRemoveWriteResult(CHECKPOINT_ID); + }); + } + + @Test + public void testAbortDoesNotClearsResults() throws Exception { + runWithSyncWorker((writer, worker) -> { + callStart(writer); callAbort(writer); worker.processAllRequests(); - writer.getWriteResult(CHECKPOINT_ID); + writer.getAndRemoveWriteResult(CHECKPOINT_ID); }); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java index 0a61066d..88bd334 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java @@ -101,7 +101,7 @@ public class MockChannelStateWriter implements ChannelStateWriter { } @Override - public ChannelStateWriteResult getWriteResult(long checkpointId) { + public ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) { return channelStateWriteResult; } @@ -117,8 +117,4 @@ public class MockChannelStateWriter implements ChannelStateWriter { channelStateWriteResult.getInputChannelStateHandles().cancel(false); channelStateWriteResult.getResultSubpartitionStateHandles().cancel(false); } - - @Override - public void stop(long checkpointId) { - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java index d0cfe3f..151e38e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java @@ -81,11 +81,6 @@ public class RecordingChannelStateWriter extends MockChannelStateWriter { return lastFinishedCheckpointId; } - @Override - public void stop(long checkpointId) { - lastFinishedCheckpointId = checkpointId; - } - public ListMultimap<InputChannelInfo, Buffer> getAddedInput() { return addedInput; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java index b696c77..18caac0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java @@ -110,7 +110,7 @@ public class ChannelPersistenceITCase { writer.addOutputData(checkpointId, e.getKey(), SEQUENCE_NUMBER_UNKNOWN, e.getValue()); } writer.finishOutput(checkpointId); - ChannelStateWriteResult result = writer.getWriteResult(checkpointId); + ChannelStateWriteResult result = writer.getAndRemoveWriteResult(checkpointId); result.getResultSubpartitionStateHandles().join(); // prevent abnormal complete in close return result; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java index e89e962..af10411 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java @@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.util.Map; -import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -60,7 +59,6 @@ 
final class AsyncCheckpointRunnable implements Runnable, Closeable { private final Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress; private final CheckpointMetaData checkpointMetaData; private final CheckpointMetrics checkpointMetrics; - private final Future<?> channelWrittenFuture; private final long asyncStartNanos; private final AtomicReference<AsyncCheckpointState> asyncCheckpointState = new AtomicReference<>(AsyncCheckpointState.RUNNING); @@ -68,7 +66,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { Map<OperatorID, OperatorSnapshotFutures> operatorSnapshotsInProgress, CheckpointMetaData checkpointMetaData, CheckpointMetrics checkpointMetrics, - Future<?> channelWrittenFuture, long asyncStartNanos, String taskName, Consumer<AsyncCheckpointRunnable> register, @@ -79,7 +76,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { this.operatorSnapshotsInProgress = checkNotNull(operatorSnapshotsInProgress); this.checkpointMetaData = checkNotNull(checkpointMetaData); this.checkpointMetrics = checkNotNull(checkpointMetrics); - this.channelWrittenFuture = checkNotNull(channelWrittenFuture); this.asyncStartNanos = asyncStartNanos; this.taskName = checkNotNull(taskName); this.registerConsumer = register; @@ -120,8 +116,6 @@ final class AsyncCheckpointRunnable implements Runnable, Closeable { checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis); - channelWrittenFuture.get(); - if (asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, AsyncCheckpointState.COMPLETED)) { reportCompletedSnapshotStates( diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java index d14594f..adfaa44 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java +++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java @@ -28,7 +28,6 @@ import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl; -import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -65,7 +64,6 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; @@ -432,22 +430,11 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { } private void finishAndReportAsync(Map<OperatorID, OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, CheckpointMetrics metrics, CheckpointOptions options) { - final Future<?> channelWrittenFuture; - if (includeChannelState(options)) { - ChannelStateWriteResult writeResult = channelStateWriter.getWriteResult(metadata.getCheckpointId()); - channelWrittenFuture = CompletableFuture.allOf( - writeResult.getInputChannelStateHandles(), - writeResult.getResultSubpartitionStateHandles()) - .whenComplete((dummy, ex) -> channelStateWriter.stop(metadata.getCheckpointId())); - } else { - channelWrittenFuture = FutureUtils.completedVoidFuture(); - } // we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submit executorService.execute(new AsyncCheckpointRunnable( snapshotFutures, metadata, metrics, - channelWrittenFuture, System.nanoTime(), taskName, 
registerConsumer(), @@ -490,7 +477,7 @@ class SubtaskCheckpointCoordinatorImpl implements SubtaskCheckpointCoordinator { long started = System.nanoTime(); ChannelStateWriteResult channelStateWriteResult = includeChannelState(checkpointOptions) ? - channelStateWriter.getWriteResult(checkpointId) : + channelStateWriter.getAndRemoveWriteResult(checkpointId) : ChannelStateWriteResult.EMPTY; CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(checkpointId, checkpointOptions.getTargetLocation()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java index 3e0703f..2eabca4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java @@ -59,7 +59,6 @@ import javax.annotation.Nullable; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.RunnableFuture; @@ -115,7 +114,6 @@ public class LocalStateForwardingTest extends TestLogger { snapshots, checkpointMetaData, checkpointMetrics, - CompletableFuture.completedFuture(null), 0L, testStreamTask.getName(), asyncCheckpointRunnable -> {},