zhijiangW commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r436055041



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -512,6 +512,123 @@ public void 
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
                }
        }
 
+       /**
+        * Tests {@link 
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+        * abort the current pending checkpoint triggered by
+        * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, 
InputChannelInfo)}.
+        */
+       @Test
+       public void testProcessCancellationBarrierWitchPendingCheckpoint() 
throws Exception {
+               final long checkpointId = 0L;
+               final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+               final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(
+                       new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+
+               ThreadSafeUnaligner unaligner = 
handler.getThreadSafeUnaligner();
+               // should trigger respective checkpoint
+               
unaligner.notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new 
InputChannelInfo(0, 0));
+
+               assertFalse(handler.isCheckpointPending());
+               assertTrue(unaligner.isCheckpointPending());
+               assertEquals(-1L, handler.getLatestCheckpointId());
+               assertEquals(checkpointId, unaligner.getCurrentCheckpointId());
+
+               testProcessCancellationBarrier(handler, invokable, 
checkpointId);
+       }
+
+       /**
+        * Tests {@link 
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+        * abort the current pending checkpoint triggered by
+        * {@link CheckpointBarrierUnaligner#processBarrier(CheckpointBarrier, 
int)}.
+        */
+       @Test
+       public void testProcessCancellationBarrierWitchPendingCheckpoint2() 
throws Exception {
+               final long checkpointId = 0L;
+               final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+               final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(
+                       new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+
+               // should trigger respective checkpoint
+               handler.processBarrier(buildCheckpointBarrier(checkpointId), 0);
+
+               assertTrue(handler.isCheckpointPending());
+               
assertTrue(handler.getThreadSafeUnaligner().isCheckpointPending());
+               assertEquals(checkpointId, handler.getLatestCheckpointId());
+               assertEquals(checkpointId, 
handler.getThreadSafeUnaligner().getCurrentCheckpointId());
+
+               testProcessCancellationBarrier(handler, invokable, 
checkpointId);
+       }
+
+       @Test
+       public void 
testProcessCancellationBarrierBeforeProcessAndReceiveBarrier() throws Exception 
{
+               final long checkpointId = 0L;
+               final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+               final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(
+                       new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+
+               handler.processCancellationBarrier(new 
CancelCheckpointMarker(checkpointId));
+
+               verifyTriggeredCheckpoint(handler, invokable, checkpointId);
+
+               // it would not trigger checkpoint since the respective 
cancellation barrier already happened before
+               handler.processBarrier(buildCheckpointBarrier(checkpointId), 0);
+               
handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(checkpointId),
 new InputChannelInfo(0, 0));
+
+               verifyTriggeredCheckpoint(handler, invokable, checkpointId);
+       }
+
+       private void testProcessCancellationBarrier(
+                       CheckpointBarrierUnaligner handler,
+                       ValidatingCheckpointInvokable invokable,
+                       long currentCheckpointId) throws Exception {
+
+               // should abort current checkpoint while processing 
CancelCheckpointMarker
+               handler.processCancellationBarrier(new 
CancelCheckpointMarker(currentCheckpointId));
+               verifyTriggeredCheckpoint(handler, invokable, 
currentCheckpointId);
+
+               final long canceledCheckpointId = 1L;
+               // should update current checkpoint id and abort notification 
while processing CancelCheckpointMarker
+               handler.processCancellationBarrier(new 
CancelCheckpointMarker(canceledCheckpointId));
+               verifyTriggeredCheckpoint(handler, invokable, 
canceledCheckpointId);

Review comment:
       I took a random approach to cover both cases, in order to avoid some 
duplicated code.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to