This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ad4404854d8fe6c75bcef301e69a867673c0ea5b Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Sat Jun 15 10:29:37 2019 +0200 [hotfix][test] Drop unnecessary pageSize argument in BufferBarierTestBase#createBuffer --- .../runtime/io/BarrierBufferTestBase.java | 234 ++++++++++----------- 1 file changed, 117 insertions(+), 117 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java index 908a199..3b4f65f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java @@ -112,8 +112,8 @@ public abstract class BarrierBufferTestBase { @Test public void testSingleChannelNoBarriers() throws Exception { BufferOrEvent[] sequence = { - createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), - createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + createBuffer(0), createBuffer(0), + createBuffer(0), createEndOfPartition(0) }; buffer = createBarrierBuffer(1, sequence); @@ -131,10 +131,10 @@ public abstract class BarrierBufferTestBase { @Test public void testMultiChannelNoBarriers() throws Exception { BufferOrEvent[] sequence = { - createBuffer(2, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), - createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createEndOfPartition(0), - createBuffer(3, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createEndOfPartition(3), - createBuffer(1, PAGE_SIZE), createEndOfPartition(1), createBuffer(2, PAGE_SIZE), createEndOfPartition(2) + createBuffer(2), createBuffer(2), createBuffer(0), + createBuffer(1), createBuffer(0), createEndOfPartition(0), + createBuffer(3), createBuffer(1), createEndOfPartition(3), + createBuffer(1), createEndOfPartition(1), createBuffer(2), createEndOfPartition(2) }; buffer = createBarrierBuffer(4, sequence); @@ -152,13 +152,13 @@ public abstract class BarrierBufferTestBase { @Test public void testSingleChannelWithBarriers() throws Exception { BufferOrEvent[] sequence = { - createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0), createBuffer(0), createBuffer(0), createBarrier(1, 0), - createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0), createBuffer(0), createBuffer(0), createBuffer(0), createBarrier(2, 0), createBarrier(3, 0), - createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0), createBuffer(0), createBarrier(4, 0), createBarrier(5, 0), createBarrier(6, 0), - createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + createBuffer(0), createEndOfPartition(0) }; ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer = createBarrierBuffer(1, sequence, handler); @@ -180,34 +180,34 @@ public abstract class BarrierBufferTestBase { public void testMultiChannelWithBarriers() throws Exception { BufferOrEvent[] sequence = { // checkpoint with blocked data - createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1, 1), createBarrier(1, 2), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1, 0), // checkpoint without blocked data - createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(0), createBuffer(0), createBuffer(1), createBuffer(1), createBuffer(2), createBarrier(2, 0), createBarrier(2, 1), createBarrier(2, 2), // checkpoint with data only from one channel - createBuffer(2, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(2), createBuffer(2), createBarrier(3, 2), - createBuffer(2, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(2), createBuffer(2), createBarrier(3, 0), createBarrier(3, 1), // empty checkpoint createBarrier(4, 1), createBarrier(4, 2), createBarrier(4, 0), // checkpoint with blocked data in mixed order - createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(5, 1), - createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + createBuffer(2), createBuffer(0), createBuffer(2), createBuffer(1), createBarrier(5, 2), - createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + createBuffer(1), createBuffer(0), createBuffer(2), createBuffer(1), createBarrier(5, 0), // some trailing data - createBuffer(0, PAGE_SIZE), + createBuffer(0), createEndOfPartition(0), createEndOfPartition(1), createEndOfPartition(2) }; ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); @@ -294,14 +294,14 @@ public abstract class BarrierBufferTestBase { @Test public void testMultiChannelTrailingBlockedData() throws Exception { BufferOrEvent[] sequence = { - createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(2, 1), - createBuffer(1, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createEndOfPartition(1), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2), createBarrier(2, 2), - createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0) }; ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer = createBarrierBuffer(3, sequence, handler); @@ -345,38 +345,38 @@ public abstract class BarrierBufferTestBase { public void testMultiChannelWithQueuedFutureBarriers() throws Exception{ BufferOrEvent[] sequence = { // checkpoint 1 - with blocked data - createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1, 1), createBarrier(1, 2), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1, 0), - createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(1), createBuffer(0), // checkpoint 2 - where future checkpoint barriers come before // the current checkpoint is complete createBarrier(2, 1), - createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBarrier(2, 0), - createBarrier(3, 0), createBuffer(0, PAGE_SIZE), - createBarrier(3, 1), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), - createBarrier(4, 1), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(1), createBuffer(2), createBarrier(2, 0), + createBarrier(3, 0), createBuffer(0), + createBarrier(3, 1), createBuffer(0), createBuffer(1), createBuffer(2), + createBarrier(4, 1), createBuffer(1), createBuffer(2), // complete checkpoint 2, send a barrier for checkpoints 4 and 5 createBarrier(2, 2), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0), createBarrier(4, 0), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0), createBarrier(5, 1), // complete checkpoint 3 createBarrier(3, 2), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(2), createBuffer(0), createBarrier(6, 1), // complete checkpoint 4, checkpoint 5 remains not fully triggered createBarrier(4, 2), - createBuffer(2, PAGE_SIZE), - createBuffer(1, PAGE_SIZE), createEndOfPartition(1), - createBuffer(2, PAGE_SIZE), createEndOfPartition(2), - createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + createBuffer(2), + createBuffer(1), createEndOfPartition(1), + createBuffer(2), createEndOfPartition(2), + createBuffer(0), createEndOfPartition(0) }; ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer = createBarrierBuffer(3, sequence, handler); @@ -452,23 +452,23 @@ public abstract class BarrierBufferTestBase { public void testMultiChannelSkippingCheckpoints() throws Exception { BufferOrEvent[] sequence = { // checkpoint 1 - with blocked data - createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1, 1), createBarrier(1, 2), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1, 0), - createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(1), createBuffer(0), // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3 createBarrier(2, 1), - createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(1), createBuffer(2), createBarrier(2, 0), - createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(0), createBarrier(3, 2), - createBuffer(2, PAGE_SIZE), - createBuffer(1, PAGE_SIZE), createEndOfPartition(1), - createBuffer(2, PAGE_SIZE), createEndOfPartition(2), - createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + createBuffer(2), + createBuffer(1), createEndOfPartition(1), + createBuffer(2), createEndOfPartition(2), + createBuffer(0), createEndOfPartition(0) }; AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer = createBarrierBuffer(3, sequence, toNotify); @@ -533,27 +533,27 @@ public abstract class BarrierBufferTestBase { public void testMultiChannelJumpingOverCheckpoint() throws Exception { BufferOrEvent[] sequence = { // checkpoint 1 - with blocked data - createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1, 1), createBarrier(1, 2), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1, 0), - createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(1), createBuffer(0), // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3 createBarrier(2, 1), - createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(1), createBuffer(2), createBarrier(2, 0), - createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(0), createBarrier(3, 1), - createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(1), createBuffer(2), createBarrier(3, 0), - createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(0), createBarrier(4, 2), - createBuffer(2, PAGE_SIZE), - createBuffer(1, PAGE_SIZE), createEndOfPartition(1), - createBuffer(2, PAGE_SIZE), createEndOfPartition(2), - createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + createBuffer(2), + createBuffer(1), createEndOfPartition(1), + createBuffer(2), createEndOfPartition(2), + createBuffer(0), createEndOfPartition(0) }; ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer = createBarrierBuffer(3, sequence, handler); @@ -610,39 +610,39 @@ public abstract class BarrierBufferTestBase { public void testMultiChannelSkippingCheckpointsViaBlockedInputs() throws Exception { BufferOrEvent[] sequence = { // checkpoint 1 - with blocked data - createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(0), createBuffer(2), createBuffer(0), createBarrier(1, 1), createBarrier(1, 2), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(1, 0), - createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(1), createBuffer(0), // checkpoint 2 will not complete: pre-mature barrier from checkpoint 3 createBarrier(2, 1), - createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(1), createBuffer(2), createBarrier(2, 0), - createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(1), createBuffer(0), createBarrier(3, 0), // queued barrier on blocked input - createBuffer(0, PAGE_SIZE), + createBuffer(0), createBarrier(4, 1), // pre-mature barrier on blocked input - createBuffer(1, PAGE_SIZE), - createBuffer(0, PAGE_SIZE), - createBuffer(2, PAGE_SIZE), + createBuffer(1), + createBuffer(0), + createBuffer(2), // complete checkpoint 2 createBarrier(2, 2), - createBuffer(0, PAGE_SIZE), + createBuffer(0), createBarrier(3, 2), // should be ignored - createBuffer(2, PAGE_SIZE), + createBuffer(2), createBarrier(4, 0), - createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(4, 2), - createBuffer(1, PAGE_SIZE), createEndOfPartition(1), - createBuffer(2, PAGE_SIZE), createEndOfPartition(2), - createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + createBuffer(1), createEndOfPartition(1), + createBuffer(2), createEndOfPartition(2), + createBuffer(0), createEndOfPartition(0) }; buffer = createBarrierBuffer(3, sequence); @@ -691,14 +691,14 @@ public abstract class BarrierBufferTestBase { @Test public void testEarlyCleanup() throws Exception { BufferOrEvent[] sequence = { - createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(0), createBuffer(1), createBuffer(2), createBarrier(1, 1), createBarrier(1, 2), createBarrier(1, 0), - createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(2), createBuffer(1), createBuffer(0), createBarrier(2, 1), - createBuffer(1, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createEndOfPartition(1), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(1), createBuffer(1), createEndOfPartition(1), createBuffer(0), createBuffer(2), createBarrier(2, 2), - createBuffer(2, PAGE_SIZE), createEndOfPartition(2), createBuffer(0, PAGE_SIZE), createEndOfPartition(0) + createBuffer(2), createEndOfPartition(2), createBuffer(0), createEndOfPartition(0) }; ValidatingCheckpointHandler handler = new ValidatingCheckpointHandler(); buffer = createBarrierBuffer(3, sequence, handler); @@ -738,26 +738,26 @@ public abstract class BarrierBufferTestBase { createEndOfPartition(2), createEndOfPartition(1), // checkpoint without blocked data - createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(3, PAGE_SIZE), + createBuffer(0), createBuffer(0), createBuffer(3), createBarrier(2, 3), createBarrier(2, 0), // checkpoint with blocked data - createBuffer(3, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(3), createBuffer(0), createBarrier(3, 3), - createBuffer(3, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + createBuffer(3), createBuffer(0), createBarrier(3, 0), // empty checkpoint createBarrier(4, 0), createBarrier(4, 3), // some data, one channel closes - createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(3, PAGE_SIZE), + createBuffer(0), createBuffer(0), createBuffer(3), createEndOfPartition(0), // checkpoint on last remaining channel - createBuffer(3, PAGE_SIZE), + createBuffer(3), createBarrier(5, 3), - createBuffer(3, PAGE_SIZE), + createBuffer(3), createEndOfPartition(3) }; buffer = createBarrierBuffer(4, sequence); @@ -799,15 +799,15 @@ public abstract class BarrierBufferTestBase { createBarrier(1, 0), createBarrier(1, 1), createBarrier(1, 2), // some buffers - createBuffer(0, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + createBuffer(0), createBuffer(0), createBuffer(2), // start the checkpoint that will be incomplete createBarrier(2, 2), createBarrier(2, 0), - createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + createBuffer(0), createBuffer(2), createBuffer(1), // close one after the barrier one before the barrier createEndOfPartition(2), createEndOfPartition(1), - createBuffer(0, PAGE_SIZE), + createBuffer(0), // final end of stream createEndOfPartition(0) @@ -836,15 +836,15 @@ public abstract class BarrierBufferTestBase { @Test public void testSingleChannelAbortCheckpoint() throws Exception { BufferOrEvent[] sequence = { - createBuffer(0, PAGE_SIZE), + createBuffer(0), createBarrier(1, 0), - createBuffer(0, PAGE_SIZE), + createBuffer(0), createBarrier(2, 0), createCancellationBarrier(4, 0), createBarrier(5, 0), - createBuffer(0, PAGE_SIZE), + createBuffer(0), createCancellationBarrier(6, 0), - createBuffer(0, PAGE_SIZE) + createBuffer(0) }; AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer = createBarrierBuffer(1, sequence, toNotify); @@ -873,37 +873,37 @@ public abstract class BarrierBufferTestBase { public void testMultiChannelAbortCheckpoint() throws Exception { BufferOrEvent[] sequence = { // some buffers and a successful checkpoint - /* 0 */ createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + /* 0 */ createBuffer(0), createBuffer(2), createBuffer(0), /* 3 */ createBarrier(1, 1), createBarrier(1, 2), - /* 5 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 5 */ createBuffer(2), createBuffer(1), /* 7 */ createBarrier(1, 0), - /* 8 */ createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + /* 8 */ createBuffer(0), createBuffer(2), // aborted on last barrier /* 10 */ createBarrier(2, 0), createBarrier(2, 2), - /* 12 */ createBuffer(0, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + /* 12 */ createBuffer(0), createBuffer(2), /* 14 */ createCancellationBarrier(2, 1), // successful checkpoint - /* 15 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 15 */ createBuffer(2), createBuffer(1), /* 17 */ createBarrier(3, 1), createBarrier(3, 2), createBarrier(3, 0), // abort on first barrier - /* 20 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 20 */ createBuffer(0), createBuffer(1), /* 22 */ createCancellationBarrier(4, 1), createBarrier(4, 2), - /* 24 */ createBuffer(0, PAGE_SIZE), + /* 24 */ createBuffer(0), /* 25 */ createBarrier(4, 0), // another successful checkpoint - /* 26 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + /* 26 */ createBuffer(0), createBuffer(1), createBuffer(2), /* 29 */ createBarrier(5, 2), createBarrier(5, 1), createBarrier(5, 0), - /* 32 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 32 */ createBuffer(0), createBuffer(1), // abort multiple cancellations and a barrier after the cancellations /* 34 */ createCancellationBarrier(6, 1), createCancellationBarrier(6, 2), /* 36 */ createBarrier(6, 0), - /* 37 */ createBuffer(0, PAGE_SIZE) + /* 37 */ createBuffer(0) }; AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer = createBarrierBuffer(3, sequence, toNotify); @@ -968,28 +968,28 @@ public abstract class BarrierBufferTestBase { public void testAbortViaQueuedBarriers() throws Exception { BufferOrEvent[] sequence = { // starting a checkpoint - /* 0 */ createBuffer(1, PAGE_SIZE), + /* 0 */ createBuffer(1), /* 1 */ createBarrier(1, 1), createBarrier(1, 2), - /* 3 */ createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 3 */ createBuffer(2), createBuffer(0), createBuffer(1), // queued barrier and cancellation barrier /* 6 */ createCancellationBarrier(2, 2), /* 7 */ createBarrier(2, 1), // some intermediate buffers (some queued) - /* 8 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + /* 8 */ createBuffer(0), createBuffer(1), createBuffer(2), // complete initial checkpoint /* 11 */ createBarrier(1, 0), // some buffers (none queued, since checkpoint is aborted) - /* 12 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + /* 12 */ createBuffer(2), createBuffer(1), createBuffer(0), // final barrier of aborted checkpoint /* 15 */ createBarrier(2, 0), // some more buffers - /* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE) + /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2) }; AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer = createBarrierBuffer(3, sequence, toNotify); @@ -1045,33 +1045,33 @@ public abstract class BarrierBufferTestBase { public void testAbortWhileHavingQueuedBarriers() throws Exception { BufferOrEvent[] sequence = { // starting a checkpoint - /* 0 */ createBuffer(1, PAGE_SIZE), + /* 0 */ createBuffer(1), /* 1 */ createBarrier(1, 1), - /* 2 */ createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 2 */ createBuffer(2), createBuffer(0), createBuffer(1), // queued barrier and cancellation barrier /* 5 */ createBarrier(2, 1), // some queued buffers - /* 6 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 6 */ createBuffer(2), createBuffer(1), // cancel the initial checkpoint /* 8 */ createCancellationBarrier(1, 0), // some more buffers - /* 9 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + /* 9 */ createBuffer(2), createBuffer(1), createBuffer(0), // ignored barrier - already canceled and moved to next checkpoint /* 12 */ createBarrier(1, 2), // some more buffers - /* 13 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + /* 13 */ createBuffer(0), createBuffer(1), createBuffer(2), // complete next checkpoint regularly /* 16 */ createBarrier(2, 0), createBarrier(2, 2), // some more buffers - /* 18 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE) + /* 18 */ createBuffer(0), createBuffer(1), createBuffer(2) }; AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer = createBarrierBuffer(3, sequence, toNotify); @@ -1126,27 +1126,27 @@ public abstract class BarrierBufferTestBase { public void testIgnoreCancelBarrierIfCheckpointSubsumed() throws Exception { BufferOrEvent[] sequence = { // starting a checkpoint - /* 0 */ createBuffer(2, PAGE_SIZE), + /* 0 */ createBuffer(2), /* 1 */ createBarrier(3, 1), createBarrier(3, 0), - /* 3 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE), + /* 3 */ createBuffer(0), createBuffer(1), createBuffer(2), // newer checkpoint barrier cancels/subsumes pending checkpoint /* 6 */ createBarrier(5, 2), // some queued buffers - /* 7 */ createBuffer(2, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(0, PAGE_SIZE), + /* 7 */ createBuffer(2), createBuffer(1), createBuffer(0), // cancel barrier the initial checkpoint /it is already canceled) /* 10 */ createCancellationBarrier(3, 2), // some more buffers - /* 11 */ createBuffer(2, PAGE_SIZE), createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), + /* 11 */ createBuffer(2), createBuffer(0), createBuffer(1), // complete next checkpoint regularly /* 14 */ createBarrier(5, 0), createBarrier(5, 1), // some more buffers - /* 16 */ createBuffer(0, PAGE_SIZE), createBuffer(1, PAGE_SIZE), createBuffer(2, PAGE_SIZE) + /* 16 */ createBuffer(0), createBuffer(1), createBuffer(2) }; AbstractInvokable toNotify = mock(AbstractInvokable.class); buffer = createBarrierBuffer(3, sequence, toNotify); @@ -1202,12 +1202,12 @@ public abstract class BarrierBufferTestBase { return new BufferOrEvent(new CancelCheckpointMarker(checkpointId), channel); } - private static BufferOrEvent createBuffer(int channel, int pageSize) { + private static BufferOrEvent createBuffer(int channel) { final int size = sizeCounter++; byte[] bytes = new byte[size]; RND.nextBytes(bytes); - MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(pageSize); + MemorySegment memory = MemorySegmentFactory.allocateUnpooledSegment(PAGE_SIZE); memory.put(0, bytes); Buffer buf = new NetworkBuffer(memory, FreeingBufferRecycler.INSTANCE);