[ 
https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648090#comment-15648090
 ] 

ASF GitHub Bot commented on FLINK-4975:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2754#discussion_r87033063
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
    @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
        }
     
        @Test
    -   public void testEndOfStreamWhileCheckpoint() {
    +   public void testEndOfStreamWhileCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // one checkpoint
    +                           createBarrier(1, 0), createBarrier(1, 1), 
createBarrier(1, 2),
    +
    +                           // some buffers
    +                           createBuffer(0), createBuffer(0), 
createBuffer(2),
    +
    +                           // start the checkpoint that will be incomplete
    +                           createBarrier(2, 2), createBarrier(2, 0),
    +                           createBuffer(0), createBuffer(2), 
createBuffer(1),
    +
    +                           // close one after the barrier one before the 
barrier
    +                           createEndOfPartition(2), 
createEndOfPartition(1),
    +                           createBuffer(0),
    +
    +                           // final end of stream
    +                           createEndOfPartition(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           // data after first checkpoint
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           assertEquals(1L, buffer.getCurrentCheckpointId());
    +
    +           // alignment of second checkpoint
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           assertEquals(2L, buffer.getCurrentCheckpointId());
    +
    +           // first end-of-partition encountered: checkpoint will not be 
completed
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           check(sequence[11], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testSingleChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           createBuffer(0),
    +                           createBarrier(1, 0),
    +                           createBuffer(0),
    +                           createBarrier(2, 0),
    +                           createCancellationBarrier(4, 0),
    +                           createBarrier(5, 0),
    +                           createBuffer(0),
    +                           createCancellationBarrier(6, 0),
    +                           createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           assertEquals(5L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           assertEquals(6L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
                
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testMultiChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // some buffers and a successful checkpoint
    +                           /* 0 */ createBuffer(0), createBuffer(2), 
createBuffer(0),
    +                           /* 3 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 5 */ createBuffer(2), createBuffer(1),
    +                           /* 7 */ createBarrier(1, 0),
    +                           /* 8 */ createBuffer(0), createBuffer(2),
    +
    +                           // aborted on last barrier
    +                           /* 10 */ createBarrier(2, 0), createBarrier(2, 
2),
    +                           /* 12 */ createBuffer(0), createBuffer(2),
    +                           /* 14 */ createCancellationBarrier(2, 1),
    +
    +                           // successful checkpoint
    +                           /* 15 */ createBuffer(2), createBuffer(1),
    +                           /* 17 */ createBarrier(3, 1), createBarrier(3, 
2), createBarrier(3, 0),
    +
    +                           // abort on first barrier
    +                           /* 20 */ createBuffer(0), createBuffer(1),
    +                           /* 22 */ createCancellationBarrier(4, 1), 
createBarrier(4, 2),
    +                           /* 24 */ createBuffer(0),
    +                           /* 25 */ createBarrier(4, 0),
    +
    +                           // another successful checkpoint
    +                           /* 26 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +                           /* 29 */ createBarrier(5, 2), createBarrier(5, 
1), createBarrier(5, 0),
    +                           /* 32 */ createBuffer(0), createBuffer(1),
    +
    +                           // abort multiple cancellations and a barrier 
after the cancellations
    +                           /* 34 */ createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
    +                           /* 36 */ createBarrier(6, 0),
    +
    +                           /* 37 */ createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           // successful first checkpoint, with some aligned buffers
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[1], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           validateAlignmentTime(startTs, buffer);
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +
    +           // canceled checkpoint on last barrier
    +           startTs = System.nanoTime();
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[13], buffer.getNextNonBlocked());
    +
    +           // one more successful checkpoint
    +           check(sequence[15], buffer.getNextNonBlocked());
    +           check(sequence[16], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[20], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[21], buffer.getNextNonBlocked());
    +
    +           // this checkpoint gets immediately canceled
    +           check(sequence[24], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // some buffers
    +           check(sequence[26], buffer.getNextNonBlocked());
    +           check(sequence[27], buffer.getNextNonBlocked());
    +           check(sequence[28], buffer.getNextNonBlocked());
    +
    +           // a simple successful checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[32], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[33], buffer.getNextNonBlocked());
    +
    +           check(sequence[37], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testAbortViaQueuedBarriers() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /* 0 */ createBuffer(1),
    +                           /* 1 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 3 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
    +
    +                           // queued barrier and cancellation barrier
    +                           /* 6 */ createCancellationBarrier(2, 2),
    +                           /* 7 */ createBarrier(2, 1),
    +
    +                           // some intermediate buffers (some queued)
    +                           /* 8 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // complete initial checkpoint
    +                           /* 11 */ createBarrier(1, 0),
    +
    +                           // some buffers (none queued, since checkpoint 
is aborted)
    +                           /* 12 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
    +
    +                           // final barrier of aborted checkpoint
    +                           /* 15 */ createBarrier(1, 2),
    +
    +                           // some more buffers
    +                           /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +
    +           // starting first checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +
    +           // finished first checkpoint
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           validateAlignmentTime(startTs, buffer);
    +
    +           check(sequence[5], buffer.getNextNonBlocked());
    +
    +           // re-read the queued cancellation barriers
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           check(sequence[16], buffer.getNextNonBlocked());
    +           check(sequence[17], buffer.getNextNonBlocked());
    +           check(sequence[18], buffer.getNextNonBlocked());
    +
    +           // no further alignment should have happened
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // no further checkpoint (abort) notifications
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   /**
    +    * This tests the case where a replay of queued checkpoint barriers meets
    +    * a canceled checkpoint.
    +    *
    +    * The replayed newer checkpoint barrier must not try to cancel the
    +    * already canceled checkpoint.
    +    */
    +   @Test
    +   public void testAbortWhileHavingQueuedBarriers() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /*  0 */ createBuffer(1),
    +                           /*  1 */ createBarrier(1, 1),
    +                           /*  2 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
    +
    +                           // queued barrier and cancellation barrier
    +                           /*  5 */ createBarrier(2, 1),
    +
    +                           // some queued buffers
    +                           /*  6 */ createBuffer(2), createBuffer(1),
    +
    +                           // cancel the initial checkpoint
    +                           /*  8 */ createCancellationBarrier(1, 0),
    +
    +                           // some more buffers
    +                           /*  9 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
    +
    +                           // ignored barrier - already canceled and moved 
to next checkpoint
    +                           /* 12 */ createBarrier(1, 2),
    +
    +                           // some more buffers
    +                           /* 13 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // complete next checkpoint regularly
    +                           /* 16 */ createBarrier(2, 0), createBarrier(2, 
2),
    +
    +                           // some more buffers
    +                           /* 18 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +
    +           // starting first checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           check(sequence[6], buffer.getNextNonBlocked());
    +
    +           // cancelled by cancellation barrier
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           validateAlignmentTime(startTs, buffer);
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
    +
    +           // the next checkpoint alignment starts now
    +           startTs = System.nanoTime();
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           check(sequence[11], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[15], buffer.getNextNonBlocked());
    +
    +           // checkpoint done
    +           check(sequence[7], buffer.getNextNonBlocked());
    +           validateAlignmentTime(startTs, buffer);
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
    +
    +           // queued data
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           // trailing data
    +           check(sequence[18], buffer.getNextNonBlocked());
    +           check(sequence[19], buffer.getNextNonBlocked());
    +           check(sequence[20], buffer.getNextNonBlocked());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +
    +           // check overall notifications
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
    +   }
    +
    +   /**
    +    * This tests the case where a cancellation barrier is received for a 
checkpoint already
    +    * canceled due to receiving a newer checkpoint barrier.
    +    */
    +   @Test
    +   public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws 
Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /*  0 */ createBuffer(2),
    +                           /*  1 */ createBarrier(3, 1), createBarrier(3, 
0),
    +                           /*  3 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // newer checkpoint barrier cancels/subsumes 
pending checkpoint
    +                           /*  6 */ createBarrier(5, 2),
    --- End diff --
    
    So far, this could happen if one source was not ready while the others were.
    
    I think good design means that every component is, in itself, as robust as 
it can be. Chances are good that it can then compensate for other unexpected 
behavior (like an RPC message getting lost).
    
    I guess that the above philosophy is the reason why the alignment code has 
so far worked pretty well, despite the problem being non-trivial.


> Add a limit for how much data may be buffered during checkpoint alignment
> -------------------------------------------------------------------------
>
>                 Key: FLINK-4975
>                 URL: https://issues.apache.org/jira/browse/FLINK-4975
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0, 1.1.4
>
>
> During checkpoint alignment, data may be buffered/spilled.
> We should introduce an upper limit for the spilled data volume. After 
> exceeding that limit, the checkpoint alignment should abort and the 
> checkpoint be canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to