[ 
https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648061#comment-15648061
 ] 

ASF GitHub Bot commented on FLINK-4975:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2754#discussion_r87030844
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
    @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
        }
     
        @Test
    -   public void testEndOfStreamWhileCheckpoint() {
    +   public void testEndOfStreamWhileCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // one checkpoint
    +                           createBarrier(1, 0), createBarrier(1, 1), 
createBarrier(1, 2),
    +
    +                           // some buffers
    +                           createBuffer(0), createBuffer(0), 
createBuffer(2),
    +
    +                           // start the checkpoint that will be incomplete
    +                           createBarrier(2, 2), createBarrier(2, 0),
    +                           createBuffer(0), createBuffer(2), 
createBuffer(1),
    +
    +                           // close one after the barrier one before the 
barrier
    +                           createEndOfPartition(2), 
createEndOfPartition(1),
    +                           createBuffer(0),
    +
    +                           // final end of stream
    +                           createEndOfPartition(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           // data after first checkpoint
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           assertEquals(1L, buffer.getCurrentCheckpointId());
    +
    +           // alignment of second checkpoint
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           assertEquals(2L, buffer.getCurrentCheckpointId());
    +
    +           // first end-of-partition encountered: checkpoint will not be 
completed
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           check(sequence[11], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testSingleChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           createBuffer(0),
    +                           createBarrier(1, 0),
    +                           createBuffer(0),
    +                           createBarrier(2, 0),
    +                           createCancellationBarrier(4, 0),
    +                           createBarrier(5, 0),
    +                           createBuffer(0),
    +                           createCancellationBarrier(6, 0),
    +                           createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           assertEquals(5L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           assertEquals(6L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
                
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testMultiChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // some buffers and a successful checkpoint
    +                           /* 0 */ createBuffer(0), createBuffer(2), 
createBuffer(0),
    +                           /* 3 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 5 */ createBuffer(2), createBuffer(1),
    +                           /* 7 */ createBarrier(1, 0),
    +                           /* 8 */ createBuffer(0), createBuffer(2),
    +
    +                           // aborted on last barrier
    +                           /* 10 */ createBarrier(2, 0), createBarrier(2, 
2),
    +                           /* 12 */ createBuffer(0), createBuffer(2),
    +                           /* 14 */ createCancellationBarrier(2, 1),
    +
    +                           // successful checkpoint
    +                           /* 15 */ createBuffer(2), createBuffer(1),
    +                           /* 17 */ createBarrier(3, 1), createBarrier(3, 
2), createBarrier(3, 0),
    +
    +                           // abort on first barrier
    +                           /* 20 */ createBuffer(0), createBuffer(1),
    +                           /* 22 */ createCancellationBarrier(4, 1), 
createBarrier(4, 2),
    +                           /* 24 */ createBuffer(0),
    +                           /* 25 */ createBarrier(4, 0),
    +
    +                           // another successful checkpoint
    +                           /* 26 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +                           /* 29 */ createBarrier(5, 2), createBarrier(5, 
1), createBarrier(5, 0),
    +                           /* 32 */ createBuffer(0), createBuffer(1),
    +
    +                           // abort multiple cancellations and a barrier 
after the cancellations
    +                           /* 34 */ createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
    +                           /* 36 */ createBarrier(6, 0),
    +
    +                           /* 37 */ createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           // successful first checkpoint, with some aligned buffers
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[1], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           validateAlignmentTime(startTs, buffer);
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +
    +           // canceled checkpoint on last barrier
    +           startTs = System.nanoTime();
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[13], buffer.getNextNonBlocked());
    +
    +           // one more successful checkpoint
    +           check(sequence[15], buffer.getNextNonBlocked());
    +           check(sequence[16], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[20], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[21], buffer.getNextNonBlocked());
    +
    +           // this checkpoint gets immediately canceled
    +           check(sequence[24], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // some buffers
    +           check(sequence[26], buffer.getNextNonBlocked());
    +           check(sequence[27], buffer.getNextNonBlocked());
    +           check(sequence[28], buffer.getNextNonBlocked());
    +
    +           // a simple successful checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[32], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[33], buffer.getNextNonBlocked());
    +
    +           check(sequence[37], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testAbortViaQueuedBarriers() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /* 0 */ createBuffer(1),
    +                           /* 1 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 3 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
    +
    +                           // queued barrier and cancellation barrier
    +                           /* 6 */ createCancellationBarrier(2, 2),
    +                           /* 7 */ createBarrier(2, 1),
    +
    +                           // some intermediate buffers (some queued)
    +                           /* 8 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // complete initial checkpoint
    +                           /* 11 */ createBarrier(1, 0),
    +
    +                           // some buffers (none queued, since checkpoint 
is aborted)
    +                           /* 12 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
    +
    +                           // final barrier of aborted checkpoint
    +                           /* 15 */ createBarrier(1, 2),
    --- End diff --
    
    Isn't this a barrier of the initial checkpoint (checkpoint 1), which has 
already completed successfully? I'm asking because the comment calls it the "final barrier of aborted checkpoint".


> Add a limit for how much data may be buffered during checkpoint alignment
> -------------------------------------------------------------------------
>
>                 Key: FLINK-4975
>                 URL: https://issues.apache.org/jira/browse/FLINK-4975
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0, 1.1.4
>
>
> During checkpoint alignment, data may be buffered or spilled to disk.
> We should introduce an upper limit on the volume of spilled data. Once 
> that limit is exceeded, checkpoint alignment should be aborted and the 
> checkpoint canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to