[ https://issues.apache.org/jira/browse/FLINK-4975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15648167#comment-15648167 ]
ASF GitHub Bot commented on FLINK-4975: --------------------------------------- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2754#discussion_r87039138 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTest.java --- @@ -899,26 +926,480 @@ public void testStartAlignmentWithClosedChannels() { } @Test - public void testEndOfStreamWhileCheckpoint() { + public void testEndOfStreamWhileCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // one checkpoint + createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2), + + // some buffers + createBuffer(0), createBuffer(0), createBuffer(2), + + // start the checkpoint that will be incomplete + createBarrier(2, 2), createBarrier(2, 0), + createBuffer(0), createBuffer(2), createBuffer(1), + + // close one after the barrier one before the barrier + createEndOfPartition(2), createEndOfPartition(1), + createBuffer(0), + + // final end of stream + createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + // data after first checkpoint + check(sequence[3], buffer.getNextNonBlocked()); + check(sequence[4], buffer.getNextNonBlocked()); + check(sequence[5], buffer.getNextNonBlocked()); + assertEquals(1L, buffer.getCurrentCheckpointId()); + + // alignment of second checkpoint + check(sequence[10], buffer.getNextNonBlocked()); + assertEquals(2L, buffer.getCurrentCheckpointId()); + + // first end-of-partition encountered: checkpoint will not be completed + check(sequence[12], buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[9], buffer.getNextNonBlocked()); + check(sequence[11], buffer.getNextNonBlocked()); + check(sequence[13], buffer.getNextNonBlocked()); + check(sequence[14], buffer.getNextNonBlocked()); + + // all done + 
assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + + checkNoTempFilesRemain(); + } + + @Test + public void testSingleChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0), + createBarrier(1, 0), + createBuffer(0), + createBarrier(2, 0), + createCancellationBarrier(4, 0), + createBarrier(5, 0), + createBuffer(0), + createCancellationBarrier(6, 0), + createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[2], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[6], buffer.getNextNonBlocked()); + assertEquals(5L, buffer.getCurrentCheckpointId()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L))); + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L))); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[8], buffer.getNextNonBlocked()); + assertEquals(6L, buffer.getCurrentCheckpointId()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + @Test + public void testMultiChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // some buffers and a successful checkpoint + /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0), + /* 3 */ createBarrier(1, 1), createBarrier(1, 2), + /* 5 */ createBuffer(2), createBuffer(1), + /* 7 */ createBarrier(1, 0), + /* 8 */ 
createBuffer(0), createBuffer(2), + + // aborted on last barrier + /* 10 */ createBarrier(2, 0), createBarrier(2, 2), + /* 12 */ createBuffer(0), createBuffer(2), + /* 14 */ createCancellationBarrier(2, 1), + + // successful checkpoint + /* 15 */ createBuffer(2), createBuffer(1), + /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0), + + // abort on first barrier + /* 20 */ createBuffer(0), createBuffer(1), + /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2), + /* 24 */ createBuffer(0), + /* 25 */ createBarrier(4, 0), + + // another successful checkpoint + /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2), + /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0), + /* 32 */ createBuffer(0), createBuffer(1), + + // abort multiple cancellations and a barrier after the cancellations + /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2), + /* 36 */ createBarrier(6, 0), + + /* 37 */ createBuffer(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = new BarrierBuffer(gate, IO_MANAGER); + + StatefulTask toNotify = mock(StatefulTask.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + // successful first checkpoint, with some aligned buffers + check(sequence[0], buffer.getNextNonBlocked()); + check(sequence[1], buffer.getNextNonBlocked()); + check(sequence[2], buffer.getNextNonBlocked()); + startTs = System.nanoTime(); + check(sequence[5], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L))); + validateAlignmentTime(startTs, buffer); + + check(sequence[6], buffer.getNextNonBlocked()); + check(sequence[8], buffer.getNextNonBlocked()); + check(sequence[9], buffer.getNextNonBlocked()); + + // canceled checkpoint on last barrier + startTs = System.nanoTime(); + check(sequence[12], buffer.getNextNonBlocked()); + verify(toNotify, 
times(1)).abortCheckpointOnBarrier(2L); + validateAlignmentTime(startTs, buffer); + check(sequence[13], buffer.getNextNonBlocked()); + + // one more successful checkpoint + check(sequence[15], buffer.getNextNonBlocked()); + check(sequence[16], buffer.getNextNonBlocked()); + startTs = System.nanoTime(); + check(sequence[20], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L))); + validateAlignmentTime(startTs, buffer); + check(sequence[21], buffer.getNextNonBlocked()); + + // this checkpoint gets immediately canceled + check(sequence[24], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(4L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // some buffers + check(sequence[26], buffer.getNextNonBlocked()); + check(sequence[27], buffer.getNextNonBlocked()); + check(sequence[28], buffer.getNextNonBlocked()); + + // a simple successful checkpoint + startTs = System.nanoTime(); + check(sequence[32], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L))); + validateAlignmentTime(startTs, buffer); + check(sequence[33], buffer.getNextNonBlocked()); + + check(sequence[37], buffer.getNextNonBlocked()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(6L); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + checkNoTempFilesRemain(); + } + + @Test + public void testAbortViaQueuedBarriers() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(1), + /* 1 */ createBarrier(1, 1), createBarrier(1, 2), + /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1), + + // queued barrier and cancellation barrier + /* 6 */ createCancellationBarrier(2, 2), + /* 7 */ createBarrier(2, 1), + + // some intermediate buffers (some queued) + /* 
8 */ createBuffer(0), createBuffer(1), createBuffer(2), + + // complete initial checkpoint + /* 11 */ createBarrier(1, 0), + + // some buffers (none queued, since checkpoint is aborted) + /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0), + + // final barrier of aborted checkpoint + /* 15 */ createBarrier(1, 2), --- End diff -- Both late and aborted barriers are handled in the same code path, so the test passed even though this barrier (`createBarrier(1, 2)`) belongs to the already-completed checkpoint 1 rather than to the aborted checkpoint 2. > Add a limit for how much data may be buffered during checkpoint alignment > ------------------------------------------------------------------------- > > Key: FLINK-4975 > URL: https://issues.apache.org/jira/browse/FLINK-4975 > Project: Flink > Issue Type: Improvement > Components: State Backends, Checkpointing > Affects Versions: 1.1.3 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 1.2.0, 1.1.4 > > > During checkpoint alignment, data may be buffered/spilled. > We should introduce an upper limit for the spilled data volume. After > exceeding that limit, the checkpoint alignment should abort and the > checkpoint should be canceled. -- This message was sent by Atlassian JIRA (v6.3.4#6332)