Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2754#discussion_r87043110
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
    @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
        }
     
        @Test
    -   public void testEndOfStreamWhileCheckpoint() {
    +   public void testEndOfStreamWhileCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // one checkpoint
    +                           createBarrier(1, 0), createBarrier(1, 1), 
createBarrier(1, 2),
    +
    +                           // some buffers
    +                           createBuffer(0), createBuffer(0), 
createBuffer(2),
    +
    +                           // start the checkpoint that will be incomplete
    +                           createBarrier(2, 2), createBarrier(2, 0),
    +                           createBuffer(0), createBuffer(2), 
createBuffer(1),
    +
    +                           // close one after the barrier, one before the 
barrier
    +                           createEndOfPartition(2), 
createEndOfPartition(1),
    +                           createBuffer(0),
    +
    +                           // final end of stream
    +                           createEndOfPartition(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           // data after first checkpoint
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           assertEquals(1L, buffer.getCurrentCheckpointId());
    +
    +           // alignment of second checkpoint
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           assertEquals(2L, buffer.getCurrentCheckpointId());
    +
    +           // first end-of-partition encountered: checkpoint will not be 
completed
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           check(sequence[11], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testSingleChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           createBuffer(0),
    +                           createBarrier(1, 0),
    +                           createBuffer(0),
    +                           createBarrier(2, 0),
    +                           createCancellationBarrier(4, 0),
    +                           createBarrier(5, 0),
    +                           createBuffer(0),
    +                           createCancellationBarrier(6, 0),
    +                           createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           assertEquals(5L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           assertEquals(6L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
                
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testMultiChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // some buffers and a successful checkpoint
    +                           /* 0 */ createBuffer(0), createBuffer(2), 
createBuffer(0),
    +                           /* 3 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 5 */ createBuffer(2), createBuffer(1),
    +                           /* 7 */ createBarrier(1, 0),
    +                           /* 8 */ createBuffer(0), createBuffer(2),
    +
    +                           // aborted on last barrier
    +                           /* 10 */ createBarrier(2, 0), createBarrier(2, 
2),
    +                           /* 12 */ createBuffer(0), createBuffer(2),
    +                           /* 14 */ createCancellationBarrier(2, 1),
    +
    +                           // successful checkpoint
    +                           /* 15 */ createBuffer(2), createBuffer(1),
    +                           /* 17 */ createBarrier(3, 1), createBarrier(3, 
2), createBarrier(3, 0),
    +
    +                           // abort on first barrier
    +                           /* 20 */ createBuffer(0), createBuffer(1),
    +                           /* 22 */ createCancellationBarrier(4, 1), 
createBarrier(4, 2),
    +                           /* 24 */ createBuffer(0),
    +                           /* 25 */ createBarrier(4, 0),
    +
    +                           // another successful checkpoint
    +                           /* 26 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +                           /* 29 */ createBarrier(5, 2), createBarrier(5, 
1), createBarrier(5, 0),
    +                           /* 32 */ createBuffer(0), createBuffer(1),
    +
    +                           // abort multiple cancellations and a barrier 
after the cancellations
    +                           /* 34 */ createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
    +                           /* 36 */ createBarrier(6, 0),
    +
    +                           /* 37 */ createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           // successful first checkpoint, with some aligned buffers
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[1], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           validateAlignmentTime(startTs, buffer);
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +
    +           // canceled checkpoint on last barrier
    +           startTs = System.nanoTime();
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[13], buffer.getNextNonBlocked());
    +
    +           // one more successful checkpoint
    +           check(sequence[15], buffer.getNextNonBlocked());
    +           check(sequence[16], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[20], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[21], buffer.getNextNonBlocked());
    +
    +           // this checkpoint gets immediately canceled
    +           check(sequence[24], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // some buffers
    +           check(sequence[26], buffer.getNextNonBlocked());
    +           check(sequence[27], buffer.getNextNonBlocked());
    +           check(sequence[28], buffer.getNextNonBlocked());
    +
    +           // a simple successful checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[32], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[33], buffer.getNextNonBlocked());
    +
    +           check(sequence[37], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testAbortViaQueuedBarriers() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /* 0 */ createBuffer(1),
    +                           /* 1 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 3 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
    +
    +                           // queued barrier and cancellation barrier
    +                           /* 6 */ createCancellationBarrier(2, 2),
    +                           /* 7 */ createBarrier(2, 1),
    +
    +                           // some intermediate buffers (some queued)
    +                           /* 8 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // complete initial checkpoint
    +                           /* 11 */ createBarrier(1, 0),
    +
    +                           // some buffers (none queued, since checkpoint 
is aborted)
    +                           /* 12 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
    +
    +                           // final barrier of aborted checkpoint
    +                           /* 15 */ createBarrier(1, 2),
    +
    +                           // some more buffers
    +                           /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +
    +           // starting first checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +
    +           // finished first checkpoint
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           validateAlignmentTime(startTs, buffer);
    +
    +           check(sequence[5], buffer.getNextNonBlocked());
    +
    +           // re-read the queued cancellation barriers
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           check(sequence[16], buffer.getNextNonBlocked());
    +           check(sequence[17], buffer.getNextNonBlocked());
    +           check(sequence[18], buffer.getNextNonBlocked());
    +
    +           // no further alignment should have happened
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // no further checkpoint (abort) notifications
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   /**
    +    * This tests the case where a replay of queued checkpoint barriers meets
    +    * a canceled checkpoint.
    +    *
    +    * The replayed newer checkpoint barrier must not try to cancel the
    +    * already canceled checkpoint.
    +    */
    +   @Test
    +   public void testAbortWhileHavingQueuedBarriers() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /*  0 */ createBuffer(1),
    +                           /*  1 */ createBarrier(1, 1),
    +                           /*  2 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
    +
    +                           // queued barrier of the next checkpoint
    +                           /*  5 */ createBarrier(2, 1),
    +
    +                           // some queued buffers
    +                           /*  6 */ createBuffer(2), createBuffer(1),
    +
    +                           // cancel the initial checkpoint
    +                           /*  8 */ createCancellationBarrier(1, 0),
    +
    +                           // some more buffers
    +                           /*  9 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
    +
    +                           // ignored barrier - already canceled and moved 
to next checkpoint
    +                           /* 12 */ createBarrier(1, 2),
    +
    +                           // some more buffers
    +                           /* 13 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // complete next checkpoint regularly
    +                           /* 16 */ createBarrier(2, 0), createBarrier(2, 
2),
    +
    +                           // some more buffers
    +                           /* 18 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +
    +           // starting first checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           check(sequence[6], buffer.getNextNonBlocked());
    +
    +           // cancelled by cancellation barrier
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           validateAlignmentTime(startTs, buffer);
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
    +
    +           // the next checkpoint alignment starts now
    +           startTs = System.nanoTime();
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           check(sequence[11], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[15], buffer.getNextNonBlocked());
    +
    +           // checkpoint done
    +           check(sequence[7], buffer.getNextNonBlocked());
    +           validateAlignmentTime(startTs, buffer);
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
    +
    +           // queued data
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           // trailing data
    +           check(sequence[18], buffer.getNextNonBlocked());
    +           check(sequence[19], buffer.getNextNonBlocked());
    +           check(sequence[20], buffer.getNextNonBlocked());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +
    +           // check overall notifications
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
    +   }
    +
    +   /**
    +    * This tests the case where a cancellation barrier is received for a 
checkpoint already
    +    * canceled due to receiving a newer checkpoint barrier.
    +    */
    +   @Test
    +   public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws 
Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /*  0 */ createBuffer(2),
    +                           /*  1 */ createBarrier(3, 1), createBarrier(3, 
0),
    +                           /*  3 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // newer checkpoint barrier cancels/subsumes 
pending checkpoint
    +                           /*  6 */ createBarrier(5, 2),
    --- End diff --
    
    I totally agree.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastruct...@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to