This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 52525884826cd0e99b2309d89d74fa270d51ec90
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Sat Jun 15 10:18:01 2019 +0200

    [FLINK-12777][network] Refactor BarrierTracker to use the same code 
structure as BarrierBuffer
---
 .../flink/streaming/runtime/io/BarrierBuffer.java  |  52 ++++---
 .../streaming/runtime/io/BarrierDiscarder.java     | 105 --------------
 .../runtime/io/BufferOrEventSequence.java          |   2 +-
 .../runtime/io/CheckpointBarrierAligner.java       |  65 ++-------
 .../runtime/io/CheckpointBarrierDiscarder.java     |  74 ++++++++++
 .../runtime/io/CheckpointBarrierHandler.java       |  85 ++++++++---
 ...rTracker.java => CheckpointBarrierTracker.java} | 158 +++++++--------------
 ...rierHandler.java => CheckpointedInputGate.java} |   7 +-
 .../streaming/runtime/io/EmptyBufferStorage.java   |  73 ++++++++++
 .../streaming/runtime/io/InputProcessorUtil.java   |  11 +-
 .../streaming/runtime/io/StreamInputProcessor.java |   2 +-
 .../runtime/io/StreamTaskNetworkInput.java         |   6 +-
 .../runtime/io/StreamTwoInputProcessor.java        |   2 +-
 .../io/StreamTwoInputSelectableProcessor.java      |   5 +-
 .../runtime/io/BarrierBufferTestBase.java          |  32 ++---
 .../streaming/runtime/io/BarrierTrackerTest.java   |  11 +-
 16 files changed, 349 insertions(+), 341 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 1594389..8dcc005 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -38,7 +38,7 @@ import java.util.concurrent.CompletableFuture;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
- * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs 
with barriers until
+ * The barrier buffer is {@link CheckpointedInputGate} that blocks inputs with 
barriers until
  * all inputs have received the barrier for a given checkpoint.
  *
  * <p>To avoid back-pressuring the input streams (which may cause distributed 
deadlocks), the
@@ -46,11 +46,11 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * the blocks are released.
  */
 @Internal
-public class BarrierBuffer implements CheckpointBarrierHandler {
+public class BarrierBuffer implements CheckpointedInputGate {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(BarrierBuffer.class);
 
-       private final CheckpointBarrierAligner barrierAligner;
+       private final CheckpointBarrierHandler barrierHandler;
 
        /** The gate that the buffer draws its input from. */
        private final InputGate inputGate;
@@ -77,6 +77,21 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                this (inputGate, bufferStorage, "Testing: No task associated", 
null);
        }
 
+       BarrierBuffer(
+                       InputGate inputGate,
+                       BufferStorage bufferStorage,
+                       String taskName,
+                       @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+               this(
+                       inputGate,
+                       bufferStorage,
+                       new CheckpointBarrierAligner(
+                               inputGate.getNumberOfInputChannels(),
+                               taskName,
+                               toNotifyOnCheckpoint)
+               );
+       }
+
        /**
         * Creates a new checkpoint stream aligner.
         *
@@ -86,20 +101,15 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         *
         * @param inputGate The input gate to draw the buffers and events from.
         * @param bufferStorage The storage to hold the buffers and events for 
blocked channels.
-        * @param taskName The task name for logging.
-        * @param toNotifyOnCheckpoint optional Handler that receives the 
checkpoint notifications.
+        * @param barrierHandler Handler that controls which channels are 
blocked.
         */
        BarrierBuffer(
                        InputGate inputGate,
                        BufferStorage bufferStorage,
-                       String taskName,
-                       @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+                       CheckpointBarrierHandler barrierHandler) {
                this.inputGate = inputGate;
                this.bufferStorage = checkNotNull(bufferStorage);
-               this.barrierAligner = new CheckpointBarrierAligner(
-                       inputGate.getNumberOfInputChannels(),
-                       taskName,
-                       toNotifyOnCheckpoint);
+               this.barrierHandler = barrierHandler;
        }
 
        @Override
@@ -131,11 +141,11 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                        }
 
                        BufferOrEvent bufferOrEvent = next.get();
-                       if 
(barrierAligner.isBlocked(bufferOrEvent.getChannelIndex())) {
+                       if 
(barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) {
                                // if the channel is blocked, we just store the 
BufferOrEvent
                                bufferStorage.add(bufferOrEvent);
                                if (bufferStorage.isFull()) {
-                                       
barrierAligner.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
+                                       
barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes());
                                        bufferStorage.rollOver();
                                }
                        }
@@ -146,19 +156,19 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                                CheckpointBarrier checkpointBarrier = 
(CheckpointBarrier) bufferOrEvent.getEvent();
                                if (!endOfInputGate) {
                                        // process barriers only if there is a 
chance of the checkpoint completing
-                                       if 
(barrierAligner.processBarrier(checkpointBarrier, 
bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
+                                       if 
(barrierHandler.processBarrier(checkpointBarrier, 
bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) {
                                                bufferStorage.rollOver();
                                        }
                                }
                        }
                        else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
-                               if 
(barrierAligner.processCancellationBarrier((CancelCheckpointMarker) 
bufferOrEvent.getEvent())) {
+                               if 
(barrierHandler.processCancellationBarrier((CancelCheckpointMarker) 
bufferOrEvent.getEvent())) {
                                        bufferStorage.rollOver();
                                }
                        }
                        else {
                                if (bufferOrEvent.getEvent().getClass() == 
EndOfPartitionEvent.class) {
-                                       if 
(barrierAligner.processEndOfPartition()) {
+                                       if 
(barrierHandler.processEndOfPartition()) {
                                                bufferStorage.rollOver();
                                        }
                                }
@@ -178,7 +188,7 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
                } else {
                        // end of input stream. stream continues with the 
buffered data
                        endOfInputGate = true;
-                       barrierAligner.releaseBlocksAndResetBarriers();
+                       barrierHandler.releaseBlocksAndResetBarriers();
                        bufferStorage.rollOver();
                        return pollNext();
                }
@@ -208,13 +218,13 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
         *
         * @return The ID of the pending of completed checkpoint.
         */
-       public long getCurrentCheckpointId() {
-               return barrierAligner.getCurrentCheckpointId();
+       public long getLatestCheckpointId() {
+               return barrierHandler.getLatestCheckpointId();
        }
 
        @Override
        public long getAlignmentDurationNanos() {
-               return barrierAligner.getAlignmentDurationNanos();
+               return barrierHandler.getAlignmentDurationNanos();
        }
 
        @Override
@@ -228,6 +238,6 @@ public class BarrierBuffer implements 
CheckpointBarrierHandler {
 
        @Override
        public String toString() {
-               return barrierAligner.toString();
+               return barrierHandler.toString();
        }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
deleted file mode 100644
index c33c940..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * The BarrierDiscarder discards checkpoint barriers have been received from 
which input channels.
- */
-@Internal
-public class BarrierDiscarder implements CheckpointBarrierHandler {
-
-       // 
------------------------------------------------------------------------
-
-       /** The input gate, to draw the buffers and events from. */
-       private final InputGate inputGate;
-
-       /**
-        * The number of channels. Once that many barriers have been received 
for a checkpoint, the
-        * checkpoint is considered complete.
-        */
-       private final int totalNumberOfInputChannels;
-
-       // 
------------------------------------------------------------------------
-
-       public BarrierDiscarder(InputGate inputGate) {
-               this.inputGate = inputGate;
-               this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
-       }
-
-       @Override
-       public CompletableFuture<?> isAvailable() {
-               return inputGate.isAvailable();
-       }
-
-       @Override
-       public boolean isFinished() {
-               return inputGate.isFinished();
-       }
-
-       @Override
-       public Optional<BufferOrEvent> pollNext() throws Exception {
-               while (true) {
-                       Optional<BufferOrEvent> next = inputGate.pollNext();
-                       if (!next.isPresent()) {
-                               // buffer or input exhausted
-                               return next;
-                       }
-
-                       BufferOrEvent bufferOrEvent = next.get();
-                       if (bufferOrEvent.isBuffer()) {
-                               return next;
-                       }
-                       else if (bufferOrEvent.getEvent().getClass() != 
CheckpointBarrier.class &&
-                               bufferOrEvent.getEvent().getClass() != 
CancelCheckpointMarker.class) {
-                               // some other event
-                               return next;
-                       }
-               }
-       }
-
-       @Override
-       public void cleanup() {
-
-       }
-
-       @Override
-       public boolean isEmpty() {
-               return true;
-       }
-
-       @Override
-       public long getAlignmentDurationNanos() {
-               // this one does not do alignment at all
-               return 0L;
-       }
-
-       @Override
-       public int getNumberOfInputChannels() {
-               return totalNumberOfInputChannels;
-       }
-}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
index c5bde1b..21649bf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java
@@ -27,7 +27,7 @@ import java.io.IOException;
 
 /**
  * This class represents a sequence of buffers and events which are blocked by
- * {@link CheckpointBarrierHandler}. The sequence of buffers and events can be
+ * {@link CheckpointedInputGate}. The sequence of buffers and events can be
  * read back using the method {@link #getNext()}.
  */
 @Internal
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
index 30e05c1..482ba65 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java
@@ -21,8 +21,6 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -40,7 +38,7 @@ import java.io.IOException;
  * release blocked channels.
  */
 @Internal
-public class CheckpointBarrierAligner {
+public class CheckpointBarrierAligner extends CheckpointBarrierHandler {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointBarrierAligner.class);
 
@@ -52,9 +50,6 @@ public class CheckpointBarrierAligner {
 
        private final String taskName;
 
-       @Nullable
-       private final AbstractInvokable toNotifyOnCheckpoint;
-
        /** The ID of the checkpoint for which we expect barriers. */
        private long currentCheckpointId = -1L;
 
@@ -77,13 +72,14 @@ public class CheckpointBarrierAligner {
                        int totalNumberOfInputChannels,
                        String taskName,
                        @Nullable AbstractInvokable toNotifyOnCheckpoint) {
+               super(toNotifyOnCheckpoint);
                this.totalNumberOfInputChannels = totalNumberOfInputChannels;
                this.taskName = taskName;
-               this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
 
                this.blockedChannels = new boolean[totalNumberOfInputChannels];
        }
 
+       @Override
        public void releaseBlocksAndResetBarriers() throws IOException {
                LOG.debug("{}: End of stream alignment, feeding buffered data 
back.", taskName);
 
@@ -100,19 +96,12 @@ public class CheckpointBarrierAligner {
                }
        }
 
-       /**
-        * Checks whether the channel with the given index is blocked.
-        *
-        * @param channelIndex The channel index to check.
-        * @return True if the channel is blocked, false if not.
-        */
+       @Override
        public boolean isBlocked(int channelIndex) {
                return blockedChannels[channelIndex];
        }
 
-       /**
-        * @return true if some blocked data should be unblocked/rolled over.
-        */
+       @Override
        public boolean processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex, long bufferedBytes) throws Exception {
                final long barrierId = receivedBarrier.getId();
 
@@ -121,7 +110,7 @@ public class CheckpointBarrierAligner {
                        if (barrierId > currentCheckpointId) {
                                // new checkpoint
                                currentCheckpointId = barrierId;
-                               notifyCheckpoint(receivedBarrier, 
bufferedBytes);
+                               notifyCheckpoint(receivedBarrier, 
bufferedBytes, latestAlignmentDurationNanos);
                        }
                        return false;
                }
@@ -185,7 +174,7 @@ public class CheckpointBarrierAligner {
                        }
 
                        releaseBlocksAndResetBarriers();
-                       notifyCheckpoint(receivedBarrier, bufferedBytes);
+                       notifyCheckpoint(receivedBarrier, bufferedBytes, 
latestAlignmentDurationNanos);
                        return true;
                }
                return checkpointAborted;
@@ -222,9 +211,7 @@ public class CheckpointBarrierAligner {
                }
        }
 
-       /**
-        * @return true if some blocked data should be unblocked/rolled over.
-        */
+       @Override
        public boolean processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
                final long barrierId = cancelBarrier.getCheckpointId();
 
@@ -300,9 +287,7 @@ public class CheckpointBarrierAligner {
                return false;
        }
 
-       /**
-        * @return true if some blocked data should be unblocked/rolled over.
-        */
+       @Override
        public boolean processEndOfPartition() throws Exception {
                numClosedChannels++;
 
@@ -317,37 +302,12 @@ public class CheckpointBarrierAligner {
                return false;
        }
 
-       private void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long 
bufferedBytes) throws Exception {
-               if (toNotifyOnCheckpoint != null) {
-                       CheckpointMetaData checkpointMetaData =
-                               new 
CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
-
-                       CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics()
-                               .setBytesBufferedInAlignment(bufferedBytes)
-                               
.setAlignmentDurationNanos(latestAlignmentDurationNanos);
-
-                       toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
-                               checkpointMetaData,
-                               checkpointBarrier.getCheckpointOptions(),
-                               checkpointMetrics);
-               }
-       }
-
-       private void notifyAbortOnCancellationBarrier(long checkpointId) throws 
Exception {
-               notifyAbort(checkpointId,
-                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
-       }
-
-       private void notifyAbort(long checkpointId, CheckpointException cause) 
throws Exception {
-               if (toNotifyOnCheckpoint != null) {
-                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
-               }
-       }
-
-       public long getCurrentCheckpointId() {
+       @Override
+       public long getLatestCheckpointId() {
                return currentCheckpointId;
        }
 
+       @Override
        public long getAlignmentDurationNanos() {
                if (startOfAlignmentTimestamp <= 0) {
                        return latestAlignmentDurationNanos;
@@ -365,6 +325,7 @@ public class CheckpointBarrierAligner {
                        numClosedChannels);
        }
 
+       @Override
        public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws 
Exception {
                releaseBlocksAndResetBarriers();
                notifyAbort(currentCheckpointId,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
new file mode 100644
index 0000000..4c6cdab
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+
+import java.io.IOException;
+
+/**
+ * The {@link CheckpointBarrierDiscarder} discards checkpoint barriers have 
been received from which input channels.
+ */
+@Internal
+public class CheckpointBarrierDiscarder extends CheckpointBarrierHandler {
+       public CheckpointBarrierDiscarder() {
+               super(null);
+       }
+
+       @Override
+       public void releaseBlocksAndResetBarriers() throws IOException {
+       }
+
+       @Override
+       public boolean isBlocked(int channelIndex) {
+               return false;
+       }
+
+       @Override
+       public boolean processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex, long bufferedBytes) throws Exception {
+               return false;
+       }
+
+       @Override
+       public boolean processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
+               return false;
+       }
+
+       @Override
+       public boolean processEndOfPartition() throws Exception {
+               return false;
+       }
+
+       @Override
+       public long getLatestCheckpointId() {
+               return 0;
+       }
+
+       @Override
+       public long getAlignmentDurationNanos() {
+               return 0;
+       }
+
+       @Override
+       public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws 
Exception {
+
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 2ee1a97..41e043e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -18,43 +18,88 @@
 
 package org.apache.flink.streaming.runtime.io;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.io.AsyncDataInput;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 
 /**
- * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the 
input channels.
+ * The {@link CheckpointBarrierHandler} reacts to checkpoint barrier arriving 
from the input channels.
  * Different implementations may either simply track barriers, or block 
certain inputs on
  * barriers.
  */
-@Internal
-public interface CheckpointBarrierHandler extends 
AsyncDataInput<BufferOrEvent> {
+public abstract class CheckpointBarrierHandler {
+
+       /** The listener to be notified on complete checkpoints. */
+       @Nullable
+       private final AbstractInvokable toNotifyOnCheckpoint;
+
+       public CheckpointBarrierHandler(@Nullable AbstractInvokable 
toNotifyOnCheckpoint) {
+               this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
+       }
+
+       public abstract void releaseBlocksAndResetBarriers() throws IOException;
+
        /**
-        * Cleans up all internally held resources.
+        * Checks whether the channel with the given index is blocked.
         *
-        * @throws IOException Thrown if the cleanup of I/O resources failed.
+        * @param channelIndex The channel index to check.
+        * @return True if the channel is blocked, false if not.
         */
-       void cleanup() throws IOException;
+       public abstract boolean isBlocked(int channelIndex);
 
        /**
-        * Checks if the barrier handler has buffered any data internally.
-        * @return {@code True}, if no data is buffered internally, {@code 
false} otherwise.
+        * @return true if some blocked data should be unblocked/rolled over.
         */
-       boolean isEmpty();
+       public abstract boolean processBarrier(CheckpointBarrier 
receivedBarrier, int channelIndex, long bufferedBytes) throws Exception;
 
        /**
-        * Gets the time that the latest alignment took, in nanoseconds.
-        * If there is currently an alignment in progress, it will return the 
time spent in the
-        * current alignment so far.
-        *
-        * @return The duration in nanoseconds
+        * @return true if some blocked data should be unblocked/rolled over.
         */
-       long getAlignmentDurationNanos();
+       public abstract boolean 
processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws 
Exception;
 
        /**
-        * @return number of underlying input channels.
+        * @return true if some blocked data should be unblocked/rolled over.
         */
-       int getNumberOfInputChannels();
+       public abstract boolean processEndOfPartition() throws Exception;
+
+       public abstract long getLatestCheckpointId();
+
+       public abstract long getAlignmentDurationNanos();
+
+       public abstract void checkpointSizeLimitExceeded(long maxBufferedBytes) 
throws Exception;
+
+       protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier, 
long bufferedBytes, long alignmentDurationNanos) throws Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       CheckpointMetaData checkpointMetaData =
+                               new 
CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());
+
+                       CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics()
+                               .setBytesBufferedInAlignment(bufferedBytes)
+                               
.setAlignmentDurationNanos(alignmentDurationNanos);
+
+                       toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
+                               checkpointMetaData,
+                               checkpointBarrier.getCheckpointOptions(),
+                               checkpointMetrics);
+               }
+       }
+
+       protected void notifyAbortOnCancellationBarrier(long checkpointId) 
throws Exception {
+               notifyAbort(checkpointId,
+                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
+       }
+
+       protected void notifyAbort(long checkpointId, CheckpointException 
cause) throws Exception {
+               if (toNotifyOnCheckpoint != null) {
+                       
toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause);
+               }
+       }
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
similarity index 61%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
rename to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
index f7629bb..0ec9004 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java
@@ -21,13 +21,8 @@ package org.apache.flink.streaming.runtime.io;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
 import org.slf4j.Logger;
@@ -36,24 +31,22 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayDeque;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
 
 /**
- * The BarrierTracker keeps track of what checkpoint barriers have been 
received from
+ * The {@link CheckpointBarrierTracker} keeps track of what checkpoint 
barriers have been received from
  * which input channels. Once it has observed all checkpoint barriers for a 
checkpoint ID,
  * it notifies its listener of a completed checkpoint.
  *
- * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the 
input
+ * <p>Unlike the {@link CheckpointBarrierAligner}, the BarrierTracker does not 
block the input
  * channels that have sent barriers, so it cannot be used to gain 
"exactly-once" processing
  * guarantees. It can, however, be used to gain "at least once" processing 
guarantees.
  *
  * <p>NOTE: This implementation strictly assumes that newer checkpoints have 
higher checkpoint IDs.
  */
 @Internal
-public class BarrierTracker implements CheckpointBarrierHandler {
+public class CheckpointBarrierTracker extends CheckpointBarrierHandler {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(BarrierTracker.class);
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointBarrierTracker.class);
 
        /**
         * The tracker tracks a maximum number of checkpoints, for which some, 
but not all barriers
@@ -63,9 +56,6 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
 
        // 
------------------------------------------------------------------------
 
-       /** The input gate, to draw the buffers and events from. */
-       private final InputGate inputGate;
-
        /**
         * The number of channels. Once that many barriers have been received 
for a checkpoint, the
         * checkpoint is considered complete.
@@ -78,89 +68,36 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
         */
        private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints;
 
-       /** The listener to be notified on complete checkpoints. */
-       private final AbstractInvokable toNotifyOnCheckpoint;
-
        /** The highest checkpoint ID encountered so far. */
        private long latestPendingCheckpointID = -1;
 
-       // 
------------------------------------------------------------------------
-
-       public BarrierTracker(InputGate inputGate) {
-               this(inputGate, null);
+       public CheckpointBarrierTracker(int totalNumberOfInputChannels) {
+               this(totalNumberOfInputChannels, null);
        }
 
-       public BarrierTracker(InputGate inputGate, @Nullable AbstractInvokable 
toNotifyOnCheckpoint) {
-               this.inputGate = inputGate;
-               this.totalNumberOfInputChannels = 
inputGate.getNumberOfInputChannels();
+       public CheckpointBarrierTracker(int totalNumberOfInputChannels, 
@Nullable AbstractInvokable toNotifyOnCheckpoint) {
+               super(toNotifyOnCheckpoint);
+               this.totalNumberOfInputChannels = totalNumberOfInputChannels;
                this.pendingCheckpoints = new ArrayDeque<>();
-               this.toNotifyOnCheckpoint = toNotifyOnCheckpoint;
        }
 
        @Override
-       public CompletableFuture<?> isAvailable() {
-               return inputGate.isAvailable();
+       public void releaseBlocksAndResetBarriers() {
        }
 
        @Override
-       public boolean isFinished() {
-               return inputGate.isFinished();
+       public boolean isBlocked(int channelIndex) {
+               return false;
        }
 
        @Override
-       public Optional<BufferOrEvent> pollNext() throws Exception {
-               while (true) {
-                       Optional<BufferOrEvent> next = inputGate.pollNext();
-                       if (!next.isPresent()) {
-                               // buffer or input exhausted
-                               return next;
-                       }
-
-                       BufferOrEvent bufferOrEvent = next.get();
-                       if (bufferOrEvent.isBuffer()) {
-                               return next;
-                       }
-                       else if (bufferOrEvent.getEvent().getClass() == 
CheckpointBarrier.class) {
-                               processBarrier((CheckpointBarrier) 
bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
-                       }
-                       else if (bufferOrEvent.getEvent().getClass() == 
CancelCheckpointMarker.class) {
-                               
processCheckpointAbortBarrier((CancelCheckpointMarker) 
bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());
-                       }
-                       else {
-                               // some other event
-                               return next;
-                       }
-               }
-       }
-
-       @Override
-       public void cleanup() {
-               pendingCheckpoints.clear();
-       }
-
-       @Override
-       public boolean isEmpty() {
-               return pendingCheckpoints.isEmpty();
-       }
-
-       @Override
-       public long getAlignmentDurationNanos() {
-               // this one does not do alignment at all
-               return 0L;
-       }
-
-       @Override
-       public int getNumberOfInputChannels() {
-               return totalNumberOfInputChannels;
-       }
-
-       private void processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex) throws Exception {
+       public boolean processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex, long bufferedBytes) throws Exception {
                final long barrierId = receivedBarrier.getId();
 
                // fast path for single channel trackers
                if (totalNumberOfInputChannels == 1) {
-                       notifyCheckpoint(barrierId, 
receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());
-                       return;
+                       notifyCheckpoint(receivedBarrier, 0, 0);
+                       return false;
                }
 
                // general path for multiple input channels
@@ -169,20 +106,20 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                }
 
                // find the checkpoint barrier in the queue of pending barriers
-               CheckpointBarrierCount cbc = null;
+               CheckpointBarrierCount barrierCount = null;
                int pos = 0;
 
                for (CheckpointBarrierCount next : pendingCheckpoints) {
                        if (next.checkpointId == barrierId) {
-                               cbc = next;
+                               barrierCount = next;
                                break;
                        }
                        pos++;
                }
 
-               if (cbc != null) {
+               if (barrierCount != null) {
                        // add one to the count to that barrier and check for 
completion
-                       int numBarriersNew = cbc.incrementBarrierCount();
+                       int numBarriersNew = 
barrierCount.incrementBarrierCount();
                        if (numBarriersNew == totalNumberOfInputChannels) {
                                // checkpoint can be triggered (or is aborted 
and all barriers have been seen)
                                // first, remove this checkpoint and all all 
prior pending
@@ -192,12 +129,12 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                                }
 
                                // notify the listener
-                               if (!cbc.isAborted()) {
+                               if (!barrierCount.isAborted()) {
                                        if (LOG.isDebugEnabled()) {
                                                LOG.debug("Received all 
barriers for checkpoint {}", barrierId);
                                        }
 
-                                       
notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), 
receivedBarrier.getCheckpointOptions());
+                                       notifyCheckpoint(receivedBarrier, 0, 0);
                                }
                        }
                }
@@ -216,19 +153,21 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                                }
                        }
                }
+               return false;
        }
 
-       private void processCheckpointAbortBarrier(CancelCheckpointMarker 
barrier, int channelIndex) throws Exception {
-               final long checkpointId = barrier.getCheckpointId();
+       @Override
+       public boolean processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
+               final long checkpointId = cancelBarrier.getCheckpointId();
 
                if (LOG.isDebugEnabled()) {
-                       LOG.debug("Received cancellation barrier for checkpoint 
{} from channel {}", checkpointId, channelIndex);
+                       LOG.debug("Received cancellation barrier for checkpoint 
{}", checkpointId);
                }
 
                // fast path for single channel trackers
                if (totalNumberOfInputChannels == 1) {
-                       notifyAbort(checkpointId);
-                       return;
+                       notifyAbortOnCancellationBarrier(checkpointId);
+                       return false;
                }
 
                // -- general path for multiple input channels --
@@ -241,7 +180,7 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
 
                        if (cbc.markAborted()) {
                                // abort the subsumed checkpoints if not 
already done
-                               notifyAbort(cbc.checkpointId());
+                               
notifyAbortOnCancellationBarrier(cbc.checkpointId());
                        }
                }
 
@@ -249,7 +188,7 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                        // make sure the checkpoint is remembered as aborted
                        if (cbc.markAborted()) {
                                // this was the first time the checkpoint was 
aborted - notify
-                               notifyAbort(checkpointId);
+                               notifyAbortOnCancellationBarrier(checkpointId);
                        }
 
                        // we still count the barriers to be able to remove the 
entry once all barriers have been seen
@@ -259,7 +198,7 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                        }
                }
                else if (checkpointId > latestPendingCheckpointID) {
-                       notifyAbort(checkpointId);
+                       notifyAbortOnCancellationBarrier(checkpointId);
 
                        latestPendingCheckpointID = checkpointId;
 
@@ -272,28 +211,33 @@ public class BarrierTracker implements 
CheckpointBarrierHandler {
                } else {
                        // trailing cancellation barrier which was already 
cancelled
                }
+               return false;
        }
 
-       private void notifyCheckpoint(long checkpointId, long timestamp, 
CheckpointOptions checkpointOptions) throws Exception {
-               if (toNotifyOnCheckpoint != null) {
-                       CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, timestamp);
-                       CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics()
-                               .setBytesBufferedInAlignment(0L)
-                               .setAlignmentDurationNanos(0L);
-
-                       
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, 
checkpointOptions, checkpointMetrics);
+       @Override
+       public boolean processEndOfPartition() throws Exception {
+               while (!pendingCheckpoints.isEmpty()) {
+                       CheckpointBarrierCount barrierCount = 
pendingCheckpoints.removeFirst();
+                       if (barrierCount.markAborted()) {
+                               notifyAbort(barrierCount.checkpointId(),
+                                       new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
+                       }
                }
+               return false;
        }
 
-       private void notifyAbort(long checkpointId) throws Exception {
-               if (toNotifyOnCheckpoint != null) {
-                       toNotifyOnCheckpoint.abortCheckpointOnBarrier(
-                               checkpointId,
-                               new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
-               }
+       public long getLatestCheckpointId() {
+               return pendingCheckpoints.isEmpty() ? -1 : 
pendingCheckpoints.peekLast().checkpointId();
        }
 
-       // 
------------------------------------------------------------------------
+       public long getAlignmentDurationNanos() {
+               return 0;
+       }
+
+       @Override
+       public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws 
Exception {
+               throw new UnsupportedOperationException("This should never 
happened as this class doesn't block any data");
+       }
 
        /**
         * Simple class for a checkpoint ID with a barrier counter.
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
similarity index 84%
copy from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
index 2ee1a97..cdbbfbc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java
@@ -25,12 +25,11 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import java.io.IOException;
 
 /**
- * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the 
input channels.
- * Different implementations may either simply track barriers, or block 
certain inputs on
- * barriers.
+ * The {@link CheckpointedInputGate} uses {@link CheckpointBarrierHandler} to 
handle incoming
+ * {@link org.apache.flink.runtime.io.network.api.CheckpointBarrier}s from the 
{@link org.apache.flink.runtime.io.network.partition.consumer.InputGate}.
  */
 @Internal
-public interface CheckpointBarrierHandler extends 
AsyncDataInput<BufferOrEvent> {
+public interface CheckpointedInputGate extends AsyncDataInput<BufferOrEvent> {
        /**
         * Cleans up all internally held resources.
         *
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java
new file mode 100644
index 0000000..5535c49
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.io;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+
+import java.io.IOException;
+import java.util.Optional;
+
+/**
+ * Always empty implementation of {@link BufferStorage}. It doesn't allow for 
adding any data.
+ */
+@Internal
+public class EmptyBufferStorage implements BufferStorage {
+       @Override
+       public void add(BufferOrEvent boe) throws IOException {
+               throw new UnsupportedOperationException("Adding to 
EmptyBufferStorage is unsupported");
+       }
+
+       @Override
+       public boolean isFull() {
+               return false;
+       }
+
+       @Override
+       public void rollOver() throws IOException {
+       }
+
+       @Override
+       public long getPendingBytes() {
+               return 0;
+       }
+
+       @Override
+       public long getRolledBytes() {
+               return 0;
+       }
+
+       @Override
+       public boolean isEmpty() {
+               return true;
+       }
+
+       @Override
+       public Optional<BufferOrEvent> pollNext() throws IOException {
+               return Optional.empty();
+       }
+
+       @Override
+       public long getMaxBufferedBytes() {
+               return -1;
+       }
+
+       @Override
+       public void close() throws IOException {
+       }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
index c9ec6bf..75926b9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
@@ -30,13 +30,13 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import java.io.IOException;
 
 /**
- * Utility for creating {@link CheckpointBarrierHandler} based on checkpoint 
mode
+ * Utility for creating {@link CheckpointedInputGate} based on checkpoint mode
  * for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}.
  */
 @Internal
 public class InputProcessorUtil {
 
-       public static CheckpointBarrierHandler createCheckpointBarrierHandler(
+       public static CheckpointedInputGate createCheckpointBarrierHandler(
                        StreamTask<?, ?> checkpointedTask,
                        CheckpointingMode checkpointMode,
                        IOManager ioManager,
@@ -44,7 +44,7 @@ public class InputProcessorUtil {
                        Configuration taskManagerConfig,
                        String taskName) throws IOException {
 
-               CheckpointBarrierHandler barrierHandler;
+               CheckpointedInputGate barrierHandler;
                if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) {
                        long maxAlign = 
taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);
                        if (!(maxAlign == -1 || maxAlign > 0)) {
@@ -67,7 +67,10 @@ public class InputProcessorUtil {
                                        checkpointedTask);
                        }
                } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) {
-                       barrierHandler = new BarrierTracker(inputGate);
+                       barrierHandler = new BarrierBuffer(
+                               inputGate,
+                               new EmptyBufferStorage(),
+                               new 
CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), 
checkpointedTask));
                } else {
                        throw new IllegalArgumentException("Unrecognized 
Checkpointing Mode: " + checkpointMode);
                }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 742dbe8..58b2051 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -104,7 +104,7 @@ public class StreamInputProcessor<IN> {
 
                InputGate inputGate = InputGateUtil.createInputGate(inputGates);
 
-               CheckpointBarrierHandler barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler(
+               CheckpointedInputGate barrierHandler = 
InputProcessorUtil.createCheckpointBarrierHandler(
                        checkpointedTask,
                        checkpointMode,
                        ioManager,
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
index 85e7f46..ecf88e2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java
@@ -42,12 +42,12 @@ import java.util.concurrent.CompletableFuture;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /**
- * Implementation of {@link StreamTaskInput} that wraps an input from network 
taken from {@link CheckpointBarrierHandler}.
+ * Implementation of {@link StreamTaskInput} that wraps an input from network 
taken from {@link CheckpointedInputGate}.
  */
 @Internal
 public final class StreamTaskNetworkInput implements StreamTaskInput {
 
-       private final CheckpointBarrierHandler barrierHandler;
+       private final CheckpointedInputGate barrierHandler;
 
        private final DeserializationDelegate<StreamElement> 
deserializationDelegate;
 
@@ -63,7 +63,7 @@ public final class StreamTaskNetworkInput implements 
StreamTaskInput {
 
        @SuppressWarnings("unchecked")
        public StreamTaskNetworkInput(
-                       CheckpointBarrierHandler barrierHandler,
+                       CheckpointedInputGate barrierHandler,
                        TypeSerializer<?> inputSerializer,
                        IOManager ioManager,
                        int inputIndex) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 4efbc7f..aa6354d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -86,7 +86,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
        private final DeserializationDelegate<StreamElement> 
deserializationDelegate1;
        private final DeserializationDelegate<StreamElement> 
deserializationDelegate2;
 
-       private final CheckpointBarrierHandler barrierHandler;
+       private final CheckpointedInputGate barrierHandler;
 
        private final Object lock;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
index 54ce749..d5ebf29 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java
@@ -120,8 +120,9 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> {
                InputGate unionedInputGate2 = 
InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0]));
 
                // create a Input instance for each input
-               this.input1 = new StreamTaskNetworkInput(new 
BarrierDiscarder(unionedInputGate1), inputSerializer1, ioManager, 0);
-               this.input2 = new StreamTaskNetworkInput(new 
BarrierDiscarder(unionedInputGate2), inputSerializer2, ioManager, 1);
+               CachedBufferStorage bufferStorage = new 
CachedBufferStorage(unionedInputGate1.getPageSize());
+               this.input1 = new StreamTaskNetworkInput(new 
BarrierBuffer(unionedInputGate1, bufferStorage, new 
CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0);
+               this.input2 = new StreamTaskNetworkInput(new 
BarrierBuffer(unionedInputGate2, bufferStorage, new 
CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1);
 
                this.statusWatermarkValve1 = new StatusWatermarkValve(
                        unionedInputGate1.getNumberOfInputChannels(),
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
index e6e48a8..13c4aad 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java
@@ -483,7 +483,7 @@ public abstract class BarrierBufferTestBase {
                // align checkpoint 1
                startTs = System.nanoTime();
                check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(1L, buffer.getCurrentCheckpointId());
+               assertEquals(1L, buffer.getLatestCheckpointId());
 
                // checkpoint done - replay buffered
                check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
@@ -501,7 +501,7 @@ public abstract class BarrierBufferTestBase {
 
                // checkpoint 2 aborted, checkpoint 3 started
                check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(3L, buffer.getCurrentCheckpointId());
+               assertEquals(3L, buffer.getLatestCheckpointId());
                validateAlignmentTime(startTs, 
buffer.getAlignmentDurationNanos());
                verify(toNotify).abortCheckpointOnBarrier(eq(2L),
                        argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED)));
@@ -565,7 +565,7 @@ public abstract class BarrierBufferTestBase {
                check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(1L, buffer.getCurrentCheckpointId());
+               assertEquals(1L, buffer.getLatestCheckpointId());
 
                check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
@@ -574,7 +574,7 @@ public abstract class BarrierBufferTestBase {
 
                // alignment of checkpoint 2
                check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(2L, buffer.getCurrentCheckpointId());
+               assertEquals(2L, buffer.getLatestCheckpointId());
                check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
@@ -583,7 +583,7 @@ public abstract class BarrierBufferTestBase {
 
                // checkpoint 2 aborted, checkpoint 4 started. replay buffered
                check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(4L, buffer.getCurrentCheckpointId());
+               assertEquals(4L, buffer.getLatestCheckpointId());
                check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
@@ -651,7 +651,7 @@ public abstract class BarrierBufferTestBase {
                check(sequence[1], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[2], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(1L, buffer.getCurrentCheckpointId());
+               assertEquals(1L, buffer.getLatestCheckpointId());
                check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[9], buffer.pollNext().get(), PAGE_SIZE);
@@ -660,7 +660,7 @@ public abstract class BarrierBufferTestBase {
                // alignment of checkpoint 2
                check(sequence[13], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(2L, buffer.getCurrentCheckpointId());
+               assertEquals(2L, buffer.getLatestCheckpointId());
 
                // checkpoint 2 completed
                check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
@@ -669,7 +669,7 @@ public abstract class BarrierBufferTestBase {
 
                // checkpoint 3 skipped, alignment for 4 started
                check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(4L, buffer.getCurrentCheckpointId());
+               assertEquals(4L, buffer.getLatestCheckpointId());
                check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[24], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[26], buffer.pollNext().get(), PAGE_SIZE);
@@ -790,24 +790,24 @@ public abstract class BarrierBufferTestBase {
 
                // checkpoint 3 alignment
                check(sequence[7], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(2L, buffer.getCurrentCheckpointId());
+               assertEquals(2L, buffer.getLatestCheckpointId());
                check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[11], buffer.pollNext().get(), PAGE_SIZE);
 
                // checkpoint 3 buffered
                check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(3L, buffer.getCurrentCheckpointId());
+               assertEquals(3L, buffer.getLatestCheckpointId());
 
                // after checkpoint 4
                check(sequence[15], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(4L, buffer.getCurrentCheckpointId());
+               assertEquals(4L, buffer.getLatestCheckpointId());
                check(sequence[16], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[17], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[18], buffer.pollNext().get(), PAGE_SIZE);
 
                check(sequence[19], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[21], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(5L, buffer.getCurrentCheckpointId());
+               assertEquals(5L, buffer.getLatestCheckpointId());
                check(sequence[22], buffer.pollNext().get(), PAGE_SIZE);
        }
 
@@ -837,11 +837,11 @@ public abstract class BarrierBufferTestBase {
                check(sequence[3], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[4], buffer.pollNext().get(), PAGE_SIZE);
                check(sequence[5], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(1L, buffer.getCurrentCheckpointId());
+               assertEquals(1L, buffer.getLatestCheckpointId());
 
                // alignment of second checkpoint
                check(sequence[10], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(2L, buffer.getCurrentCheckpointId());
+               assertEquals(2L, buffer.getLatestCheckpointId());
 
                // first end-of-partition encountered: checkpoint will not be 
completed
                check(sequence[12], buffer.pollNext().get(), PAGE_SIZE);
@@ -874,7 +874,7 @@ public abstract class BarrierBufferTestBase {
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[6], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(5L, buffer.getCurrentCheckpointId());
+               assertEquals(5L, buffer.getLatestCheckpointId());
                verify(toNotify, 
times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), 
any(CheckpointOptions.class), any(CheckpointMetrics.class));
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L),
                        argThat(new 
CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
@@ -882,7 +882,7 @@ public abstract class BarrierBufferTestBase {
                assertEquals(0L, buffer.getAlignmentDurationNanos());
 
                check(sequence[8], buffer.pollNext().get(), PAGE_SIZE);
-               assertEquals(6L, buffer.getCurrentCheckpointId());
+               assertEquals(6L, buffer.getLatestCheckpointId());
                verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L),
                        argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)));
                assertEquals(0L, buffer.getAlignmentDurationNanos());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
index a8e7727..5112f63 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java
@@ -46,7 +46,7 @@ public class BarrierTrackerTest {
 
        private static final int PAGE_SIZE = 512;
 
-       private BarrierTracker tracker;
+       private CheckpointedInputGate tracker;
 
        @After
        public void ensureEmpty() throws Exception {
@@ -365,16 +365,19 @@ public class BarrierTrackerTest {
        // ------------------------------------------------------------------------
        //  Utils
        // ------------------------------------------------------------------------
-       private static BarrierTracker createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) {
+       private static CheckpointedInputGate createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) {
                return createBarrierTracker(numberOfChannels, sequence, null);
        }
 
-       private static BarrierTracker createBarrierTracker(
+       private static CheckpointedInputGate createBarrierTracker(
                        int numberOfChannels,
                        BufferOrEvent[] sequence,
                        @Nullable AbstractInvokable toNotifyOnCheckpoint) {
                MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence));
-               return new BarrierTracker(gate, toNotifyOnCheckpoint);
+               return new BarrierBuffer(
+                       gate,
+                       new CachedBufferStorage(PAGE_SIZE, -1, "Testing"),
+                       new CheckpointBarrierTracker(gate.getNumberOfInputChannels(), toNotifyOnCheckpoint));
        }
 
        private static BufferOrEvent createBarrier(long id, int channel) {

Reply via email to