This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36609bb5dc2aee4061a47f7a767630f1f5912d96
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Wed Jun 3 23:03:52 2020 +0200

    [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable
    
    OperatorSnapshotFinalizer already waits for and holds this future.
    Therefore, ChannelStateWriter.getWriteResult() can be made non-idempotent
    (it is renamed to getAndRemoveWriteResult()), and ChannelStateWriter.stop()
    can be removed.
---
 .../checkpoint/channel/ChannelStateWriter.java     | 21 ++++++----------
 .../checkpoint/channel/ChannelStateWriterImpl.java | 10 ++------
 .../channel/ChannelStateWriterImplTest.java        | 28 ++++++++++------------
 .../checkpoint/channel/MockChannelStateWriter.java |  6 +----
 .../channel/RecordingChannelStateWriter.java       |  5 ----
 .../runtime/state/ChannelPersistenceITCase.java    |  2 +-
 .../runtime/tasks/AsyncCheckpointRunnable.java     |  6 -----
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 15 +-----------
 .../runtime/tasks/LocalStateForwardingTest.java    |  2 --
 9 files changed, 24 insertions(+), 71 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 5dad559..af2a708 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -123,7 +123,7 @@ public interface ChannelStateWriter extends Closeable {
         * Finalize write of channel state data for the given checkpoint id.
         * Must be called after {@link #start(long, CheckpointOptions)} and all 
of the input data of the given checkpoint added.
         * When both {@link #finishInput} and {@link #finishOutput} were called 
the results can be (eventually) obtained
-        * using {@link #getWriteResult}
+        * using {@link #getAndRemoveWriteResult}
         */
        void finishInput(long checkpointId);
 
@@ -131,24 +131,21 @@ public interface ChannelStateWriter extends Closeable {
         * Finalize write of channel state data for the given checkpoint id.
         * Must be called after {@link #start(long, CheckpointOptions)} and all 
of the output data of the given checkpoint added.
         * When both {@link #finishInput} and {@link #finishOutput} were called 
the results can be (eventually) obtained
-        * using {@link #getWriteResult}
+        * using {@link #getAndRemoveWriteResult}
         */
        void finishOutput(long checkpointId);
 
        /**
         * Aborts the checkpoint and fails pending result for this checkpoint.
+        * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not 
supposed to be called afterwards.
         */
        void abort(long checkpointId, Throwable cause);
 
        /**
-        * Must be called after {@link #start(long, CheckpointOptions)}.
+        * Must be called after {@link #start(long, CheckpointOptions)} once.
+        * @throws IllegalArgumentException if the passed checkpointId is not 
known.
         */
-       ChannelStateWriteResult getWriteResult(long checkpointId);
-
-       /**
-        * Cleans up the internal state for the given checkpoint.
-        */
-       void stop(long checkpointId);
+       ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) 
throws IllegalArgumentException;
 
        ChannelStateWriter NO_OP = new NoOpChannelStateWriter();
 
@@ -181,16 +178,12 @@ public interface ChannelStateWriter extends Closeable {
                }
 
                @Override
-               public ChannelStateWriteResult getWriteResult(long 
checkpointId) {
+               public ChannelStateWriteResult getAndRemoveWriteResult(long 
checkpointId) {
                        return ChannelStateWriteResult.EMPTY;
                }
 
                @Override
                public void close() {
                }
-
-               @Override
-               public void stop(long checkpointId) {
-               }
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index fc8655c..6158358 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -153,19 +153,13 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
        }
 
        @Override
-       public ChannelStateWriteResult getWriteResult(long checkpointId) {
+       public ChannelStateWriteResult getAndRemoveWriteResult(long 
checkpointId) {
                LOG.debug("{} requested write result, checkpoint {}", taskName, 
checkpointId);
-               ChannelStateWriteResult result = results.get(checkpointId);
+               ChannelStateWriteResult result = results.remove(checkpointId);
                Preconditions.checkArgument(result != null, taskName + " 
channel state write result not found for checkpoint " + checkpointId);
                return result;
        }
 
-       @Override
-       public void stop(long checkpointId) {
-               LOG.debug("{} stopping checkpoint {}", taskName, checkpointId);
-               results.remove(checkpointId);
-       }
-
        public void open() {
                executor.start();
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 8c7d7f2..9d7a7ea 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -38,7 +38,6 @@ import static 
org.apache.flink.runtime.state.ChannelPersistenceITCase.getStreamF
 import static org.apache.flink.util.CloseableIterator.ofElements;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -69,9 +68,7 @@ public class ChannelStateWriterImplTest {
                ChannelStateWriteResult result;
                try (ChannelStateWriterImpl writer = openWriter()) {
                        callStart(writer);
-                       result = writer.getWriteResult(CHECKPOINT_ID);
-                       ChannelStateWriteResult result2 = 
writer.getWriteResult(CHECKPOINT_ID);
-                       assertSame(result, result2);
+                       result = writer.getAndRemoveWriteResult(CHECKPOINT_ID);
                        
assertFalse(result.resultSubpartitionStateHandles.isDone());
                        assertFalse(result.inputChannelStateHandles.isDone());
                }
@@ -79,22 +76,12 @@ public class ChannelStateWriterImplTest {
                assertTrue(result.resultSubpartitionStateHandles.isDone());
        }
 
-       @Test(expected = IllegalArgumentException.class)
-       public void testResultCleanup() throws IOException {
-               try (ChannelStateWriterImpl writer = openWriter()) {
-                       callStart(writer);
-                       writer.getWriteResult(CHECKPOINT_ID);
-                       writer.stop(CHECKPOINT_ID);
-                       writer.getWriteResult(CHECKPOINT_ID);
-               }
-       }
-
        @Test
        public void testAbort() throws Exception {
                NetworkBuffer buffer = getBuffer();
                runWithSyncWorker((writer, worker) -> {
                        callStart(writer);
-                       ChannelStateWriteResult result = 
writer.getWriteResult(CHECKPOINT_ID);
+                       ChannelStateWriteResult result = 
writer.getAndRemoveWriteResult(CHECKPOINT_ID);
                        callAddInputData(writer, buffer);
                        callAbort(writer);
                        worker.processAllRequests();
@@ -108,9 +95,18 @@ public class ChannelStateWriterImplTest {
                NetworkBuffer buffer = getBuffer();
                runWithSyncWorker((writer, worker) -> {
                        callStart(writer);
+                       writer.abort(CHECKPOINT_ID, new TestException(), true);
+                       writer.getAndRemoveWriteResult(CHECKPOINT_ID);
+               });
+       }
+
+       @Test
+       public void testAbortDoesNotClearsResults() throws Exception {
+               runWithSyncWorker((writer, worker) -> {
+                       callStart(writer);
                        callAbort(writer);
                        worker.processAllRequests();
-                       writer.getWriteResult(CHECKPOINT_ID);
+                       writer.getAndRemoveWriteResult(CHECKPOINT_ID);
                });
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index 0a61066d..88bd334 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
@@ -101,7 +101,7 @@ public class MockChannelStateWriter implements 
ChannelStateWriter {
        }
 
        @Override
-       public ChannelStateWriteResult getWriteResult(long checkpointId) {
+       public ChannelStateWriteResult getAndRemoveWriteResult(long 
checkpointId) {
                return channelStateWriteResult;
        }
 
@@ -117,8 +117,4 @@ public class MockChannelStateWriter implements 
ChannelStateWriter {
                
channelStateWriteResult.getInputChannelStateHandles().cancel(false);
                
channelStateWriteResult.getResultSubpartitionStateHandles().cancel(false);
        }
-
-       @Override
-       public void stop(long checkpointId) {
-       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
index d0cfe3f..151e38e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/RecordingChannelStateWriter.java
@@ -81,11 +81,6 @@ public class RecordingChannelStateWriter extends 
MockChannelStateWriter {
                return lastFinishedCheckpointId;
        }
 
-       @Override
-       public void stop(long checkpointId) {
-               lastFinishedCheckpointId = checkpointId;
-       }
-
        public ListMultimap<InputChannelInfo, Buffer> getAddedInput() {
                return addedInput;
        }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
index b696c77..18caac0f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChannelPersistenceITCase.java
@@ -110,7 +110,7 @@ public class ChannelPersistenceITCase {
                                writer.addOutputData(checkpointId, e.getKey(), 
SEQUENCE_NUMBER_UNKNOWN, e.getValue());
                        }
                        writer.finishOutput(checkpointId);
-                       ChannelStateWriteResult result = 
writer.getWriteResult(checkpointId);
+                       ChannelStateWriteResult result = 
writer.getAndRemoveWriteResult(checkpointId);
                        result.getResultSubpartitionStateHandles().join(); // 
prevent abnormal complete in close
                        return result;
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
index e89e962..af10411 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/AsyncCheckpointRunnable.java
@@ -33,7 +33,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.util.Map;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -60,7 +59,6 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
        private final Map<OperatorID, OperatorSnapshotFutures> 
operatorSnapshotsInProgress;
        private final CheckpointMetaData checkpointMetaData;
        private final CheckpointMetrics checkpointMetrics;
-       private final Future<?> channelWrittenFuture;
        private final long asyncStartNanos;
        private final AtomicReference<AsyncCheckpointState> 
asyncCheckpointState = new AtomicReference<>(AsyncCheckpointState.RUNNING);
 
@@ -68,7 +66,6 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
                        Map<OperatorID, OperatorSnapshotFutures> 
operatorSnapshotsInProgress,
                        CheckpointMetaData checkpointMetaData,
                        CheckpointMetrics checkpointMetrics,
-                       Future<?> channelWrittenFuture,
                        long asyncStartNanos,
                        String taskName,
                        Consumer<AsyncCheckpointRunnable> register,
@@ -79,7 +76,6 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
                this.operatorSnapshotsInProgress = 
checkNotNull(operatorSnapshotsInProgress);
                this.checkpointMetaData = checkNotNull(checkpointMetaData);
                this.checkpointMetrics = checkNotNull(checkpointMetrics);
-               this.channelWrittenFuture = checkNotNull(channelWrittenFuture);
                this.asyncStartNanos = asyncStartNanos;
                this.taskName = checkNotNull(taskName);
                this.registerConsumer = register;
@@ -120,8 +116,6 @@ final class AsyncCheckpointRunnable implements Runnable, 
Closeable {
 
                        
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);
 
-                       channelWrittenFuture.get();
-
                        if 
(asyncCheckpointState.compareAndSet(AsyncCheckpointState.RUNNING, 
AsyncCheckpointState.COMPLETED)) {
 
                                reportCompletedSnapshotStates(
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index d14594f..adfaa44 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -28,7 +28,6 @@ import 
org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
-import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
@@ -65,7 +64,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
 
@@ -432,22 +430,11 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
        }
 
        private void finishAndReportAsync(Map<OperatorID, 
OperatorSnapshotFutures> snapshotFutures, CheckpointMetaData metadata, 
CheckpointMetrics metrics, CheckpointOptions options) {
-               final Future<?> channelWrittenFuture;
-               if (includeChannelState(options)) {
-                       ChannelStateWriteResult writeResult = 
channelStateWriter.getWriteResult(metadata.getCheckpointId());
-                       channelWrittenFuture = CompletableFuture.allOf(
-                                       
writeResult.getInputChannelStateHandles(),
-                                       
writeResult.getResultSubpartitionStateHandles())
-                               .whenComplete((dummy, ex) -> 
channelStateWriter.stop(metadata.getCheckpointId()));
-               } else {
-                       channelWrittenFuture = 
FutureUtils.completedVoidFuture();
-               }
                // we are transferring ownership over snapshotInProgressList 
for cleanup to the thread, active on submit
                executorService.execute(new AsyncCheckpointRunnable(
                        snapshotFutures,
                        metadata,
                        metrics,
-                       channelWrittenFuture,
                        System.nanoTime(),
                        taskName,
                        registerConsumer(),
@@ -490,7 +477,7 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                long started = System.nanoTime();
 
                ChannelStateWriteResult channelStateWriteResult = 
includeChannelState(checkpointOptions) ?
-                                                               
channelStateWriter.getWriteResult(checkpointId) :
+                                                               
channelStateWriter.getAndRemoveWriteResult(checkpointId) :
                                                                
ChannelStateWriteResult.EMPTY;
 
                CheckpointStreamFactory storage = 
checkpointStorage.resolveCheckpointStorageLocation(checkpointId, 
checkpointOptions.getTargetLocation());
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
index 3e0703f..2eabca4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
@@ -59,7 +59,6 @@ import javax.annotation.Nullable;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.RunnableFuture;
@@ -115,7 +114,6 @@ public class LocalStateForwardingTest extends TestLogger {
                        snapshots,
                        checkpointMetaData,
                        checkpointMetrics,
-                       CompletableFuture.completedFuture(null),
                        0L,
                        testStreamTask.getName(),
                        asyncCheckpointRunnable -> {},

Reply via email to