http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java new file mode 100644 index 0000000..b2ed781 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java @@ -0,0 +1,1426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineOnCancellationBarrierException; +import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineSubsumedException; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.operators.testutils.DummyEnvironment; + +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferBlocker} implements. + */ +public abstract class BarrierBufferTestBase { + + protected static final int PAGE_SIZE = 512; + + private static final Random RND = new Random(); + + private static int sizeCounter = 1; + + abstract BarrierBuffer createBarrierHandler(InputGate gate) throws IOException; + + abstract void validateAlignmentBuffered(long actualBytesBuffered, BufferOrEvent... sequence); + + // ------------------------------------------------------------------------ + // Tests + // ------------------------------------------------------------------------ + + /** + * Validates that the buffer behaves correctly if no checkpoint barriers come, + * for a single input channel. + */ + @Test + public void testSingleChannelNoBarriers() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + for (BufferOrEvent boe : sequence) { + assertEquals(boe, buffer.getNextNonBlocked()); + } + + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + /** + * Validates that the buffer behaves correctly if no checkpoint barriers come, + * for an input with multiple input channels. + */ + @Test + public void testMultiChannelNoBarriers() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(2, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createEndOfPartition(0), + createBuffer(3, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createEndOfPartition(3), + createBuffer(1, PAGE_SIZE), createEndOfPartition(1), createBuffer(2, PAGE_SIZE), createEndOfPartition(2) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + for (BufferOrEvent boe : sequence) { + assertEquals(boe, buffer.getNextNonBlocked()); + } + + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + /** + * Validates that the buffer preserved the order of elements for a + * input with a single input channel, and checkpoint events. + */ + @Test + public void testSingleChannelWithBarriers() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 0), + createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(2, 0), createBarrier(3, 0), + createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0), + createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); + buffer.registerCheckpointEventHandler(handler); + handler.setNextExpectedCheckpointId(1L); + + for (BufferOrEvent boe : sequence) { + if (boe.isBuffer() || boe.getEvent().getClass() != CheckpointBarrier.class) { + assertEquals(boe, buffer.getNextNonBlocked()); + } + } + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + /** + * Validates that the buffer correctly aligns the streams for inputs with + * multiple input channels, by buffering and blocking certain inputs. + */ + @Test + public void testMultiChannelWithBarriers() throws Exception { + BufferOrEvent[] sequence = { + // checkpoint with blocked data + createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 1), createBarrier(1, 2), + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 0), + + // checkpoint without blocked data + createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2), + + // checkpoint with data only from one channel + createBuffer(2, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(3, 2), + createBuffer(2, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(3, 0), createBarrier(3, 1), + + // empty checkpoint + createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0), + + // checkpoint with blocked data in mixed order + createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(5, 1), + createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + createBarrier(5, 2), + createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + createBarrier(5, 0), + + // some trailing data + createBuffer(0, PAGE_SIZE), + createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); + buffer.registerCheckpointEventHandler(handler); + handler.setNextExpectedCheckpointId(1L); + + // pre checkpoint 1 + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(1L, handler.getNextExpectedCheckpointId()); + + long startTs = System.nanoTime(); + + // blocking while aligning for checkpoint 1 + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(1L, handler.getNextExpectedCheckpointId()); + + // checkpoint 1 done, returning buffered data + check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(2L, handler.getNextExpectedCheckpointId()); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), sequence[5], sequence[6]); + + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + + // pre checkpoint 2 + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(2L, handler.getNextExpectedCheckpointId()); + + // checkpoint 2 barriers come together + startTs = System.nanoTime(); + check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(3L, handler.getNextExpectedCheckpointId()); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment()); + + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 3 starts, data buffered + check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), sequence[20], sequence[21]); + assertEquals(4L, handler.getNextExpectedCheckpointId()); + check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 4 happens without extra data + + // pre checkpoint 5 + check(sequence[27], buffer.getNextNonBlocked(), PAGE_SIZE); + + validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment()); + assertEquals(5L, handler.getNextExpectedCheckpointId()); + + check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[29], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 5 aligning + check(sequence[31], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[32], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[33], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[37], buffer.getNextNonBlocked(), PAGE_SIZE); + + // buffered data from checkpoint 5 alignment + check(sequence[34], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[36], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[38], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[39], buffer.getNextNonBlocked(), PAGE_SIZE); + + // remaining data + check(sequence[41], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[42], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[43], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[44], buffer.getNextNonBlocked(), PAGE_SIZE); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + validateAlignmentBuffered(handler.getLastReportedBytesBufferedInAlignment(), + sequence[34], sequence[36], sequence[38], sequence[39]); + + buffer.cleanup(); + } + + @Test + public void testMultiChannelTrailingBlockedData() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0), + + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(2, 1), + createBuffer(1, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createEndOfPartition(1), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(2, 2), + createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); + buffer.registerCheckpointEventHandler(handler); + handler.setNextExpectedCheckpointId(1L); + + // pre-checkpoint 1 + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(1L, handler.getNextExpectedCheckpointId()); + + // pre-checkpoint 2 + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(2L, handler.getNextExpectedCheckpointId()); + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 2 alignment + long startTs = System.nanoTime(); + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + + // end of stream: remaining buffered contents + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + /** + * Validates that the buffer correctly aligns the streams in cases + * where some channels receive barriers from multiple successive checkpoints + * before the pending checkpoint is complete. + */ + @Test + public void testMultiChannelWithQueuedFutureBarriers() throws Exception{ + BufferOrEvent[] sequence = { + // checkpoint 1 - with blocked data + createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 1), createBarrier(1, 2), + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 0), + createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + + // checkpoint 2 - where future checkpoint barriers come before + // the current checkpoint is complete + createBarrier(2, 1), + createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBarrier(2, 0), + createBarrier(3, 0), createBuffer(0, PAGE_SIZE), + createBarrier(3, 1), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(4, 1), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + + // complete checkpoint 2, send a barrier for checkpoints 4 and 5 + createBarrier(2, 2), + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(4, 0), + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(5, 1), + + // complete checkpoint 3 + createBarrier(3, 2), + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(6, 1), + + // complete checkpoint 4, checkpoint 5 remains not fully triggered + createBarrier(4, 2), + createBuffer(2, PAGE_SIZE), + createBuffer(1, PAGE_SIZE), createEndOfPartition(1), + createBuffer(2, PAGE_SIZE), createEndOfPartition(2), + createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); + buffer.registerCheckpointEventHandler(handler); + handler.setNextExpectedCheckpointId(1L); + + // around checkpoint 1 + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + + check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(2L, handler.getNextExpectedCheckpointId()); + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + + // alignment of checkpoint 2 - buffering also some barriers for + // checkpoints 3 and 4 + long startTs = System.nanoTime(); + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[23], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 2 completed + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + check(sequence[25], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[27], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[30], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[32], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 3 completed (emit buffered) + check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); + + // past checkpoint 3 + check(sequence[36], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[38], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 4 completed (emit buffered) + check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[26], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[31], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[33], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[39], buffer.getNextNonBlocked(), PAGE_SIZE); + + // past checkpoint 4, alignment for checkpoint 5 + check(sequence[42], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[45], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[46], buffer.getNextNonBlocked(), PAGE_SIZE); + + // abort checkpoint 5 (end of partition) + check(sequence[37], buffer.getNextNonBlocked(), PAGE_SIZE); + + // start checkpoint 6 alignment + check(sequence[47], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[48], buffer.getNextNonBlocked(), PAGE_SIZE); + + // end of input, emit remainder + check(sequence[43], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[44], buffer.getNextNonBlocked(), PAGE_SIZE); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + /** + * Validates that the buffer skips over the current checkpoint if it + * receives a barrier from a later checkpoint on a non-blocked input. + */ + @Test + public void testMultiChannelSkippingCheckpoints() throws Exception { + BufferOrEvent[] sequence = { + // checkpoint 1 - with blocked data + createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 1), createBarrier(1, 2), + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 0), + createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + + // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3 + createBarrier(2, 1), + createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(2, 0), + createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(3, 2), + + createBuffer(2, PAGE_SIZE), + createBuffer(1, PAGE_SIZE), createEndOfPartition(1), + createBuffer(2, PAGE_SIZE), createEndOfPartition(2), + createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + AbstractInvokable toNotify = mock(AbstractInvokable.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + // initial data + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + + // align checkpoint 1 + startTs = System.nanoTime(); + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(1L, buffer.getCurrentCheckpointId()); + + // checkpoint done - replay buffered + check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + + // alignment of checkpoint 2 + startTs = System.nanoTime(); + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 2 aborted, checkpoint 3 started + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(3L, buffer.getCurrentCheckpointId()); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + verify(toNotify).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineSubsumedException.class)); + check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 3 alignment in progress + check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 3 aborted (end of partition) + check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class)); + + // replay buffered data from checkpoint 3 + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + + // all the remaining messages + check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[23], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[24], buffer.getNextNonBlocked(), PAGE_SIZE); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + /** + * Validates that the buffer skips over the current checkpoint if it + * receives a barrier from a later checkpoint on a non-blocked input. + */ + @Test + public void testMultiChannelJumpingOverCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // checkpoint 1 - with blocked data + createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 1), createBarrier(1, 2), + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 0), + createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + + // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3 + createBarrier(2, 1), + createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(2, 0), + createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(3, 1), + createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(3, 0), + createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(4, 2), + + createBuffer(2, PAGE_SIZE), + createBuffer(1, PAGE_SIZE), createEndOfPartition(1), + createBuffer(2, PAGE_SIZE), createEndOfPartition(2), + createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); + buffer.registerCheckpointEventHandler(handler); + handler.setNextExpectedCheckpointId(1L); + + // checkpoint 1 + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(1L, buffer.getCurrentCheckpointId()); + + check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + + // alignment of checkpoint 2 + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(2L, buffer.getCurrentCheckpointId()); + check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + + long startTs = System.nanoTime(); + + // checkpoint 2 aborted, checkpoint 4 started. replay buffered + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(4L, buffer.getCurrentCheckpointId()); + check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); + + // align checkpoint 4 remainder + check(sequence[25], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[26], buffer.getNextNonBlocked(), PAGE_SIZE); + + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + + // checkpoint 4 aborted (due to end of partition) + check(sequence[24], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[27], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[29], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[30], buffer.getNextNonBlocked(), PAGE_SIZE); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + /** + * Validates that the buffer skips over a later checkpoint if it + * receives a barrier from an even later checkpoint on a blocked input. + */ + @Test + public void testMultiChannelSkippingCheckpointsViaBlockedInputs() throws Exception { + BufferOrEvent[] sequence = { + // checkpoint 1 - with blocked data + createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 1), createBarrier(1, 2), + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(1, 0), + createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + + // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3 + createBarrier(2, 1), + createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(2, 0), + createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + + createBarrier(3, 0), // queued barrier on blocked input + createBuffer(0, PAGE_SIZE), + + createBarrier(4, 1), // pre-mature barrier on blocked input + createBuffer(1, PAGE_SIZE), + createBuffer(0, PAGE_SIZE), + createBuffer(2, PAGE_SIZE), + + // complete checkpoint 2 + createBarrier(2, 2), + createBuffer(0, PAGE_SIZE), + + createBarrier(3, 2), // should be ignored + createBuffer(2, PAGE_SIZE), + createBarrier(4, 0), + createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(4, 2), + + createBuffer(1, PAGE_SIZE), createEndOfPartition(1), + createBuffer(2, PAGE_SIZE), createEndOfPartition(2), + createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + // checkpoint 1 + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(1L, buffer.getCurrentCheckpointId()); + check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + + // alignment of checkpoint 2 + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(2L, buffer.getCurrentCheckpointId()); + + // checkpoint 2 completed + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 3 skipped, alignment for 4 started + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(4L, buffer.getCurrentCheckpointId()); + check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[24], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[26], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[30], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 4 completed + check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[29], buffer.getNextNonBlocked(), PAGE_SIZE); + + check(sequence[32], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[33], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[34], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[35], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[36], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[37], buffer.getNextNonBlocked(), PAGE_SIZE); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + @Test + public void testEarlyCleanup() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0), + + createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(2, 1), + createBuffer(1, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createEndOfPartition(1), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBarrier(2, 2), + createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); + buffer.registerCheckpointEventHandler(handler); + handler.setNextExpectedCheckpointId(1L); + + // pre-checkpoint 1 + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(1L, handler.getNextExpectedCheckpointId()); + + // pre-checkpoint 2 + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(2L, handler.getNextExpectedCheckpointId()); + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 2 alignment + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + + // end of stream: remaining buffered contents + buffer.getNextNonBlocked(); + buffer.cleanup(); + } + + @Test + public void testStartAlignmentWithClosedChannels() throws Exception { + BufferOrEvent[] sequence = { + // close some channels immediately + createEndOfPartition(2), createEndOfPartition(1), + + // checkpoint without blocked data + createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(3, PAGE_SIZE), + createBarrier(2, 3), createBarrier(2, 0), + + // checkpoint with blocked data + createBuffer(3, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(3, 3), + createBuffer(3, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBarrier(3, 0), + + // empty checkpoint + createBarrier(4, 0), createBarrier(4, 3), + + // some data, one channel closes + createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(3, PAGE_SIZE), + createEndOfPartition(0), + + // checkpoint on last remaining channel + createBuffer(3, PAGE_SIZE), + createBarrier(5, 3), + createBuffer(3, PAGE_SIZE), + createEndOfPartition(3) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 4, Arrays.asList(sequence)); + + BarrierBuffer buffer = createBarrierHandler(gate); + + // pre checkpoint 2 + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 3 alignment + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(2L, buffer.getCurrentCheckpointId()); + check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint 3 buffered + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(3L, buffer.getCurrentCheckpointId()); + + // after checkpoint 4 + check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(4L, buffer.getCurrentCheckpointId()); + check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + + check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(5L, buffer.getCurrentCheckpointId()); + check(sequence[22], buffer.getNextNonBlocked(), PAGE_SIZE); + + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + @Test + public void testEndOfStreamWhileCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // one checkpoint + createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2), + + // some buffers + createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + + // start the checkpoint that will be incomplete + createBarrier(2, 2), createBarrier(2, 0), + createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + + // close one after the barrier one before the barrier + createEndOfPartition(2), createEndOfPartition(1), + createBuffer(0, PAGE_SIZE), + + // final end of stream + createEndOfPartition(0) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + // data after first checkpoint + check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(1L, buffer.getCurrentCheckpointId()); + + // alignment of second checkpoint + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(2L, buffer.getCurrentCheckpointId()); + + // first end-of-partition encountered: checkpoint will not be completed + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + @Test + public void testSingleChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + createBuffer(0, PAGE_SIZE), + createBarrier(1, 0), + createBuffer(0, PAGE_SIZE), + createBarrier(2, 0), + createCancellationBarrier(4, 0), + createBarrier(5, 0), + createBuffer(0, PAGE_SIZE), + createCancellationBarrier(6, 0), + createBuffer(0, PAGE_SIZE) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 1, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + AbstractInvokable toNotify = mock(AbstractInvokable.class); + buffer.registerCheckpointEventHandler(toNotify); + + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(5L, buffer.getCurrentCheckpointId()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + assertEquals(6L, buffer.getCurrentCheckpointId()); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + buffer.cleanup(); + } + + @Test + public void testMultiChannelAbortCheckpoint() throws Exception { + BufferOrEvent[] sequence = { + // some buffers and a successful checkpoint + /* 0 */ createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + /* 3 */ createBarrier(1, 1), createBarrier(1, 2), + /* 5 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 7 */ createBarrier(1, 0), + /* 8 */ createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + + // aborted on last barrier + /* 10 */ createBarrier(2, 0), createBarrier(2, 2), + /* 12 */ createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + /* 14 */ createCancellationBarrier(2, 1), + + // successful checkpoint + /* 15 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0), + + // abort on first barrier + /* 20 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2), + /* 24 */ createBuffer(0, PAGE_SIZE), + /* 25 */ createBarrier(4, 0), + + // another successful checkpoint + /* 26 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0), + /* 32 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + + // abort multiple cancellations and a barrier after the cancellations + /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2), + /* 36 */ createBarrier(6, 0), + + /* 37 */ createBuffer(0, PAGE_SIZE) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + AbstractInvokable toNotify = mock(AbstractInvokable.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + // successful first checkpoint, with some aligned buffers + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[1], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + startTs = System.nanoTime(); + check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + + // canceled checkpoint on last barrier + startTs = System.nanoTime(); + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + + // one more successful checkpoint + check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + startTs = System.nanoTime(); + check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(3L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + check(sequence[21], buffer.getNextNonBlocked(), PAGE_SIZE); + + // this checkpoint gets immediately canceled + check(sequence[24], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), any(CheckpointDeclineOnCancellationBarrierException.class)); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // some buffers + check(sequence[26], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[27], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[28], buffer.getNextNonBlocked(), PAGE_SIZE); + + // a simple successful checkpoint + startTs = System.nanoTime(); + check(sequence[32], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + check(sequence[33], buffer.getNextNonBlocked(), PAGE_SIZE); + + check(sequence[37], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), any(CheckpointDeclineOnCancellationBarrierException.class)); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + @Test + public void testAbortViaQueuedBarriers() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(1, PAGE_SIZE), + /* 1 */ createBarrier(1, 1), createBarrier(1, 2), + /* 3 */ createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + + // queued barrier and cancellation barrier + /* 6 */ createCancellationBarrier(2, 2), + /* 7 */ createBarrier(2, 1), + + // some intermediate buffers (some queued) + /* 8 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + + // complete initial checkpoint + /* 11 */ createBarrier(1, 0), + + // some buffers (none queued, since checkpoint is aborted) + /* 12 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + + // final barrier of aborted checkpoint + /* 15 */ createBarrier(2, 0), + + // some more buffers + /* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + AbstractInvokable toNotify = mock(AbstractInvokable.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + + // starting first checkpoint + startTs = System.nanoTime(); + check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + + // finished first checkpoint + check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(1L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + + check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + + // re-read the queued cancellation barriers + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(2L), any(CheckpointDeclineOnCancellationBarrierException.class)); + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); + + check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + + // no further alignment should have happened + assertEquals(0L, buffer.getAlignmentDurationNanos()); + + // no further checkpoint (abort) notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(CheckpointDeclineOnCancellationBarrierException.class)); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + } + + /** + * This tests the where a replay of queued checkpoint barriers meets + * a canceled checkpoint. + * + * <p>The replayed newer checkpoint barrier must not try to cancel the + * already canceled checkpoint. + */ + @Test + public void testAbortWhileHavingQueuedBarriers() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(1, PAGE_SIZE), + /* 1 */ createBarrier(1, 1), + /* 2 */ createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + + // queued barrier and cancellation barrier + /* 5 */ createBarrier(2, 1), + + // some queued buffers + /* 6 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + + // cancel the initial checkpoint + /* 8 */ createCancellationBarrier(1, 0), + + // some more buffers + /* 9 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + + // ignored barrier - already canceled and moved to next checkpoint + /* 12 */ createBarrier(1, 2), + + // some more buffers + /* 13 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + + // complete next checkpoint regularly + /* 16 */ createBarrier(2, 0), createBarrier(2, 2), + + // some more buffers + /* 18 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + AbstractInvokable toNotify = mock(AbstractInvokable.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + + // starting first checkpoint + startTs = System.nanoTime(); + check(sequence[2], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[6], buffer.getNextNonBlocked(), PAGE_SIZE); + + // cancelled by cancellation barrier + check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + verify(toNotify).abortCheckpointOnBarrier(eq(1L), any(CheckpointDeclineOnCancellationBarrierException.class)); + + // the next checkpoint alignment starts now + startTs = System.nanoTime(); + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[15], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint done + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + verify(toNotify).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + + // queued data + check(sequence[10], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[14], buffer.getNextNonBlocked(), PAGE_SIZE); + + // trailing data + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[19], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[20], buffer.getNextNonBlocked(), PAGE_SIZE); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + + // check overall notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); + } + + /** + * This tests the where a cancellation barrier is received for a checkpoint already + * canceled due to receiving a newer checkpoint barrier. + */ + @Test + public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception { + BufferOrEvent[] sequence = { + // starting a checkpoint + /* 0 */ createBuffer(2, PAGE_SIZE), + /* 1 */ createBarrier(3, 1), createBarrier(3, 0), + /* 3 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + + // newer checkpoint barrier cancels/subsumes pending checkpoint + /* 6 */ createBarrier(5, 2), + + // some queued buffers + /* 7 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + + // cancel barrier the initial checkpoint /it is already canceled) + /* 10 */ createCancellationBarrier(3, 2), + + // some more buffers + /* 11 */ createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + + // complete next checkpoint regularly + /* 14 */ createBarrier(5, 0), createBarrier(5, 1), + + // some more buffers + /* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE) + }; + + MockInputGate gate = new MockInputGate(PAGE_SIZE, 3, Arrays.asList(sequence)); + BarrierBuffer buffer = createBarrierHandler(gate); + + AbstractInvokable toNotify = mock(AbstractInvokable.class); + buffer.registerCheckpointEventHandler(toNotify); + + long startTs; + + // validate the sequence + + check(sequence[0], buffer.getNextNonBlocked(), PAGE_SIZE); + + // beginning of first checkpoint + check(sequence[5], buffer.getNextNonBlocked(), PAGE_SIZE); + + // future barrier aborts checkpoint + startTs = System.nanoTime(); + check(sequence[3], buffer.getNextNonBlocked(), PAGE_SIZE); + verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(3L), any(CheckpointDeclineSubsumedException.class)); + check(sequence[4], buffer.getNextNonBlocked(), PAGE_SIZE); + + // alignment of next checkpoint + check(sequence[8], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[9], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[12], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[13], buffer.getNextNonBlocked(), PAGE_SIZE); + + // checkpoint finished + check(sequence[7], buffer.getNextNonBlocked(), PAGE_SIZE); + validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); + verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(5L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + check(sequence[11], buffer.getNextNonBlocked(), PAGE_SIZE); + + // remaining data + check(sequence[16], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[17], buffer.getNextNonBlocked(), PAGE_SIZE); + check(sequence[18], buffer.getNextNonBlocked(), PAGE_SIZE); + + // all done + assertNull(buffer.getNextNonBlocked()); + assertNull(buffer.getNextNonBlocked()); + + buffer.cleanup(); + + // check overall notifications + verify(toNotify, times(1)).triggerCheckpointOnBarrier(any(CheckpointMetaData.class), any(CheckpointOptions.class), any(CheckpointMetrics.class)); + verify(toNotify, times(1)).abortCheckpointOnBarrier(anyLong(), any(Throwable.class)); + } + + // ------------------------------------------------------------------------ + // Utils + // ------------------------------------------------------------------------ + + private static BufferOrEvent createBarrier(long checkpointId, int channel) { + return new BufferOrEvent(new CheckpointBarrier( + checkpointId, System.currentTimeMillis(), CheckpointOptions.forCheckpointWithDefaultLocation()), channel); + } + + private static BufferOrEvent createCancellationBarrier(long checkpointId, int channel) { + return new BufferOrEvent(new CancelCheckpointMarker(checkpointId), channel); + } + + private static BufferOrEvent createBuffer(int channel, int pageSize) { + final int size = sizeCounter++; + byte[] bytes = new byte[size]; + RND.nextBytes(bytes); + + MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(pageSize); + memory.put(0, bytes); + + Buffer buf = new NetworkBuffer(memory, FreeingBufferRecycler.INSTANCE); + buf.setSize(size); + + // retain an additional time so it does not get disposed after being read by the input gate + buf.retainBuffer(); + + return new BufferOrEvent(buf, channel); + } + + private static BufferOrEvent createEndOfPartition(int channel) { + return new BufferOrEvent(EndOfPartitionEvent.INSTANCE, channel); + } + + private static void check(BufferOrEvent expected, BufferOrEvent present, int pageSize) { + assertNotNull(expected); + assertNotNull(present); + assertEquals(expected.isBuffer(), present.isBuffer()); + + if (expected.isBuffer()) { + assertEquals(expected.getBuffer().getMaxCapacity(), present.getBuffer().getMaxCapacity()); + assertEquals(expected.getBuffer().getSize(), present.getBuffer().getSize()); + MemorySegment expectedMem = expected.getBuffer().getMemorySegment(); + MemorySegment presentMem = present.getBuffer().getMemorySegment(); + assertTrue("memory contents differs", expectedMem.compare(presentMem, 0, 0, pageSize) == 0); + } else { + assertEquals(expected.getEvent(), present.getEvent()); + } + } + + private static void validateAlignmentTime(long startTimestamp, long alignmentDuration) { + final long elapsed = System.nanoTime() - startTimestamp; + assertTrue("wrong alignment time", alignmentDuration <= elapsed); + } + + // ------------------------------------------------------------------------ + // Testing Mocks + // ------------------------------------------------------------------------ + + /** + * The invokable handler used for triggering checkpoint and validation. + */ + private static class ValidatingCheckpointHandler extends AbstractInvokable { + + private long nextExpectedCheckpointId = -1L; + private long lastReportedBytesBufferedInAlignment = -1; + + public ValidatingCheckpointHandler() { + super(new DummyEnvironment("test", 1, 0)); + } + + public void setNextExpectedCheckpointId(long nextExpectedCheckpointId) { + this.nextExpectedCheckpointId = nextExpectedCheckpointId; + } + + public long getNextExpectedCheckpointId() { + return nextExpectedCheckpointId; + } + + long getLastReportedBytesBufferedInAlignment() { + return lastReportedBytesBufferedInAlignment; + } + + @Override + public void invoke() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean triggerCheckpoint( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions) throws Exception { + throw new UnsupportedOperationException("should never be called"); + } + + @Override + public void triggerCheckpointOnBarrier( + CheckpointMetaData checkpointMetaData, + CheckpointOptions checkpointOptions, + CheckpointMetrics checkpointMetrics) throws Exception { + assertTrue("wrong checkpoint id", nextExpectedCheckpointId == -1L || + nextExpectedCheckpointId == checkpointMetaData.getCheckpointId()); + + assertTrue(checkpointMetaData.getTimestamp() > 0); + assertTrue(checkpointMetrics.getBytesBufferedInAlignment() >= 0); + assertTrue(checkpointMetrics.getAlignmentDurationNanos() >= 0); + + nextExpectedCheckpointId++; + lastReportedBytesBufferedInAlignment = checkpointMetrics.getBytesBufferedInAlignment(); + } + + @Override + public void abortCheckpointOnBarrier(long checkpointId, Throwable cause) {} + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + throw new UnsupportedOperationException("should never be called"); + } + } + + /** + * The matcher used for verifying checkpoint equality. + */ + private static class CheckpointMatcher extends BaseMatcher<CheckpointMetaData> { + + private final long checkpointId; + + CheckpointMatcher(long checkpointId) { + this.checkpointId = checkpointId; + } + + @Override + public boolean matches(Object o) { + return o != null && + o.getClass() == CheckpointMetaData.class && + ((CheckpointMetaData) o).getCheckpointId() == checkpointId; + } + + @Override + public void describeTo(Description description) { + description.appendText("CheckpointMetaData - id = " + checkpointId); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java new file mode 100644 index 0000000..4448edd --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemorySegmentFactory; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; +import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; + +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for {@link BufferBlocker}. + */ +public abstract class BufferBlockerTestBase { + + protected static final int PAGE_SIZE = 4096; + + abstract BufferBlocker createBufferBlocker(); + + @Test + public void testRollOverEmptySequences() throws IOException { + BufferBlocker bufferBlocker = createBufferBlocker(); + assertNull(bufferBlocker.rollOverReusingResources()); + assertNull(bufferBlocker.rollOverReusingResources()); + assertNull(bufferBlocker.rollOverReusingResources()); + } + + @Test + public void testSpillAndRollOverSimple() throws IOException { + final Random rnd = new Random(); + final Random bufferRnd = new Random(); + + final int maxNumEventsAndBuffers = 3000; + final int maxNumChannels = 1656; + + BufferBlocker bufferBlocker = createBufferBlocker(); + + // do multiple spilling / rolling over rounds + for (int round = 0; round < 5; round++) { + + final long bufferSeed = rnd.nextLong(); + bufferRnd.setSeed(bufferSeed); + + final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1; + final int numChannels = rnd.nextInt(maxNumChannels) + 1; + + final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128); + + // generate sequence + for (int i = 0; i < numEventsAndBuffers; i++) { + boolean isEvent = rnd.nextDouble() < 0.05d; + BufferOrEvent evt; + if (isEvent) { + evt = generateRandomEvent(rnd, numChannels); + events.add(evt); + } else { + evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels)); + } + bufferBlocker.add(evt); + } + + // reset and create reader + bufferRnd.setSeed(bufferSeed); + + BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources(); + seq.open(); + + // read and validate the sequence + + int numEvent = 0; + for (int i = 0; i < numEventsAndBuffers; i++) { + BufferOrEvent next = seq.getNext(); + assertNotNull(next); + if (next.isEvent()) { + BufferOrEvent expected = events.get(numEvent++); + assertEquals(expected.getEvent(), next.getEvent()); + assertEquals(expected.getChannelIndex(), next.getChannelIndex()); + } else { + validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels)); + } + } + + // no further data + assertNull(seq.getNext()); + + // all events need to be consumed + assertEquals(events.size(), numEvent); + + seq.cleanup(); + } + } + + @Test + public void testSpillWhileReading() throws IOException { + final int sequences = 10; + + final Random rnd = new Random(); + + final int maxNumEventsAndBuffers = 30000; + final int maxNumChannels = 1656; + + int sequencesConsumed = 0; + + ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>(); + SequenceToConsume currentSequence = null; + int currentNumEvents = 0; + int currentNumRecordAndEvents = 0; + + BufferBlocker bufferBlocker = createBufferBlocker(); + + // do multiple spilling / rolling over rounds + for (int round = 0; round < 2 * sequences; round++) { + + if (round % 2 == 1) { + // make this an empty sequence + assertNull(bufferBlocker.rollOverReusingResources()); + } else { + // proper spilled sequence + final long bufferSeed = rnd.nextLong(); + final Random bufferRnd = new Random(bufferSeed); + + final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1; + final int numChannels = rnd.nextInt(maxNumChannels) + 1; + + final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128); + + int generated = 0; + while (generated < numEventsAndBuffers) { + + if (currentSequence == null || rnd.nextDouble() < 0.5) { + // add a new record + boolean isEvent = rnd.nextDouble() < 0.05; + BufferOrEvent evt; + if (isEvent) { + evt = generateRandomEvent(rnd, numChannels); + events.add(evt); + } else { + evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels)); + } + bufferBlocker.add(evt); + generated++; + } else { + // consume a record + BufferOrEvent next = currentSequence.sequence.getNext(); + assertNotNull(next); + if (next.isEvent()) { + BufferOrEvent expected = currentSequence.events.get(currentNumEvents++); + assertEquals(expected.getEvent(), next.getEvent()); + assertEquals(expected.getChannelIndex(), next.getChannelIndex()); + } else { + Random validationRnd = currentSequence.bufferRnd; + validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels)); + } + + currentNumRecordAndEvents++; + if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) { + // done with the sequence + currentSequence.sequence.cleanup(); + sequencesConsumed++; + + // validate we had all events + assertEquals(currentSequence.events.size(), currentNumEvents); + + // reset + currentSequence = pendingSequences.pollFirst(); + if (currentSequence != null) { + currentSequence.sequence.open(); + } + + currentNumRecordAndEvents = 0; + currentNumEvents = 0; + } + } + } + + // done generating a sequence. queue it for consumption + bufferRnd.setSeed(bufferSeed); + BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources(); + + SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels); + + if (currentSequence == null) { + currentSequence = stc; + stc.sequence.open(); + } else { + pendingSequences.addLast(stc); + } + } + } + + // consume all the remainder + while (currentSequence != null) { + // consume a record + BufferOrEvent next = currentSequence.sequence.getNext(); + assertNotNull(next); + if (next.isEvent()) { + BufferOrEvent expected = currentSequence.events.get(currentNumEvents++); + assertEquals(expected.getEvent(), next.getEvent()); + assertEquals(expected.getChannelIndex(), next.getChannelIndex()); + } else { + Random validationRnd = currentSequence.bufferRnd; + validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels)); + } + + currentNumRecordAndEvents++; + if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) { + // done with the sequence + currentSequence.sequence.cleanup(); + sequencesConsumed++; + + // validate we had all events + assertEquals(currentSequence.events.size(), currentNumEvents); + + // reset + currentSequence = pendingSequences.pollFirst(); + if (currentSequence != null) { + currentSequence.sequence.open(); + } + + currentNumRecordAndEvents = 0; + currentNumEvents = 0; + } + } + + assertEquals(sequences, sequencesConsumed); + } + + // ------------------------------------------------------------------------ + // Utils + // ------------------------------------------------------------------------ + + private static BufferOrEvent generateRandomEvent(Random rnd, int numChannels) { + long magicNumber = rnd.nextLong(); + byte[] data = new byte[rnd.nextInt(1000)]; + rnd.nextBytes(data); + TestEvent evt = new TestEvent(magicNumber, data); + + int channelIndex = rnd.nextInt(numChannels); + + return new BufferOrEvent(evt, channelIndex); + } + + public static BufferOrEvent generateRandomBuffer(int size, int channelIndex) { + MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE); + for (int i = 0; i < size; i++) { + seg.put(i, (byte) i); + } + + Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE); + buf.setSize(size); + return new BufferOrEvent(buf, channelIndex); + } + + private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) { + assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex()); + assertTrue("is not buffer", boe.isBuffer()); + + Buffer buf = boe.getBuffer(); + assertEquals("wrong buffer size", expectedSize, buf.getSize()); + + MemorySegment seg = buf.getMemorySegment(); + for (int i = 0; i < expectedSize; i++) { + byte expected = (byte) i; + if (expected != seg.get(i)) { + fail(String.format( + "wrong buffer contents at position %s : expected=%d , found=%d", i, expected, seg.get(i))); + } + } + } + + /** + * Wrappers the buffered sequence and related elements for consuming and validation. + */ + private static class SequenceToConsume { + + final BufferOrEventSequence sequence; + final ArrayList<BufferOrEvent> events; + final Random bufferRnd; + final int numBuffersAndEvents; + final int numChannels; + + private SequenceToConsume( + Random bufferRnd, + ArrayList<BufferOrEvent> events, + BufferOrEventSequence sequence, + int numBuffersAndEvents, + int numChannels) { + this.bufferRnd = bufferRnd; + this.events = events; + this.sequence = sequence; + this.numBuffersAndEvents = numBuffersAndEvents; + this.numChannels = numChannels; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java index ee58052..b70ba24 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java @@ -18,13 +18,8 @@ package org.apache.flink.streaming.runtime.io; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemorySegmentFactory; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.io.network.buffer.Buffer; -import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; -import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import org.junit.After; @@ -32,35 +27,23 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.File; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Random; +import java.io.IOException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** * Tests for {@link BufferSpiller}. */ -public class BufferSpillerTest { - - private static final Logger LOG = LoggerFactory.getLogger(BufferSpillerTest.class); - - private static final int PAGE_SIZE = 4096; +public class BufferSpillerTest extends BufferBlockerTestBase { private static IOManager ioManager; private BufferSpiller spiller; - // ------------------------------------------------------------------------ // Setup / Cleanup // ------------------------------------------------------------------------ @@ -76,26 +59,14 @@ public class BufferSpillerTest { } @Before - public void createSpiller() { - try { - spiller = new BufferSpiller(ioManager, PAGE_SIZE); - } - catch (Exception e) { - e.printStackTrace(); - fail("Cannot create BufferSpiller: " + e.getMessage()); - } + public void createSpiller() throws IOException { + spiller = new BufferSpiller(ioManager, PAGE_SIZE); } @After - public void cleanupSpiller() { + public void cleanupSpiller() throws IOException { if (spiller != null) { - try { - spiller.close(); - } - catch (Exception e) { - e.printStackTrace(); - fail("Cannot properly close the BufferSpiller: " + e.getMessage()); - } + spiller.close(); assertFalse(spiller.getCurrentChannel().isOpen()); assertFalse(spiller.getCurrentSpillFile().exists()); @@ -104,239 +75,9 @@ public class BufferSpillerTest { checkNoTempFilesRemain(); } - // ------------------------------------------------------------------------ - // Tests - // ------------------------------------------------------------------------ - - @Test - public void testRollOverEmptySequences() { - try { - assertNull(spiller.rollOver()); - assertNull(spiller.rollOver()); - assertNull(spiller.rollOver()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSpillAndRollOverSimple() { - try { - final Random rnd = new Random(); - final Random bufferRnd = new Random(); - - final int maxNumEventsAndBuffers = 3000; - final int maxNumChannels = 1656; - - // do multiple spilling / rolling over rounds - for (int round = 0; round < 5; round++) { - - final long bufferSeed = rnd.nextLong(); - bufferRnd.setSeed(bufferSeed); - - final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1; - final int numChannels = rnd.nextInt(maxNumChannels) + 1; - - final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128); - - // generate sequence - for (int i = 0; i < numEventsAndBuffers; i++) { - boolean isEvent = rnd.nextDouble() < 0.05d; - if (isEvent) { - BufferOrEvent evt = generateRandomEvent(rnd, numChannels); - events.add(evt); - spiller.add(evt); - } - else { - BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels)); - spiller.add(evt); - } - } - - // reset and create reader - bufferRnd.setSeed(bufferSeed); - - BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver(); - seq.open(); - - // read and validate the sequence - - int numEvent = 0; - for (int i = 0; i < numEventsAndBuffers; i++) { - BufferOrEvent next = seq.getNext(); - assertNotNull(next); - if (next.isEvent()) { - BufferOrEvent expected = events.get(numEvent++); - assertEquals(expected.getEvent(), next.getEvent()); - assertEquals(expected.getChannelIndex(), next.getChannelIndex()); - } - else { - validateBuffer(next, bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels)); - } - } - - // no further data - assertNull(seq.getNext()); - - // all events need to be consumed - assertEquals(events.size(), numEvent); - - seq.cleanup(); - } - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testSpillWhileReading() { - LOG.info("Starting SpillWhileReading test"); - - try { - final int sequences = 10; - - final Random rnd = new Random(); - - final int maxNumEventsAndBuffers = 30000; - final int maxNumChannels = 1656; - - int sequencesConsumed = 0; - - ArrayDeque<SequenceToConsume> pendingSequences = new ArrayDeque<SequenceToConsume>(); - SequenceToConsume currentSequence = null; - int currentNumEvents = 0; - int currentNumRecordAndEvents = 0; - - // do multiple spilling / rolling over rounds - for (int round = 0; round < 2 * sequences; round++) { - - if (round % 2 == 1) { - // make this an empty sequence - assertNull(spiller.rollOver()); - } - else { - // proper spilled sequence - final long bufferSeed = rnd.nextLong(); - final Random bufferRnd = new Random(bufferSeed); - - final int numEventsAndBuffers = rnd.nextInt(maxNumEventsAndBuffers) + 1; - final int numChannels = rnd.nextInt(maxNumChannels) + 1; - - final ArrayList<BufferOrEvent> events = new ArrayList<BufferOrEvent>(128); - - int generated = 0; - while (generated < numEventsAndBuffers) { - - if (currentSequence == null || rnd.nextDouble() < 0.5) { - // add a new record - boolean isEvent = rnd.nextDouble() < 0.05; - if (isEvent) { - BufferOrEvent evt = generateRandomEvent(rnd, numChannels); - events.add(evt); - spiller.add(evt); - } - else { - BufferOrEvent evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numChannels)); - spiller.add(evt); - } - generated++; - } - else { - // consume a record - BufferOrEvent next = currentSequence.sequence.getNext(); - assertNotNull(next); - if (next.isEvent()) { - BufferOrEvent expected = currentSequence.events.get(currentNumEvents++); - assertEquals(expected.getEvent(), next.getEvent()); - assertEquals(expected.getChannelIndex(), next.getChannelIndex()); - } - else { - Random validationRnd = currentSequence.bufferRnd; - validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels)); - } - - currentNumRecordAndEvents++; - if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) { - // done with the sequence - currentSequence.sequence.cleanup(); - sequencesConsumed++; - - // validate we had all events - assertEquals(currentSequence.events.size(), currentNumEvents); - - // reset - currentSequence = pendingSequences.pollFirst(); - if (currentSequence != null) { - currentSequence.sequence.open(); - } - - currentNumRecordAndEvents = 0; - currentNumEvents = 0; - } - } - } - - // done generating a sequence. queue it for consumption - bufferRnd.setSeed(bufferSeed); - BufferSpiller.SpilledBufferOrEventSequence seq = spiller.rollOver(); - - SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numChannels); - - if (currentSequence == null) { - currentSequence = stc; - stc.sequence.open(); - } - else { - pendingSequences.addLast(stc); - } - } - } - - // consume all the remainder - while (currentSequence != null) { - // consume a record - BufferOrEvent next = currentSequence.sequence.getNext(); - assertNotNull(next); - if (next.isEvent()) { - BufferOrEvent expected = currentSequence.events.get(currentNumEvents++); - assertEquals(expected.getEvent(), next.getEvent()); - assertEquals(expected.getChannelIndex(), next.getChannelIndex()); - } - else { - Random validationRnd = currentSequence.bufferRnd; - validateBuffer(next, validationRnd.nextInt(PAGE_SIZE) + 1, validationRnd.nextInt(currentSequence.numChannels)); - } - - currentNumRecordAndEvents++; - if (currentNumRecordAndEvents == currentSequence.numBuffersAndEvents) { - // done with the sequence - currentSequence.sequence.cleanup(); - sequencesConsumed++; - - // validate we had all events - assertEquals(currentSequence.events.size(), currentNumEvents); - - // reset - currentSequence = pendingSequences.pollFirst(); - if (currentSequence != null) { - currentSequence.sequence.open(); - } - - currentNumRecordAndEvents = 0; - currentNumEvents = 0; - } - } - - assertEquals(sequences, sequencesConsumed); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } + @Override + public BufferBlocker createBufferBlocker() { + return spiller; } /** @@ -351,50 +92,7 @@ public class BufferSpillerTest { assertEquals( "Changed the header format, but did not adjust the HEADER_SIZE field", BufferSpiller.HEADER_SIZE + size, - spiller.getBytesWritten()); - } - - // ------------------------------------------------------------------------ - // Utils - // ------------------------------------------------------------------------ - - private static BufferOrEvent generateRandomEvent(Random rnd, int numChannels) { - long magicNumber = rnd.nextLong(); - byte[] data = new byte[rnd.nextInt(1000)]; - rnd.nextBytes(data); - TestEvent evt = new TestEvent(magicNumber, data); - - int channelIndex = rnd.nextInt(numChannels); - - return new BufferOrEvent(evt, channelIndex); - } - - private static BufferOrEvent generateRandomBuffer(int size, int channelIndex) { - MemorySegment seg = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE); - for (int i = 0; i < size; i++) { - seg.put(i, (byte) i); - } - - Buffer buf = new NetworkBuffer(seg, FreeingBufferRecycler.INSTANCE); - buf.setSize(size); - return new BufferOrEvent(buf, channelIndex); - } - - private static void validateBuffer(BufferOrEvent boe, int expectedSize, int expectedChannelIndex) { - assertEquals("wrong channel index", expectedChannelIndex, boe.getChannelIndex()); - assertTrue("is not buffer", boe.isBuffer()); - - Buffer buf = boe.getBuffer(); - assertEquals("wrong buffer size", expectedSize, buf.getSize()); - - MemorySegment seg = buf.getMemorySegment(); - for (int i = 0; i < expectedSize; i++) { - byte expected = (byte) i; - if (expected != seg.get(i)) { - fail(String.format( - "wrong buffer contents at position %s : expected=%d , found=%d", i, expected, seg.get(i))); - } - } + spiller.getBytesBlocked()); } private static void checkNoTempFilesRemain() { @@ -407,23 +105,4 @@ public class BufferSpillerTest { } } } - - private static class SequenceToConsume { - - final BufferSpiller.SpilledBufferOrEventSequence sequence; - final ArrayList<BufferOrEvent> events; - final Random bufferRnd; - final int numBuffersAndEvents; - final int numChannels; - - private SequenceToConsume(Random bufferRnd, ArrayList<BufferOrEvent> events, - BufferSpiller.SpilledBufferOrEventSequence sequence, - int numBuffersAndEvents, int numChannels) { - this.bufferRnd = bufferRnd; - this.events = events; - this.sequence = sequence; - this.numBuffersAndEvents = numBuffersAndEvents; - this.numChannels = numChannels; - } - } } http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java new file mode 100644 index 0000000..e7bf128 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; + +/** + * Tests for {@link CachedBufferBlocker}. + */ +public class CachedBufferBlockerTest extends BufferBlockerTestBase { + + private CachedBufferBlocker bufferBlocker; + + // ------------------------------------------------------------------------ + // Setup / Cleanup + // ------------------------------------------------------------------------ + + @Before + public void createBlocker() { + bufferBlocker = new CachedBufferBlocker(PAGE_SIZE); + } + + @After + public void cleanupBlocker() throws IOException { + if (bufferBlocker != null) { + bufferBlocker.close(); + } + } + + @Override + public BufferBlocker createBufferBlocker() { + return bufferBlocker; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3126bf52/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java new file mode 100644 index 0000000..a5f9469 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.io.network.partition.consumer.InputGate; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; + +/** + * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferBlocker}. + */ +public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase { + + @Override + public BarrierBuffer createBarrierHandler(InputGate gate) throws IOException { + return new BarrierBuffer(gate, new CachedBufferBlocker(PAGE_SIZE)); + } + + @Override + public void validateAlignmentBuffered(long actualBytesBuffered, BufferOrEvent... sequence) { + long expectedBuffered = 0; + for (BufferOrEvent boe : sequence) { + if (boe.isBuffer()) { + expectedBuffered += PAGE_SIZE; + } + } + + assertEquals("Wrong alignment buffered bytes", actualBytesBuffered, expectedBuffered); + } +}