AHeise commented on a change in pull request #12460: URL: https://github.com/apache/flink/pull/12460#discussion_r435861937
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java ########## @@ -512,6 +512,123 @@ public void testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except } } + /** + * Tests {@link CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)} + * abort the current pending checkpoint triggered by + * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}. + */ + @Test + public void testProcessCancellationBarrierWitchPendingCheckpoint() throws Exception { Review comment: How about renaming it to `testProcessCancellationBarrierAfterNotifyBarrierReceived` and the other test to `testProcessCancellationBarrierAfterProcessBarrier`? ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java ########## @@ -512,6 +512,123 @@ public void testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except } } + /** + * Tests {@link CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)} + * abort the current pending checkpoint triggered by + * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}. + */ + @Test + public void testProcessCancellationBarrierWitchPendingCheckpoint() throws Exception { + final long checkpointId = 0L; Review comment: nit: extract the `0L` checkpoint ID repeated across these tests into a `DEFAULT_CHECKPOINT_ID` constant?
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java ########## @@ -379,12 +376,38 @@ synchronized void resetReceivedBarriers(long checkpointId) { return allBarriersReceivedFuture; } - synchronized void onChannelClosed() { + synchronized boolean onChannelClosed() throws IOException { numOpenChannels--; + + if (numBarriersReceived > 0) { + resetReceivedBarriers(); + notifyAbort(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); + return true; + } + return false; + } + + synchronized boolean setCancelledCheckpointId(long canceledCheckpointId) { + boolean shouldAbort = false; + if (canceledCheckpointId > currentReceivedCheckpointId) { + currentReceivedCheckpointId = canceledCheckpointId; + shouldAbort = true; + Review comment: nit: unnecessary blank line ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java ########## @@ -379,12 +376,38 @@ synchronized void resetReceivedBarriers(long checkpointId) { return allBarriersReceivedFuture; } - synchronized void onChannelClosed() { + synchronized boolean onChannelClosed() throws IOException { numOpenChannels--; + + if (numBarriersReceived > 0) { + resetReceivedBarriers(); + notifyAbort(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); + return true; + } + return false; + } + + synchronized boolean setCancelledCheckpointId(long canceledCheckpointId) { + boolean shouldAbort = false; Review comment: nit: `shouldAbort` could be inlined; in my opinion, that would improve readability.
########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java ########## @@ -512,6 +512,123 @@ public void testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except } } + /** + * Tests {@link CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)} + * abort the current pending checkpoint triggered by + * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}. + */ + @Test + public void testProcessCancellationBarrierWitchPendingCheckpoint() throws Exception { + final long checkpointId = 0L; + final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable(); + final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner( + new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable); + + ThreadSafeUnaligner unaligner = handler.getThreadSafeUnaligner(); + // should trigger respective checkpoint + unaligner.notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new InputChannelInfo(0, 0)); + + assertFalse(handler.isCheckpointPending()); + assertTrue(unaligner.isCheckpointPending()); + assertEquals(-1L, handler.getLatestCheckpointId()); + assertEquals(checkpointId, unaligner.getCurrentCheckpointId()); + + testProcessCancellationBarrier(handler, invokable, checkpointId); + } + + /** + * Tests {@link CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)} + * abort the current pending checkpoint triggered by + * {@link CheckpointBarrierUnaligner#processBarrier(CheckpointBarrier, int)}. 
+ */ + @Test + public void testProcessCancellationBarrierWitchPendingCheckpoint2() throws Exception { + final long checkpointId = 0L; + final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable(); + final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner( + new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable); + + // should trigger respective checkpoint + handler.processBarrier(buildCheckpointBarrier(checkpointId), 0); + + assertTrue(handler.isCheckpointPending()); + assertTrue(handler.getThreadSafeUnaligner().isCheckpointPending()); + assertEquals(checkpointId, handler.getLatestCheckpointId()); + assertEquals(checkpointId, handler.getThreadSafeUnaligner().getCurrentCheckpointId()); + + testProcessCancellationBarrier(handler, invokable, checkpointId); + } + + @Test + public void testProcessCancellationBarrierBeforeProcessAndReceiveBarrier() throws Exception { + final long checkpointId = 0L; + final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable(); + final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner( + new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable); + + handler.processCancellationBarrier(new CancelCheckpointMarker(checkpointId)); + + verifyTriggeredCheckpoint(handler, invokable, checkpointId); + + // it would not trigger checkpoint since the respective cancellation barrier already happened before + handler.processBarrier(buildCheckpointBarrier(checkpointId), 0); + handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new InputChannelInfo(0, 0)); + + verifyTriggeredCheckpoint(handler, invokable, checkpointId); + } + + private void testProcessCancellationBarrier( + CheckpointBarrierUnaligner handler, + ValidatingCheckpointInvokable invokable, + long currentCheckpointId) throws Exception { + + // should abort current checkpoint while processing CancelCheckpointMarker + handler.processCancellationBarrier(new 
CancelCheckpointMarker(currentCheckpointId)); + verifyTriggeredCheckpoint(handler, invokable, currentCheckpointId); + + final long canceledCheckpointId = 1L; + // should update current checkpoint id and abort notification while processing CancelCheckpointMarker + handler.processCancellationBarrier(new CancelCheckpointMarker(canceledCheckpointId)); + verifyTriggeredCheckpoint(handler, invokable, canceledCheckpointId); Review comment: Would it make sense to extract these assertions into separate test cases? Running them right after the first cancellation may bias the result, because the implementation could behave differently once a checkpoint has already been canceled. As far as I can tell, it would only add two test cases. ########## File path: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java ########## @@ -512,6 +512,123 @@ public void testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except } } + /** + * Tests {@link CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)} + * abort the current pending checkpoint triggered by + * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}. + */ + @Test + public void testProcessCancellationBarrierWitchPendingCheckpoint() throws Exception { Review comment: typo: `Witch` -> `With` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org