Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2754#discussion_r87039138
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
    @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
        }
     
        @Test
    -   public void testEndOfStreamWhileCheckpoint() {
    +   public void testEndOfStreamWhileCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // one checkpoint
    +                           createBarrier(1, 0), createBarrier(1, 1), 
createBarrier(1, 2),
    +
    +                           // some buffers
    +                           createBuffer(0), createBuffer(0), 
createBuffer(2),
    +
    +                           // start the checkpoint that will be incomplete
    +                           createBarrier(2, 2), createBarrier(2, 0),
    +                           createBuffer(0), createBuffer(2), 
createBuffer(1),
    +
    +                           // close one after the barrier one before the 
barrier
    +                           createEndOfPartition(2), 
createEndOfPartition(1),
    +                           createBuffer(0),
    +
    +                           // final end of stream
    +                           createEndOfPartition(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           // data after first checkpoint
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           assertEquals(1L, buffer.getCurrentCheckpointId());
    +
    +           // alignment of second checkpoint
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           assertEquals(2L, buffer.getCurrentCheckpointId());
    +
    +           // first end-of-partition encountered: checkpoint will not be 
completed
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           check(sequence[11], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testSingleChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           createBuffer(0),
    +                           createBarrier(1, 0),
    +                           createBuffer(0),
    +                           createBarrier(2, 0),
    +                           createCancellationBarrier(4, 0),
    +                           createBarrier(5, 0),
    +                           createBuffer(0),
    +                           createCancellationBarrier(6, 0),
    +                           createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           assertEquals(5L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           assertEquals(6L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
                
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testMultiChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // some buffers and a successful checkpoint
    +                           /* 0 */ createBuffer(0), createBuffer(2), 
createBuffer(0),
    +                           /* 3 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 5 */ createBuffer(2), createBuffer(1),
    +                           /* 7 */ createBarrier(1, 0),
    +                           /* 8 */ createBuffer(0), createBuffer(2),
    +
    +                           // aborted on last barrier
    +                           /* 10 */ createBarrier(2, 0), createBarrier(2, 
2),
    +                           /* 12 */ createBuffer(0), createBuffer(2),
    +                           /* 14 */ createCancellationBarrier(2, 1),
    +
    +                           // successful checkpoint
    +                           /* 15 */ createBuffer(2), createBuffer(1),
    +                           /* 17 */ createBarrier(3, 1), createBarrier(3, 
2), createBarrier(3, 0),
    +
    +                           // abort on first barrier
    +                           /* 20 */ createBuffer(0), createBuffer(1),
    +                           /* 22 */ createCancellationBarrier(4, 1), 
createBarrier(4, 2),
    +                           /* 24 */ createBuffer(0),
    +                           /* 25 */ createBarrier(4, 0),
    +
    +                           // another successful checkpoint
    +                           /* 26 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +                           /* 29 */ createBarrier(5, 2), createBarrier(5, 
1), createBarrier(5, 0),
    +                           /* 32 */ createBuffer(0), createBuffer(1),
    +
    +                           // abort multiple cancellations and a barrier 
after the cancellations
    +                           /* 34 */ createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
    +                           /* 36 */ createBarrier(6, 0),
    +
    +                           /* 37 */ createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           // successful first checkpoint, with some aligned buffers
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[1], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           validateAlignmentTime(startTs, buffer);
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +
    +           // canceled checkpoint on last barrier
    +           startTs = System.nanoTime();
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[13], buffer.getNextNonBlocked());
    +
    +           // one more successful checkpoint
    +           check(sequence[15], buffer.getNextNonBlocked());
    +           check(sequence[16], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[20], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[21], buffer.getNextNonBlocked());
    +
    +           // this checkpoint gets immediately canceled
    +           check(sequence[24], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // some buffers
    +           check(sequence[26], buffer.getNextNonBlocked());
    +           check(sequence[27], buffer.getNextNonBlocked());
    +           check(sequence[28], buffer.getNextNonBlocked());
    +
    +           // a simple successful checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[32], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[33], buffer.getNextNonBlocked());
    +
    +           check(sequence[37], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testAbortViaQueuedBarriers() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /* 0 */ createBuffer(1),
    +                           /* 1 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 3 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
    +
    +                           // queued barrier and cancellation barrier
    +                           /* 6 */ createCancellationBarrier(2, 2),
    +                           /* 7 */ createBarrier(2, 1),
    +
    +                           // some intermediate buffers (some queued)
    +                           /* 8 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // complete initial checkpoint
    +                           /* 11 */ createBarrier(1, 0),
    +
    +                           // some buffers (none queued, since checkpoint 
is aborted)
    +                           /* 12 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
    +
    +                           // final barrier of aborted checkpoint
    +                           /* 15 */ createBarrier(1, 2),
    --- End diff --
    
    Both late and aborted barriers are handled in the same code path, so the 
test worked despite that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to