AHeise commented on a change in pull request #12460:
URL: https://github.com/apache/flink/pull/12460#discussion_r435861937



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -512,6 +512,123 @@ public void 
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
                }
        }
 
+       /**
+        * Tests {@link 
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+        * abort the current pending checkpoint triggered by
+        * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, 
InputChannelInfo)}.
+        */
+       @Test
+       public void testProcessCancellationBarrierWitchPendingCheckpoint() 
throws Exception {

Review comment:
       How about naming it 
`testProcessCancellationBarrierAfterNotifyBarrierReceived` and the other test 
`testProcessCancellationBarrierAfterProcessBarrier`?

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -512,6 +512,123 @@ public void 
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
                }
        }
 
+       /**
+        * Tests {@link 
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+        * abort the current pending checkpoint triggered by
+        * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, 
InputChannelInfo)}.
+        */
+       @Test
+       public void testProcessCancellationBarrierWitchPendingCheckpoint() 
throws Exception {
+               final long checkpointId = 0L;

Review comment:
       nit: extract `DEFAULT_CHECKPOINT_ID`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -379,12 +376,38 @@ synchronized void resetReceivedBarriers(long 
checkpointId) {
                        return allBarriersReceivedFuture;
                }
 
-               synchronized void onChannelClosed() {
+               synchronized boolean onChannelClosed() throws IOException {
                        numOpenChannels--;
+
+                       if (numBarriersReceived > 0) {
+                               resetReceivedBarriers();
+                               notifyAbort(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+                               return true;
+                       }
+                       return false;
+               }
+
+               synchronized boolean setCancelledCheckpointId(long 
canceledCheckpointId) {
+                       boolean shouldAbort = false;
+                       if (canceledCheckpointId > currentReceivedCheckpointId) 
{
+                               currentReceivedCheckpointId = 
canceledCheckpointId;
+                               shouldAbort = true;
+

Review comment:
       nit: weird newline

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -379,12 +376,38 @@ synchronized void resetReceivedBarriers(long 
checkpointId) {
                        return allBarriersReceivedFuture;
                }
 
-               synchronized void onChannelClosed() {
+               synchronized boolean onChannelClosed() throws IOException {
                        numOpenChannels--;
+
+                       if (numBarriersReceived > 0) {
+                               resetReceivedBarriers();
+                               notifyAbort(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+                               return true;
+                       }
+                       return false;
+               }
+
+               synchronized boolean setCancelledCheckpointId(long 
canceledCheckpointId) {
+                       boolean shouldAbort = false;

Review comment:
       nit: shouldAbort can be inlined for improved readability (in my eyes).

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -512,6 +512,123 @@ public void 
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
                }
        }
 
+       /**
+        * Tests {@link 
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+        * abort the current pending checkpoint triggered by
+        * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, 
InputChannelInfo)}.
+        */
+       @Test
+       public void testProcessCancellationBarrierWitchPendingCheckpoint() 
throws Exception {
+               final long checkpointId = 0L;
+               final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+               final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(
+                       new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+
+               ThreadSafeUnaligner unaligner = 
handler.getThreadSafeUnaligner();
+               // should trigger respective checkpoint
+               
unaligner.notifyBarrierReceived(buildCheckpointBarrier(checkpointId), new 
InputChannelInfo(0, 0));
+
+               assertFalse(handler.isCheckpointPending());
+               assertTrue(unaligner.isCheckpointPending());
+               assertEquals(-1L, handler.getLatestCheckpointId());
+               assertEquals(checkpointId, unaligner.getCurrentCheckpointId());
+
+               testProcessCancellationBarrier(handler, invokable, 
checkpointId);
+       }
+
+       /**
+        * Tests {@link 
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+        * abort the current pending checkpoint triggered by
+        * {@link CheckpointBarrierUnaligner#processBarrier(CheckpointBarrier, 
int)}.
+        */
+       @Test
+       public void testProcessCancellationBarrierWitchPendingCheckpoint2() 
throws Exception {
+               final long checkpointId = 0L;
+               final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+               final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(
+                       new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+
+               // should trigger respective checkpoint
+               handler.processBarrier(buildCheckpointBarrier(checkpointId), 0);
+
+               assertTrue(handler.isCheckpointPending());
+               
assertTrue(handler.getThreadSafeUnaligner().isCheckpointPending());
+               assertEquals(checkpointId, handler.getLatestCheckpointId());
+               assertEquals(checkpointId, 
handler.getThreadSafeUnaligner().getCurrentCheckpointId());
+
+               testProcessCancellationBarrier(handler, invokable, 
checkpointId);
+       }
+
+       @Test
+       public void 
testProcessCancellationBarrierBeforeProcessAndReceiveBarrier() throws Exception 
{
+               final long checkpointId = 0L;
+               final ValidatingCheckpointInvokable invokable = new 
ValidatingCheckpointInvokable();
+               final CheckpointBarrierUnaligner handler = new 
CheckpointBarrierUnaligner(
+                       new int[] { 1 }, ChannelStateWriter.NO_OP, "test", 
invokable);
+
+               handler.processCancellationBarrier(new 
CancelCheckpointMarker(checkpointId));
+
+               verifyTriggeredCheckpoint(handler, invokable, checkpointId);
+
+               // it would not trigger checkpoint since the respective 
cancellation barrier already happened before
+               handler.processBarrier(buildCheckpointBarrier(checkpointId), 0);
+               
handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(checkpointId),
 new InputChannelInfo(0, 0));
+
+               verifyTriggeredCheckpoint(handler, invokable, checkpointId);
+       }
+
+       private void testProcessCancellationBarrier(
+                       CheckpointBarrierUnaligner handler,
+                       ValidatingCheckpointInvokable invokable,
+                       long currentCheckpointId) throws Exception {
+
+               // should abort current checkpoint while processing 
CancelCheckpointMarker
+               handler.processCancellationBarrier(new 
CancelCheckpointMarker(currentCheckpointId));
+               verifyTriggeredCheckpoint(handler, invokable, 
currentCheckpointId);
+
+               final long canceledCheckpointId = 1L;
+               // should update current checkpoint id and abort notification 
while processing CancelCheckpointMarker
+               handler.processCancellationBarrier(new 
CancelCheckpointMarker(canceledCheckpointId));
+               verifyTriggeredCheckpoint(handler, invokable, 
canceledCheckpointId);

Review comment:
       Would it make sense to extract these assertions in separate test cases? 
There might be a bias in the implementation, when the checkpoint has already 
been canceled. It would only add two test cases afaik.

##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java
##########
@@ -512,6 +512,123 @@ public void 
testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Except
                }
        }
 
+       /**
+        * Tests {@link 
CheckpointBarrierUnaligner#processCancellationBarrier(CancelCheckpointMarker)}
+        * abort the current pending checkpoint triggered by
+        * {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, 
InputChannelInfo)}.
+        */
+       @Test
+       public void testProcessCancellationBarrierWitchPendingCheckpoint() 
throws Exception {

Review comment:
       typo: witch -> with




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to