This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d75ec8efe3c31f2a5e1a4572b5cfb8000ddbf67
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Thu Jun 4 10:06:15 2020 +0200

    [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC
    notification
---
 .../flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 5d7a2c9..d904c87 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -306,6 +306,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                                }
                        }
 
+                       channelStateWriter.abort(checkpointId, new 
CancellationException("checkpoint aborted via notification"), false);
+
                        for (StreamOperatorWrapper<?, ?> operatorWrapper : 
operatorChain.getAllOperators(true)) {
                                try {
                                        
operatorWrapper.getStreamOperator().notifyCheckpointAborted(checkpointId);

Reply via email to