zhijiangW commented on a change in pull request #12460: URL: https://github.com/apache/flink/pull/12460#discussion_r436055041
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java ########## @@ -512,6 +512,123 @@ public void testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except } } + /** + * Tests {@link CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)} + * abort the current pending checkpoint triggered by + * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}. + */ + @Test + public void testProcessCancellationBarrierWitchPendingCheckpoint() throws Exception { + final long checkpointId = 0L; + final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable(); + final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner( + new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable); + + ThreadSafeUnaligner unaligner = handler.getThreadSafeUnaligner(); + // should trigger respective checkpoint + unaligner.notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new InputChannelInfo(0, 0)); + + assertFalse(handler.isCheckpointPending()); + assertTrue(unaligner.isCheckpointPending()); + assertEquals(-1L, handler.getLatestCheckpointId()); + assertEquals(checkpointId, unaligner.getCurrentCheckpointId()); + + testProcessCancellationBarrier(handler, invokable, checkpointId); + } + + /** + * Tests {@link CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)} + * abort the current pending checkpoint triggered by + * {@link CheckpointBarrierUnaligner#processBarrier(CheckpointBarrier, int)}. 
+ */ + @Test + public void testProcessCancellationBarrierWitchPendingCheckpoint2() throws Exception { + final long checkpointId = 0L; + final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable(); + final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner( + new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable); + + // should trigger respective checkpoint + handler.processBarrier(buildCheckpointBarrier(checkpointId), 0); + + assertTrue(handler.isCheckpointPending()); + assertTrue(handler.getThreadSafeUnaligner().isCheckpointPending()); + assertEquals(checkpointId, handler.getLatestCheckpointId()); + assertEquals(checkpointId, handler.getThreadSafeUnaligner().getCurrentCheckpointId()); + + testProcessCancellationBarrier(handler, invokable, checkpointId); + } + + @Test + public void testProcessCancellationBarrierBeforeProcessAndReceiveBarrier() throws Exception { + final long checkpointId = 0L; + final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable(); + final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner( + new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable); + + handler.processCancellationBarrier(new CancelCheckpointMarker(checkpointId)); + + verifyTriggeredCheckpoint(handler, invokable, checkpointId); + + // it would not trigger checkpoint since the respective cancellation barrier already happened before + handler.processBarrier(buildCheckpointBarrier(checkpointId), 0); + handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new InputChannelInfo(0, 0)); + + verifyTriggeredCheckpoint(handler, invokable, checkpointId); + } + + private void testProcessCancellationBarrier( + CheckpointBarrierUnaligner handler, + ValidatingCheckpointInvokable invokable, + long currentCheckpointId) throws Exception { + + // should abort current checkpoint while processing CancelCheckpointMarker + handler.processCancellationBarrier(new 
CancelCheckpointMarker(currentCheckpointId)); + verifyTriggeredCheckpoint(handler, invokable, currentCheckpointId); + + final long canceledCheckpointId = 1L; + // should update current checkpoint id and abort notification while processing CancelCheckpointMarker + handler.processCancellationBarrier(new CancelCheckpointMarker(canceledCheckpointId)); + verifyTriggeredCheckpoint(handler, invokable, canceledCheckpointId); Review comment: FYI: I use a random approach to cover both cases, in order to avoid duplicating code. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org