This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ff8a94612c2e80189f9ba883e58c8fd599d4bb45
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Thu Jun 13 13:28:13 2019 +0200

    [hotfix][network] Rename BufferBlocker to BufferStorage
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  | 24 +++++++++----------
 .../flink/streaming/runtime/io/BufferSpiller.java  |  6 ++---
 .../io/{BufferBlocker.java => BufferStorage.java}  | 10 ++++----
 ...BufferBlocker.java => CachedBufferStorage.java} | 18 +++++++-------
 .../streaming/runtime/io/InputProcessorUtil.java   |  2 +-
 .../runtime/io/BarrierBufferTestBase.java          |  2 +-
 .../streaming/runtime/io/BufferSpillerTest.java    |  4 ++--
 ...kerTestBase.java => BufferStorageTestBase.java} | 28 +++++++++++-----------
 ...ockerTest.java => CachedBufferStorageTest.java} | 20 ++++++++--------
 .../runtime/io/CreditBasedBarrierBufferTest.java   |  4 ++--
 10 files changed, 60 insertions(+), 58 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 63fa1ac..ad62360 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -64,7 +64,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
        private final int totalNumberOfInputChannels;
 
        /** To utility to write blocked data to a file channel. */
-       private final BufferBlocker bufferBlocker;
+       private final BufferStorage bufferStorage;
 
        /**
         * The pending blocked buffer/event sequences. Must be consumed before requesting further data
@@ -123,11 +123,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
         * <p>There is no limit to how much data may be buffered during an alignment.
         *
         * @param inputGate The input gate to draw the buffers and events from.
-        * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
+        * @param bufferStorage The storage to hold the buffers and events for blocked channels.
         */
        @VisibleForTesting
-       BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker) {
-               this (inputGate, bufferBlocker, -1, "Testing: No task associated");
+       BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage) {
+               this (inputGate, bufferStorage, -1, "Testing: No task associated");
        }
 
        /**
@@ -138,11 +138,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
         * checkpoint has been cancelled.
         *
         * @param inputGate The input gate to draw the buffers and events from.
-        * @param bufferBlocker The buffer blocker to hold the buffers and events for channels with barrier.
+        * @param bufferStorage The storage to hold the buffers and events for blocked channels.
         * @param maxBufferedBytes The maximum bytes to be buffered before the checkpoint aborts.
         * @param taskName The task name for logging.
         */
-       BarrierBuffer(InputGate inputGate, BufferBlocker bufferBlocker, long maxBufferedBytes, String taskName) {
+       BarrierBuffer(InputGate inputGate, BufferStorage bufferStorage, long maxBufferedBytes, String taskName) {
                checkArgument(maxBufferedBytes == -1 || maxBufferedBytes > 0);
 
                this.inputGate = inputGate;
@@ -150,7 +150,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
                this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
                this.blockedChannels = new boolean[this.totalNumberOfInputChannels];
 
-               this.bufferBlocker = checkNotNull(bufferBlocker);
+               this.bufferStorage = checkNotNull(bufferStorage);
                this.queuedBuffered = new ArrayDeque<BufferOrEventSequence>();
 
                this.taskName = taskName;
@@ -192,7 +192,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
                        BufferOrEvent bufferOrEvent = next.get();
                        if (isBlocked(bufferOrEvent.getChannelIndex())) {
                                // if the channel is blocked, we just store the BufferOrEvent
-                               bufferBlocker.add(bufferOrEvent);
+                               bufferStorage.add(bufferOrEvent);
                                checkSizeLimit();
                        }
                        else if (bufferOrEvent.isBuffer()) {
@@ -436,7 +436,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
        }
 
        private void checkSizeLimit() throws Exception {
-               if (maxBufferedBytes > 0 && (numQueuedBytes + bufferBlocker.getBytesBlocked()) > maxBufferedBytes) {
+               if (maxBufferedBytes > 0 && (numQueuedBytes + bufferStorage.getBytesBlocked()) > maxBufferedBytes) {
                        // exceeded our limit - abort this checkpoint
                        LOG.info("{}: Checkpoint {} aborted because alignment volume limit ({} bytes) exceeded.",
                                taskName,
@@ -473,7 +473,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
        @Override
        public void cleanup() throws IOException {
-               bufferBlocker.close();
+               bufferStorage.close();
                if (currentBuffered != null) {
                        currentBuffered.cleanup();
                }
@@ -538,7 +538,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
 
                if (currentBuffered == null) {
                        // common case: no more buffered data
-                       currentBuffered = bufferBlocker.rollOverReusingResources();
+                       currentBuffered = bufferStorage.rollOverReusingResources();
                        if (currentBuffered != null) {
                                currentBuffered.open();
                        }
@@ -550,7 +550,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler {
                                        "Pushing back current alignment buffers and feeding back new alignment data first.", taskName);
 
                        // since we did not fully drain the previous sequence, we need to allocate a new buffer for this one
-                       BufferOrEventSequence bufferedNow = bufferBlocker.rollOverWithoutReusingResources();
+                       BufferOrEventSequence bufferedNow = bufferStorage.rollOverWithoutReusingResources();
                        if (bufferedNow != null) {
                                bufferedNow.open();
                                queuedBuffered.addFirst(currentBuffered);
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index 5a7c496..59877a0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -40,7 +40,7 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
- * The buffer spiller takes the buffers and events from a data stream and adds them to a spill file.
+ * The {@link BufferSpiller} takes the buffers and events from a data stream and adds them to a spill file.
  * After a number of elements have been spilled, the spiller can "roll over": It presents the spilled
  * elements as a readable sequence, and opens a new spill file.
  *
@@ -53,7 +53,7 @@ import java.util.concurrent.atomic.AtomicInteger;
  */
 @Internal
 @Deprecated
-public class BufferSpiller implements BufferBlocker {
+public class BufferSpiller implements BufferStorage {
 
        /** Size of header in bytes (see add method). */
        static final int HEADER_SIZE = 9;
@@ -92,7 +92,7 @@ public class BufferSpiller implements BufferBlocker {
        private long bytesWritten;
 
        /**
-        * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
+        * Creates a new {@link BufferSpiller}, spilling to one of the I/O manager's temp directories.
         *
         * @param ioManager The I/O manager for access to the temp directories.
         * @param pageSize The page size used to re-create spilled buffers.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
similarity index 84%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
index 4d0f66f..7d4dff0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferBlocker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferStorage.java
@@ -24,15 +24,15 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import java.io.IOException;
 
 /**
- * The buffer blocker takes the buffers and events from a data stream and adds them in a sequence.
- * After a number of elements have been added, the blocker can "roll over": It presents the added
- * elements as a readable sequence, and creates a new sequence.
+ * The {@link BufferStorage} takes the buffers and events from a data stream and adds them in a sequence.
+ * After a number of elements have been added, the {@link BufferStorage} can "roll over":
+ * It presents the added elements as a readable sequence, and creates a new sequence.
  */
 @Internal
-public interface BufferBlocker {
+public interface BufferStorage {
 
        /**
-        * Adds a buffer or event to the blocker.
+        * Adds a buffer or event to the {@link BufferStorage}.
         *
         * @param boe The buffer or event to be added into the blocker.
         */
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
similarity index 84%
rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
index f91e8cc..e0a79c2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferBlocker.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CachedBufferStorage.java
@@ -25,14 +25,16 @@ import javax.annotation.Nullable;
 import java.util.ArrayDeque;
 
 /**
- * The cached buffer blocker takes the buffers and events from a data stream and adds them to a memory queue.
- * After a number of elements have been cached, the blocker can "roll over": It presents the cached
- * elements as a readable sequence, and creates a new memory queue.
+ * The {@link CachedBufferStorage} takes the buffers and events from a data stream and adds them to
+ * a memory queue. After a number of elements have been cached, the {@link CachedBufferStorage}
+ * can "roll over":
+ * It presents the cached elements as a readable sequence, and creates a new memory queue.
  *
- * <p>This buffer blocked can be used in credit-based flow control for better barrier alignment in exactly-once mode.
+ * <p>This {@link CachedBufferStorage} can be used in credit-based flow control for better barrier
+ * alignment in exactly-once mode.
  */
 @Internal
-public class CachedBufferBlocker implements BufferBlocker {
+public class CachedBufferStorage implements BufferStorage {
 
        /** The page size, to estimate the total cached data size. */
        private final int pageSize;
@@ -44,11 +46,11 @@ public class CachedBufferBlocker implements BufferBlocker {
        private ArrayDeque<BufferOrEvent> currentBuffers;
 
        /**
-        * Creates a new buffer blocker, caching the buffers or events in memory queue.
+        * Creates a new {@link CachedBufferStorage}, caching the buffers or events in memory queue.
         *
         * @param pageSize The page size used to estimate the cached size.
         */
-       public CachedBufferBlocker(int pageSize) {
+       public CachedBufferStorage(int pageSize) {
                this.pageSize = pageSize;
                this.currentBuffers = new ArrayDeque<BufferOrEvent>();
        }
@@ -100,7 +102,7 @@ public class CachedBufferBlocker implements BufferBlocker {
 
        /**
         * This class represents a sequence of cached buffers and events, created by the
-        * {@link CachedBufferBlocker}.
+        * {@link CachedBufferStorage}.
         */
        public static class CachedBufferOrEventSequence implements BufferOrEventSequence {
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index b77c7d0..289dd1a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -56,7 +56,7 @@ public class InputProcessorUtil {
                        if (taskManagerConfig.getBoolean(NettyShuffleEnvironmentOptions.NETWORK_CREDIT_MODEL)) {
                                barrierHandler = new BarrierBuffer(
                                        inputGate,
-                                       new CachedBufferBlocker(inputGate.getPageSize()),
+                                       new CachedBufferStorage(inputGate.getPageSize()),
                                        maxAlign,
                                        taskName);
                        } else {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index c9981b5..4bc05ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -58,7 +58,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
 
 /**
- * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferBlocker} implements.
+ * Tests for the behavior of the {@link BarrierBuffer} with different {@link BufferStorage} implements.
  */
 public abstract class BarrierBufferTestBase {
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
index b70ba24..4633154 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferSpillerTest.java
@@ -38,7 +38,7 @@ import static org.junit.Assert.fail;
 /**
  * Tests for {@link BufferSpiller}.
  */
-public class BufferSpillerTest extends BufferBlockerTestBase {
+public class BufferSpillerTest extends BufferStorageTestBase {
 
        private static IOManager ioManager;
 
@@ -76,7 +76,7 @@ public class BufferSpillerTest extends BufferBlockerTestBase {
        }
 
        @Override
-       public BufferBlocker createBufferBlocker() {
+       public BufferStorage createBufferStorage() {
                return spiller;
        }
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
similarity index 93%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
index 4533a65..0485d88 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferBlockerTestBase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BufferStorageTestBase.java
@@ -39,20 +39,20 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
- * Tests for {@link BufferBlocker}.
+ * Tests for {@link BufferStorage}.
  */
-public abstract class BufferBlockerTestBase {
+public abstract class BufferStorageTestBase {
 
        protected static final int PAGE_SIZE = 4096;
 
-       abstract BufferBlocker createBufferBlocker();
+       abstract BufferStorage createBufferStorage();
 
        @Test
        public void testRollOverEmptySequences() throws IOException {
-               BufferBlocker bufferBlocker = createBufferBlocker();
-               assertNull(bufferBlocker.rollOverReusingResources());
-               assertNull(bufferBlocker.rollOverReusingResources());
-               assertNull(bufferBlocker.rollOverReusingResources());
+               BufferStorage bufferStorage = createBufferStorage();
+               assertNull(bufferStorage.rollOverReusingResources());
+               assertNull(bufferStorage.rollOverReusingResources());
+               assertNull(bufferStorage.rollOverReusingResources());
        }
 
        @Test
@@ -63,7 +63,7 @@ public abstract class BufferBlockerTestBase {
                final int maxNumEventsAndBuffers = 3000;
                final int maxNumChannels = 1656;
 
-               BufferBlocker bufferBlocker = createBufferBlocker();
+               BufferStorage bufferStorage = createBufferStorage();
 
                // do multiple spilling / rolling over rounds
                for (int round = 0; round < 5; round++) {
@@ -86,13 +86,13 @@ public abstract class BufferBlockerTestBase {
                                } else {
                                        evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
                                }
-                               bufferBlocker.add(evt);
+                               bufferStorage.add(evt);
                        }
 
                        // reset and create reader
                        bufferRnd.setSeed(bufferSeed);
 
-                       BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources();
+                       BufferOrEventSequence seq = bufferStorage.rollOverReusingResources();
                        seq.open();
 
                        // read and validate the sequence
@@ -136,14 +136,14 @@ public abstract class BufferBlockerTestBase {
                int currentNumEvents = 0;
                int currentNumRecordAndEvents = 0;
 
-               BufferBlocker bufferBlocker = createBufferBlocker();
+               BufferStorage bufferStorage = createBufferStorage();
 
                // do multiple spilling / rolling over rounds
                for (int round = 0; round < 2 * sequences; round++) {
 
                        if (round % 2 == 1) {
                                // make this an empty sequence
-                               assertNull(bufferBlocker.rollOverReusingResources());
+                               assertNull(bufferStorage.rollOverReusingResources());
                        } else {
                                // proper spilled sequence
                                final long bufferSeed = rnd.nextLong();
@@ -167,7 +167,7 @@ public abstract class BufferBlockerTestBase {
                                                } else {
                                                        evt = generateRandomBuffer(bufferRnd.nextInt(PAGE_SIZE) + 1, bufferRnd.nextInt(numberOfChannels));
                                                }
-                                               bufferBlocker.add(evt);
+                                               bufferStorage.add(evt);
                                                generated++;
                                        } else {
                                                // consume a record
@@ -205,7 +205,7 @@ public abstract class BufferBlockerTestBase {
 
                                // done generating a sequence. queue it for consumption
                                bufferRnd.setSeed(bufferSeed);
-                               BufferOrEventSequence seq = bufferBlocker.rollOverReusingResources();
+                               BufferOrEventSequence seq = bufferStorage.rollOverReusingResources();
 
                                SequenceToConsume stc = new SequenceToConsume(bufferRnd, events, seq, numEventsAndBuffers, numberOfChannels);
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java
similarity index 73%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java
index e7bf128..d1e2cf4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferBlockerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CachedBufferStorageTest.java
@@ -24,30 +24,30 @@ import org.junit.Before;
 import java.io.IOException;
 
 /**
- * Tests for {@link CachedBufferBlocker}.
+ * Tests for {@link CachedBufferStorage}.
  */
-public class CachedBufferBlockerTest extends BufferBlockerTestBase {
+public class CachedBufferStorageTest extends BufferStorageTestBase {
 
-       private CachedBufferBlocker bufferBlocker;
+       private CachedBufferStorage bufferStorage;
 
        // ------------------------------------------------------------------------
        //  Setup / Cleanup
        // ------------------------------------------------------------------------
 
        @Before
-       public void createBlocker() {
-               bufferBlocker = new CachedBufferBlocker(PAGE_SIZE);
+       public void createStorage() {
+               bufferStorage = new CachedBufferStorage(PAGE_SIZE);
        }
 
        @After
-       public void cleanupBlocker() throws IOException {
-               if (bufferBlocker != null) {
-                       bufferBlocker.close();
+       public void cleanupStorage() throws IOException {
+               if (bufferStorage != null) {
+                       bufferStorage.close();
                }
        }
 
        @Override
-       public BufferBlocker createBufferBlocker() {
-               return  bufferBlocker;
+       public BufferStorage createBufferStorage() {
+               return  bufferStorage;
        }
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
index e9c87ed..da88ffb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/CreditBasedBarrierBufferTest.java
@@ -26,13 +26,13 @@ import java.io.IOException;
 import static org.junit.Assert.assertEquals;
 
 /**
- * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferBlocker}.
+ * Tests for the behaviors of the {@link BarrierBuffer} with {@link CachedBufferStorage}.
  */
 public class CreditBasedBarrierBufferTest extends BarrierBufferTestBase {
 
        @Override
        public BarrierBuffer createBarrierBuffer(InputGate gate) throws IOException {
-               return new BarrierBuffer(gate, new CachedBufferBlocker(PAGE_SIZE));
+               return new BarrierBuffer(gate, new CachedBufferStorage(PAGE_SIZE));
        }
 
        @Override

Reply via email to