This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ff8a94612c2e80189f9ba883e58c8fd599d4bb45 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Thu Jun 13 13:28:13 2019 +0200 [hotfix][network] Rename BufferBlocker to BufferStorage --- .../flink/streaming/runtime/io/BarrierBuffer.java | 24 +++++++++---------- .../flink/streaming/runtime/io/BufferSpiller.java | 6 ++--- .../io/{BufferBlocker.java => BufferStorage.java} | 10 ++++---- ...BufferBlocker.java => CachedBufferStorage.java} | 18 +++++++------- .../streaming/runtime/io/InputProcessorUtil.java | 2 +- .../runtime/io/BarrierBufferTestBase.java | 2 +- .../streaming/runtime/io/BufferSpillerTest.java | 4 ++-- ...kerTestBase.java => BufferStorageTestBase.java} | 28 +++++++++++----------- ...ockerTest.java => CachedBufferStorageTest.java} | 20 ++++++++-------- .../runtime/io/CreditBasedBarrierBufferTest.java | 4 ++-- 10 files changed, 60 insertions(+), 58 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 63fa1ac..ad62360 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -64,7 +64,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { private final int totalNumberOfInputChannels; /** To utility to write blocked data to a file channel. */ - private final BufferBlocker bufferBlocker; + private final BufferStorage bufferStorage; /** * The pending blocked buffer/event sequences. Must be consumed before requesting further data @@ -123,11 +123,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * <p>There is no limit to how much data may be buffered during an alignment. * * @param inputGate The input gate to draw the buffers and events from. - * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier. + * @param bufferStorage The storage to hold the buffers and events for blocked channels. */ @VisibleForTesting - BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) { - this (inputGate, bufferBlocker, -1, "Testing: No task associated"); + BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) { + this (inputGate, bufferStorage, -1, "Testing: No task associated"); } /** @@ -138,11 +138,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * checkpoint has been cancelled. * * @param inputGate The input gate to draw the buffers and events from. - * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier. + * @param bufferStorage The storage to hold the buffers and events for blocked channels. * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts. * @param taskName The task name for logging. */ - BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes, String taskName) { + BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage, long maxBufferedBytes, String taskName) { checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0); this.inputGate = inputGate; @@ -150,7 +150,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); this.blockedChannels = new boolean[this.totalNumberOfInputChannels]; - this.bufferBlocker = checkNotNull(bufferBlocker); + this.bufferStorage = checkNotNull(bufferStorage); this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>(); this.taskName = taskName; @@ -192,7 +192,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { BufferOrEvent bufferOrEvent = next.get(); if (isBlocked(bufferOrEvent.getChannelIndex())) { // if the channel is blocked, we just store the BufferOrEvent - bufferBlocker.add(bufferOrEvent); + bufferStorage.add(bufferOrEvent); checkSizeLimit(); } else if (bufferOrEvent.isBuffer()) { @@ -436,7 +436,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } private void checkSizeLimit() throws Exception { - if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) { + if (maxBufferedBytes > 0 && (numQueuedBytes + bufferStorage.getBytesBlocked()) > maxBufferedBytes) { // exceeded our limit - abort this checkpoint LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.", taskName, @@ -473,7 +473,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { @Override public void cleanup() throws IOException { - bufferBlocker.close(); + bufferStorage.close(); if (currentBuffered != null) { currentBuffered.cleanup(); } @@ -538,7 +538,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { if (currentBuffered == null) { // common case: no more buffered data - currentBuffered = bufferBlocker.rollOverReusingResources(); + currentBuffered = bufferStorage.rollOverReusingResources(); if (currentBuffered != null) { currentBuffered.open(); } @@ -550,7 +550,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { "Pushing back current alignment buffers and feeding back new alignment data first.", taskName); // since we did not fully drain the previous sequence, we need to allocate a new buffer for this one - BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources(); + BufferOrEventSequence bufferedNow = bufferStorage.rollOverWithoutReusingResources(); if (bufferedNow != null) { bufferedNow.open(); queuedBuffered.addFirst(currentBuffered); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java index 5a7c496..59877a0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java @@ -40,7 +40,7 @@ import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; /** - * The buffer spiller takes the buffers and events from a data stream and adds them to a spill file. + * The {@link BufferSpiller} takes the buffers and events from a data stream and adds them to a spill file. * After a number of elements have been spilled, the spiller can "roll over": It presents the spilled * elements as a readable sequence, and opens a new spill file. * @@ -53,7 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger; */ @Internal @Deprecated -public class BufferSpiller implements BufferBlocker { +public class BufferSpiller implements BufferStorage { /** Size of header in bytes (see add method). */ static final int HEADER_SIZE = 9; @@ -92,7 +92,7 @@ public class BufferSpiller implements BufferBlocker { private long bytesWritten; /** - * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories. + * Creates a new {@link BufferSpiller}, spilling to one of the I/O manager's temp directories. * * @param ioManager The I/O manager for access to the temp directories. * @param pageSize The page size used to re-create spilled buffers. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java similarity index 84% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java index 4d0f66f..7d4dff0 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java @@ -24,15 +24,15 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import java.io.IOException; /** - * The buffer blocker takes the buffers and events from a data stream and adds them in a sequence. - * After a number of elements have been added, the blocker can "roll over": It presents the added - * elements as a readable sequence, and creates a new sequence. + * The {@link BufferStorage} takes the buffers and events from a data stream and adds them in a sequence. + * After a number of elements have been added, the {@link BufferStorage} can "roll over": + * It presents the added elements as a readable sequence, and creates a new sequence. */ @Internal -public interface BufferBlocker { +public interface BufferStorage { /** - * Adds a buffer or event to the blocker. + * Adds a buffer or event to the {@link BufferStorage}. * * @param boe The buffer or event to be added into the blocker. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java similarity index 84% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java index f91e8cc..e0a79c2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java @@ -25,14 +25,16 @@ import javax.annotation.Nullable; import java.util.ArrayDeque; /** - * The cached buffer blocker takes the buffers and events from a data stream and adds them to a memory queue. - * After a number of elements have been cached, the blocker can "roll over": It presents the cached - * elements as a readable sequence, and creates a new memory queue. + * The {@link CachedBufferStorage} takes the buffers and events from a data stream and adds them to + * a memory queue. After a number of elements have been cached, the {@link CachedBufferStorage} + * can "roll over": + * It presents the cached elements as a readable sequence, and creates a new memory queue. * - * <p>This buffer blocked can be used in credit-based flow control for better barrier alignment in exactly-once mode. + * <p>This {@link CachedBufferStorage} can be used in credit-based flow control for better barrier + * alignment in exactly-once mode. */ @Internal -public class CachedBufferBlocker implements BufferBlocker { +public class CachedBufferStorage implements BufferStorage { /** The page size, to estimate the total cached data size. */ private final int pageSize; @@ -44,11 +46,11 @@ public class CachedBufferBlocker implements BufferBlocker { private ArrayDeque<BufferOrEvent> currentBuffers; /** - * Creates a new buffer blocker, caching the buffers or events in memory queue. + * Creates a new {@link CachedBufferStorage}, caching the buffers or events in memory queue. * * @param pageSize The page size used to estimate the cached size. */ - public CachedBufferBlocker(int pageSize) { + public CachedBufferStorage(int pageSize) { this.pageSize = pageSize; this.currentBuffers = new ArrayDeque<BufferOrEvent>(); } @@ -100,7 +102,7 @@ public class CachedBufferBlocker implements BufferBlocker { /** * This class represents a sequence of cached buffers and events, created by the - * {@link CachedBufferBlocker}. + * {@link CachedBufferStorage}. */ public static class CachedBufferOrEventSequence implements BufferOrEventSequence { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index b77c7d0..289dd1a 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -56,7 +56,7 @@ public class InputProcessorUtil { if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) { barrierHandler = new BarrierBuffer( inputGate, - new CachedBufferBlocker(inputGate.getPageSize()), + new CachedBufferStorage(inputGate.getPageSize()), maxAlign, taskName); } else { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java index c9981b5..4bc05ff 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java @@ -58,7 +58,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.hamcrest.MockitoHamcrest.argThat; /** - * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferBlocker} implements. + * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferStorage} implements. */ public abstract class BarrierBufferTestBase { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java index b70ba24..4633154 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java @@ -38,7 +38,7 @@ import static org.junit.Assert.fail; /** * Tests for {@link BufferSpiller}. */ -public class BufferSpillerTest extends BufferBlockerTestBase { +public class BufferSpillerTest extends BufferStorageTestBase { private static IOManager ioManager; @@ -76,7 +76,7 @@ public class BufferSpillerTest extends BufferBlockerTestBase { } @Override - public BufferBlocker createBufferBlocker() { + public BufferStorage createBufferStorage() { return spiller; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java similarity index 93% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java index 4533a65..0485d88 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java @@ -39,20 +39,20 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** - * Tests for {@link BufferBlocker}. + * Tests for {@link BufferStorage}. */ -public abstract class BufferBlockerTestBase { +public abstract class BufferStorageTestBase { protected static final int PAGE_SIZE = 4096; - abstract BufferBlocker createBufferBlocker(); + abstract BufferStorage createBufferStorage(); @Test public void testRollOverEmptySequences() throws IOException { - BufferBlocker bufferBlocker = createBufferBlocker(); - assertNull(bufferBlocker.rollOverReusingResources()); - assertNull(bufferBlocker.rollOverReusingResources()); - assertNull(bufferBlocker.rollOverReusingResources()); + BufferStorage bufferStorage = createBufferStorage(); + assertNull(bufferStorage.rollOverReusingResources()); + assertNull(bufferStorage.rollOverReusingResources()); + assertNull(bufferStorage.rollOverReusingResources()); } @Test @@ -63,7 +63,7 @@ public abstract class BufferBlockerTestBase { final int maxNumEventsAndBuffers = 3000; final int maxNumChannels = 1656; - BufferBlocker bufferBlocker = createBufferBlocker(); + BufferStorage bufferStorage = createBufferStorage(); // do multiple spilling / rolling over rounds for (int round = 0; round < 5; round++) { @@ -86,13 +86,13 @@ public abstract class BufferBlockerTestBase { } else { evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels)); } - bufferBlocker.add(evt); + bufferStorage.add(evt); } // reset and create reader bufferRnd.setSeed(bufferSeed); - BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources(); + BufferOrEventSequence seq = bufferStorage.rollOverReusingResources(); seq.open(); // read and validate the sequence @@ -136,14 +136,14 @@ public abstract class BufferBlockerTestBase { int currentNumEvents = 0; int currentNumRecordAndEvents = 0; - BufferBlocker bufferBlocker = createBufferBlocker(); + BufferStorage bufferStorage = createBufferStorage(); // do multiple spilling / rolling over rounds for (int round = 0; round < 2 * sequences; round++) { if (round % 2 == 1) { // make this an empty sequence - assertNull(bufferBlocker.rollOverReusingResources()); + assertNull(bufferStorage.rollOverReusingResources()); } else { // proper spilled sequence final long bufferSeed = rnd.nextLong(); @@ -167,7 +167,7 @@ public abstract class BufferBlockerTestBase { } else { evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels)); } - bufferBlocker.add(evt); + bufferStorage.add(evt); generated++; } else { // consume a record @@ -205,7 +205,7 @@ public abstract class BufferBlockerTestBase { // done generating a sequence. queue it for consumption bufferRnd.setSeed(bufferSeed); - BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources(); + BufferOrEventSequence seq = bufferStorage.rollOverReusingResources(); SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numberOfChannels); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java similarity index 73% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java index e7bf128..d1e2cf4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java @@ -24,30 +24,30 @@ import org.junit.Before; import java.io.IOException; /** - * Tests for {@link CachedBufferBlocker}. + * Tests for {@link CachedBufferStorage}. */ -public class CachedBufferBlockerTest extends BufferBlockerTestBase { +public class CachedBufferStorageTest extends BufferStorageTestBase { - private CachedBufferBlocker bufferBlocker; + private CachedBufferStorage bufferStorage; // ------------------------------------------------------------------------ // Setup / Cleanup // ------------------------------------------------------------------------ @Before - public void createBlocker() { - bufferBlocker = new CachedBufferBlocker(PAGE_SIZE); + public void createStorage() { + bufferStorage = new CachedBufferStorage(PAGE_SIZE); } @After - public void cleanupBlocker() throws IOException { - if (bufferBlocker != null) { - bufferBlocker.close(); + public void cleanupStorage() throws IOException { + if (bufferStorage != null) { + bufferStorage.close(); } } @Override - public BufferBlocker createBufferBlocker() { - return bufferBlocker; + public BufferStorage createBufferStorage() { + return bufferStorage; } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java index e9c87ed..da88ffb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java @@ -26,13 +26,13 @@ import java.io.IOException; import static org.junit.Assert.assertEquals; /** - * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferBlocker}. + * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferStorage}. */ public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase { @Override public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException { - return new BarrierBuffer(gate, new CachedBufferBlocker(PAGE_SIZE)); + return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE)); } @Override