[ https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648219#comment-15648219 ]

ASF GitHub Bot commented on FLINK-4975:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2754#discussion_r87043110
  
    --- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java
 ---
    @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() {
        }
     
        @Test
    -   public void testEndOfStreamWhileCheckpoint() {
    +   public void testEndOfStreamWhileCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // one checkpoint
    +                           createBarrier(1, 0), createBarrier(1, 1), 
createBarrier(1, 2),
    +
    +                           // some buffers
    +                           createBuffer(0), createBuffer(0), 
createBuffer(2),
    +
    +                           // start the checkpoint that will be incomplete
    +                           createBarrier(2, 2), createBarrier(2, 0),
    +                           createBuffer(0), createBuffer(2), 
createBuffer(1),
    +
    +                           // close one after the barrier one before the 
barrier
    +                           createEndOfPartition(2), 
createEndOfPartition(1),
    +                           createBuffer(0),
    +
    +                           // final end of stream
    +                           createEndOfPartition(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           // data after first checkpoint
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           assertEquals(1L, buffer.getCurrentCheckpointId());
    +
    +           // alignment of second checkpoint
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           assertEquals(2L, buffer.getCurrentCheckpointId());
    +
    +           // first end-of-partition encountered: checkpoint will not be 
completed
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           check(sequence[11], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testSingleChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           createBuffer(0),
    +                           createBarrier(1, 0),
    +                           createBuffer(0),
    +                           createBarrier(2, 0),
    +                           createCancellationBarrier(4, 0),
    +                           createBarrier(5, 0),
    +                           createBuffer(0),
    +                           createCancellationBarrier(6, 0),
    +                           createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           assertEquals(5L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           assertEquals(6L, buffer.getCurrentCheckpointId());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
                
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testMultiChannelAbortCheckpoint() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // some buffers and a successful checkpoint
    +                           /* 0 */ createBuffer(0), createBuffer(2), 
createBuffer(0),
    +                           /* 3 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 5 */ createBuffer(2), createBuffer(1),
    +                           /* 7 */ createBarrier(1, 0),
    +                           /* 8 */ createBuffer(0), createBuffer(2),
    +
    +                           // aborted on last barrier
    +                           /* 10 */ createBarrier(2, 0), createBarrier(2, 
2),
    +                           /* 12 */ createBuffer(0), createBuffer(2),
    +                           /* 14 */ createCancellationBarrier(2, 1),
    +
    +                           // successful checkpoint
    +                           /* 15 */ createBuffer(2), createBuffer(1),
    +                           /* 17 */ createBarrier(3, 1), createBarrier(3, 
2), createBarrier(3, 0),
    +
    +                           // abort on first barrier
    +                           /* 20 */ createBuffer(0), createBuffer(1),
    +                           /* 22 */ createCancellationBarrier(4, 1), 
createBarrier(4, 2),
    +                           /* 24 */ createBuffer(0),
    +                           /* 25 */ createBarrier(4, 0),
    +
    +                           // another successful checkpoint
    +                           /* 26 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +                           /* 29 */ createBarrier(5, 2), createBarrier(5, 
1), createBarrier(5, 0),
    +                           /* 32 */ createBuffer(0), createBuffer(1),
    +
    +                           // abort multiple cancellations and a barrier 
after the cancellations
    +                           /* 34 */ createCancellationBarrier(6, 1), 
createCancellationBarrier(6, 2),
    +                           /* 36 */ createBarrier(6, 0),
    +
    +                           /* 37 */ createBuffer(0)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           // successful first checkpoint, with some aligned buffers
    +           check(sequence[0], buffer.getNextNonBlocked());
    +           check(sequence[1], buffer.getNextNonBlocked());
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[5], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           validateAlignmentTime(startTs, buffer);
    +
    +           check(sequence[6], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +           check(sequence[9], buffer.getNextNonBlocked());
    +
    +           // canceled checkpoint on last barrier
    +           startTs = System.nanoTime();
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[13], buffer.getNextNonBlocked());
    +
    +           // one more successful checkpoint
    +           check(sequence[15], buffer.getNextNonBlocked());
    +           check(sequence[16], buffer.getNextNonBlocked());
    +           startTs = System.nanoTime();
    +           check(sequence[20], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[21], buffer.getNextNonBlocked());
    +
    +           // this checkpoint gets immediately canceled
    +           check(sequence[24], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(4L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // some buffers
    +           check(sequence[26], buffer.getNextNonBlocked());
    +           check(sequence[27], buffer.getNextNonBlocked());
    +           check(sequence[28], buffer.getNextNonBlocked());
    +
    +           // a simple successful checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[32], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)));
    +           validateAlignmentTime(startTs, buffer);
    +           check(sequence[33], buffer.getNextNonBlocked());
    +
    +           check(sequence[37], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(6L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   @Test
    +   public void testAbortViaQueuedBarriers() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /* 0 */ createBuffer(1),
    +                           /* 1 */ createBarrier(1, 1), createBarrier(1, 
2),
    +                           /* 3 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
    +
    +                           // queued barrier and cancellation barrier
    +                           /* 6 */ createCancellationBarrier(2, 2),
    +                           /* 7 */ createBarrier(2, 1),
    +
    +                           // some intermediate buffers (some queued)
    +                           /* 8 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // complete initial checkpoint
    +                           /* 11 */ createBarrier(1, 0),
    +
    +                           // some buffers (none queued, since checkpoint 
is aborted)
    +                           /* 12 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
    +
    +                           // final barrier of aborted checkpoint
    +                           /* 15 */ createBarrier(1, 2),
    +
    +                           // some more buffers
    +                           /* 16 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +
    +           // starting first checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           check(sequence[8], buffer.getNextNonBlocked());
    +
    +           // finished first checkpoint
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)));
    +           validateAlignmentTime(startTs, buffer);
    +
    +           check(sequence[5], buffer.getNextNonBlocked());
    +
    +           // re-read the queued cancellation barriers
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(2L);
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           check(sequence[12], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           check(sequence[16], buffer.getNextNonBlocked());
    +           check(sequence[17], buffer.getNextNonBlocked());
    +           check(sequence[18], buffer.getNextNonBlocked());
    +
    +           // no further alignment should have happened
    +           assertEquals(0L, buffer.getAlignmentDurationNanos());
    +
    +           // no further checkpoint (abort) notifications
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +   }
    +
    +   /**
    +    * This tests the case where a replay of queued checkpoint barriers meets
    +    * a canceled checkpoint.
    +    *
    +    * The replayed newer checkpoint barrier must not try to cancel the
    +    * already canceled checkpoint.
    +    */
    +   @Test
    +   public void testAbortWhileHavingQueuedBarriers() throws Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /*  0 */ createBuffer(1),
    +                           /*  1 */ createBarrier(1, 1),
    +                           /*  2 */ createBuffer(2), createBuffer(0), 
createBuffer(1),
    +
    +                           // queued barrier and cancellation barrier
    +                           /*  5 */ createBarrier(2, 1),
    +
    +                           // some queued buffers
    +                           /*  6 */ createBuffer(2), createBuffer(1),
    +
    +                           // cancel the initial checkpoint
    +                           /*  8 */ createCancellationBarrier(1, 0),
    +
    +                           // some more buffers
    +                           /*  9 */ createBuffer(2), createBuffer(1), 
createBuffer(0),
    +
    +                           // ignored barrier - already canceled and moved 
to next checkpoint
    +                           /* 12 */ createBarrier(1, 2),
    +
    +                           // some more buffers
    +                           /* 13 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // complete next checkpoint regularly
    +                           /* 16 */ createBarrier(2, 0), createBarrier(2, 
2),
    +
    +                           // some more buffers
    +                           /* 18 */ createBuffer(0), createBuffer(1), 
createBuffer(2)
    +           };
    +
    +           MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, 
Arrays.asList(sequence));
    +           BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER);
    +
    +           StatefulTask toNotify = mock(StatefulTask.class);
    +           buffer.registerCheckpointEventHandler(toNotify);
    +
    +           long startTs;
    +
    +           check(sequence[0], buffer.getNextNonBlocked());
    +
    +           // starting first checkpoint
    +           startTs = System.nanoTime();
    +           check(sequence[2], buffer.getNextNonBlocked());
    +           check(sequence[3], buffer.getNextNonBlocked());
    +           check(sequence[6], buffer.getNextNonBlocked());
    +
    +           // cancelled by cancellation barrier
    +           check(sequence[4], buffer.getNextNonBlocked());
    +           validateAlignmentTime(startTs, buffer);
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(1L);
    +
    +           // the next checkpoint alignment starts now
    +           startTs = System.nanoTime();
    +           check(sequence[9], buffer.getNextNonBlocked());
    +           check(sequence[11], buffer.getNextNonBlocked());
    +           check(sequence[13], buffer.getNextNonBlocked());
    +           check(sequence[15], buffer.getNextNonBlocked());
    +
    +           // checkpoint done
    +           check(sequence[7], buffer.getNextNonBlocked());
    +           validateAlignmentTime(startTs, buffer);
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)));
    +
    +           // queued data
    +           check(sequence[10], buffer.getNextNonBlocked());
    +           check(sequence[14], buffer.getNextNonBlocked());
    +
    +           // trailing data
    +           check(sequence[18], buffer.getNextNonBlocked());
    +           check(sequence[19], buffer.getNextNonBlocked());
    +           check(sequence[20], buffer.getNextNonBlocked());
    +
    +           // all done
    +           assertNull(buffer.getNextNonBlocked());
    +           assertNull(buffer.getNextNonBlocked());
    +
    +           buffer.cleanup();
    +           checkNoTempFilesRemain();
    +
    +           // check overall notifications
    +           verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class));
    +           verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong());
    +   }
    +
    +   /**
    +    * This tests the case where a cancellation barrier is received for a 
checkpoint already
    +    * canceled due to receiving a newer checkpoint barrier.
    +    */
    +   @Test
    +   public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws 
Exception {
    +           BufferOrEvent[] sequence = {
    +                           // starting a checkpoint
    +                           /*  0 */ createBuffer(2),
    +                           /*  1 */ createBarrier(3, 1), createBarrier(3, 
0),
    +                           /*  3 */ createBuffer(0), createBuffer(1), 
createBuffer(2),
    +
    +                           // newer checkpoint barrier cancels/subsumes 
pending checkpoint
    +                           /*  6 */ createBarrier(5, 2),
    --- End diff --
    
    I totally agree.


> Add a limit for how much data may be buffered during checkpoint alignment
> -------------------------------------------------------------------------
>
>                 Key: FLINK-4975
>                 URL: https://issues.apache.org/jira/browse/FLINK-4975
>             Project: Flink
>          Issue Type: Improvement
>          Components: State Backends, Checkpointing
>    Affects Versions: 1.1.3
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>             Fix For: 1.2.0, 1.1.4
>
>
> During checkpoint alignment, data may be buffered or spilled to disk.
> We should introduce an upper limit for the volume of spilled data. Once 
> that limit is exceeded, the checkpoint alignment should be aborted and the 
> checkpoint canceled.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to