pnowojski commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r435188801



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -154,46 +154,51 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex)
        @Override
        public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception {
                long cancelledId = cancelBarrier.getCheckpointId();
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", taskName, cancelledId);
-               }
-
-               if (currentConsumedCheckpointId >= cancelledId && !isCheckpointPending()) {
-                       return;
-               }
+               // tag whether we should abort checkpoint from task thread view
+               boolean shouldAbort1 = false;
 
-               if (isCheckpointPending()) {
+               if (cancelledId > currentConsumedCheckpointId) {
+                       currentConsumedCheckpointId = cancelledId;
+                       shouldAbort1 = true;
+               } else if (cancelledId == currentConsumedCheckpointId && isCheckpointPending()) {
                        LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " +
                                        "Skipping current checkpoint.",
                                taskName,
                                cancelledId,
                                currentConsumedCheckpointId);
 
                        resetConsumedBarriers();
+                       shouldAbort1 = true;
+               }
+
+               // tag whether we should abort checkpoint from threadSafeUnaligner view
+               boolean shouldAbort2 = threadSafeUnaligner.setCancelledCheckpointId(cancelledId);

Review comment:
   > If shouldAbort2 true and shouldAbort1 false, that means the notifyBarrierReceived triggered by netty thread happen earlier.
   
   But this `notifyBarrierReceived` call from the netty thread would enqueue a mailbox action.
   
   If the mailbox action hasn't been executed yet, we could mark this checkpoint as aborted here, in this method (task thread), and prevent the mail from executing (also task thread).
   
   If the mailbox action has already been executed (task thread), it could have already brought the `currentConsumedCheckpointId` field up to date, so this method would know from `shouldAbort1` alone whether to abort or not.
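A similarly hypothetical sketch of the second case, reusing the `shouldAbort1` branches from the diff. The barrier counter that stands in for `isCheckpointPending()` and `resetConsumedBarriers()`, and all other names, are assumptions for illustration only: once the mail has run, task-thread state alone is enough to decide.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical model, not Flink's real classes: after the netty-enqueued mail has
// run on the task thread, task-thread-only fields are enough to decide on abort.
public class ExecutedMailAbortSketch {
    // Thread-safe queue: the netty thread adds, the task thread polls.
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();

    // Task-thread-only state (names loosely mirror the diff).
    private long currentConsumedCheckpointId = -1;
    private int remainingBarriers = 0; // > 0 while a checkpoint is pending

    /** Netty thread: enqueue a mail for the first barrier of checkpointId. */
    void notifyBarrierReceived(long checkpointId, int numChannels) {
        mailbox.add(() -> {
            if (checkpointId > currentConsumedCheckpointId) {
                currentConsumedCheckpointId = checkpointId;
                remainingBarriers = numChannels - 1; // barrier on one channel already seen
            }
        });
    }

    boolean isCheckpointPending() {
        return remainingBarriers > 0;
    }

    /** Task thread: execute all pending mails in FIFO order. */
    void drainMailbox() {
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
    }

    /** Task thread: the shouldAbort1 branches from the diff, no netty-side state needed. */
    boolean processCancellationBarrier(long cancelledId) {
        if (cancelledId > currentConsumedCheckpointId) {
            currentConsumedCheckpointId = cancelledId;
            return true;
        } else if (cancelledId == currentConsumedCheckpointId && isCheckpointPending()) {
            remainingBarriers = 0; // stands in for resetConsumedBarriers()
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        ExecutedMailAbortSketch task = new ExecutedMailAbortSketch();
        task.notifyBarrierReceived(5, 3); // enqueued (by the netty thread in reality)
        task.drainMailbox();              // the mail has already run on the task thread

        System.out.println(task.processCancellationBarrier(4)); // older checkpoint
        System.out.println(task.processCancellationBarrier(5)); // pending checkpoint
        System.out.println(task.processCancellationBarrier(5)); // already aborted
    }
}
```

Here `main` prints `false`, `true`, `false`: the older checkpoint is ignored, the pending one is aborted, and a repeated cancellation is a no-op.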
   
   Or is this about aborting the checkpoint after `notifyBarrierReceived` was called from the netty thread, but before the enqueued mailbox action was executed?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org