This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 52525884826cd0e99b2309d89d74fa270d51ec90 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Sat Jun 15 10:18:01 2019 +0200 [FLIKN-12777][network] Refactor BarrierTracker to use the same code structure as BarrierBuffer --- .../flink/streaming/runtime/io/BarrierBuffer.java | 52 ++++--- .../streaming/runtime/io/BarrierDiscarder.java | 105 -------------- .../runtime/io/BufferOrEventSequence.java | 2 +- .../runtime/io/CheckpointBarrierAligner.java | 65 ++------- .../runtime/io/CheckpointBarrierDiscarder.java | 74 ++++++++++ .../runtime/io/CheckpointBarrierHandler.java | 85 ++++++++--- ...rTracker.java => CheckpointBarrierTracker.java} | 158 +++++++-------------- ...rierHandler.java => CheckpointedInputGate.java} | 7 +- .../streaming/runtime/io/EmptyBufferStorage.java | 73 ++++++++++ .../streaming/runtime/io/InputProcessorUtil.java | 11 +- .../streaming/runtime/io/StreamInputProcessor.java | 2 +- .../runtime/io/StreamTaskNetworkInput.java | 6 +- .../runtime/io/StreamTwoInputProcessor.java | 2 +- .../io/StreamTwoInputSelectableProcessor.java | 5 +- .../runtime/io/BarrierBufferTestBase.java | 32 ++--- .../streaming/runtime/io/BarrierTrackerTest.java | 11 +- 16 files changed, 349 insertions(+), 341 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java index 1594389..8dcc005 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java @@ -38,7 +38,7 @@ import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkNotNull; /** - * The barrier buffer is {@link CheckpointBarrierHandler} that blocks inputs with barriers until + * The barrier buffer is {@link CheckpointedInputGate} that blocks inputs with barriers until * all inputs have received the barrier for a given checkpoint. * * <p>To avoid back-pressuring the input streams (which may cause distributed deadlocks), the @@ -46,11 +46,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * the blocks are released. */ @Internal -public class BarrierBuffer implements CheckpointBarrierHandler { +public class BarrierBuffer implements CheckpointedInputGate { private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class); - private final CheckpointBarrierAligner barrierAligner; + private final CheckpointBarrierHandler barrierHandler; /** The gate that the buffer draws its input from. */ private final InputGate inputGate; @@ -77,6 +77,21 @@ public class BarrierBuffer implements CheckpointBarrierHandler { this (inputGate, bufferStorage, "Testing: No task associated", null); } + BarrierBuffer( + InputGate inputGate, + BufferStorage bufferStorage, + String taskName, + @Nullable AbstractInvokable toNotifyOnCheckpoint) { + this( + inputGate, + bufferStorage, + new CheckpointBarrierAligner( + inputGate.getNumberOfInputChannels(), + taskName, + toNotifyOnCheckpoint) + ); + } + /** * Creates a new checkpoint stream aligner. * @@ -86,20 +101,15 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * * @param inputGate The input gate to draw the buffers and events from. * @param bufferStorage The storage to hold the buffers and events for blocked channels. - * @param taskName The task name for logging. - * @param toNotifyOnCheckpoint optional Handler that receives the checkpoint notifications. + * @param barrierHandler Handler that controls which channels are blocked. */ BarrierBuffer( InputGate inputGate, BufferStorage bufferStorage, - String taskName, - @Nullable AbstractInvokable toNotifyOnCheckpoint) { + CheckpointBarrierHandler barrierHandler) { this.inputGate = inputGate; this.bufferStorage = checkNotNull(bufferStorage); - this.barrierAligner = new CheckpointBarrierAligner( - inputGate.getNumberOfInputChannels(), - taskName, - toNotifyOnCheckpoint); + this.barrierHandler = barrierHandler; } @Override @@ -131,11 +141,11 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } BufferOrEvent bufferOrEvent = next.get(); - if (barrierAligner.isBlocked(bufferOrEvent.getChannelIndex())) { + if (barrierHandler.isBlocked(bufferOrEvent.getChannelIndex())) { // if the channel is blocked, we just store the BufferOrEvent bufferStorage.add(bufferOrEvent); if (bufferStorage.isFull()) { - barrierAligner.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes()); + barrierHandler.checkpointSizeLimitExceeded(bufferStorage.getMaxBufferedBytes()); bufferStorage.rollOver(); } } @@ -146,19 +156,19 @@ public class BarrierBuffer implements CheckpointBarrierHandler { CheckpointBarrier checkpointBarrier = (CheckpointBarrier) bufferOrEvent.getEvent(); if (!endOfInputGate) { // process barriers only if there is a chance of the checkpoint completing - if (barrierAligner.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) { + if (barrierHandler.processBarrier(checkpointBarrier, bufferOrEvent.getChannelIndex(), bufferStorage.getPendingBytes())) { bufferStorage.rollOver(); } } } else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) { - if (barrierAligner.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) { + if (barrierHandler.processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent())) { bufferStorage.rollOver(); } } else { if (bufferOrEvent.getEvent().getClass() == EndOfPartitionEvent.class) { - if (barrierAligner.processEndOfPartition()) { + if (barrierHandler.processEndOfPartition()) { bufferStorage.rollOver(); } } @@ -178,7 +188,7 @@ public class BarrierBuffer implements CheckpointBarrierHandler { } else { // end of input stream. stream continues with the buffered data endOfInputGate = true; - barrierAligner.releaseBlocksAndResetBarriers(); + barrierHandler.releaseBlocksAndResetBarriers(); bufferStorage.rollOver(); return pollNext(); } @@ -208,13 +218,13 @@ public class BarrierBuffer implements CheckpointBarrierHandler { * * @return The ID of the pending of completed checkpoint. */ - public long getCurrentCheckpointId() { - return barrierAligner.getCurrentCheckpointId(); + public long getLatestCheckpointId() { + return barrierHandler.getLatestCheckpointId(); } @Override public long getAlignmentDurationNanos() { - return barrierAligner.getAlignmentDurationNanos(); + return barrierHandler.getAlignmentDurationNanos(); } @Override @@ -228,6 +238,6 @@ public class BarrierBuffer implements CheckpointBarrierHandler { @Override public String toString() { - return barrierAligner.toString(); + return barrierHandler.toString(); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java deleted file mode 100644 index c33c940..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierDiscarder.java +++ /dev/null @@ -1,105 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.runtime.io; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; -import org.apache.flink.runtime.io.network.api.CheckpointBarrier; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; - -import java.util.Optional; -import java.util.concurrent.CompletableFuture; - -/** - * The BarrierDiscarder discards checkpoint barriers have been received from which input channels. - */ -@Internal -public class BarrierDiscarder implements CheckpointBarrierHandler { - - // ------------------------------------------------------------------------ - - /** The input gate, to draw the buffers and events from. */ - private final InputGate inputGate; - - /** - * The number of channels. Once that many barriers have been received for a checkpoint, the - * checkpoint is considered complete. - */ - private final int totalNumberOfInputChannels; - - // ------------------------------------------------------------------------ - - public BarrierDiscarder(InputGate inputGate) { - this.inputGate = inputGate; - this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); - } - - @Override - public CompletableFuture<?> isAvailable() { - return inputGate.isAvailable(); - } - - @Override - public boolean isFinished() { - return inputGate.isFinished(); - } - - @Override - public Optional<BufferOrEvent> pollNext() throws Exception { - while (true) { - Optional<BufferOrEvent> next = inputGate.pollNext(); - if (!next.isPresent()) { - // buffer or input exhausted - return next; - } - - BufferOrEvent bufferOrEvent = next.get(); - if (bufferOrEvent.isBuffer()) { - return next; - } - else if (bufferOrEvent.getEvent().getClass() != CheckpointBarrier.class && - bufferOrEvent.getEvent().getClass() != CancelCheckpointMarker.class) { - // some other event - return next; - } - } - } - - @Override - public void cleanup() { - - } - - @Override - public boolean isEmpty() { - return true; - } - - @Override - public long getAlignmentDurationNanos() { - // this one does not do alignment at all - return 0L; - } - - @Override - public int getNumberOfInputChannels() { - return totalNumberOfInputChannels; - } -} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java index c5bde1b..21649bf 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferOrEventSequence.java @@ -27,7 +27,7 @@ import java.io.IOException; /** * This class represents a sequence of buffers and events which are blocked by - * {@link CheckpointBarrierHandler}. The sequence of buffers and events can be + * {@link CheckpointedInputGate}. The sequence of buffers and events can be * read back using the method {@link #getNext()}. */ @Internal diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java index 30e05c1..482ba65 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java @@ -21,8 +21,6 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; -import org.apache.flink.runtime.checkpoint.CheckpointMetrics; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -40,7 +38,7 @@ import java.io.IOException; * release blocked channels. */ @Internal -public class CheckpointBarrierAligner { +public class CheckpointBarrierAligner extends CheckpointBarrierHandler { private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierAligner.class); @@ -52,9 +50,6 @@ public class CheckpointBarrierAligner { private final String taskName; - @Nullable - private final AbstractInvokable toNotifyOnCheckpoint; - /** The ID of the checkpoint for which we expect barriers. */ private long currentCheckpointId = -1L; @@ -77,13 +72,14 @@ public class CheckpointBarrierAligner { int totalNumberOfInputChannels, String taskName, @Nullable AbstractInvokable toNotifyOnCheckpoint) { + super(toNotifyOnCheckpoint); this.totalNumberOfInputChannels = totalNumberOfInputChannels; this.taskName = taskName; - this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; this.blockedChannels = new boolean[totalNumberOfInputChannels]; } + @Override public void releaseBlocksAndResetBarriers() throws IOException { LOG.debug("{}: End of stream alignment, feeding buffered data back.", taskName); @@ -100,19 +96,12 @@ public class CheckpointBarrierAligner { } } - /** - * Checks whether the channel with the given index is blocked. - * - * @param channelIndex The channel index to check. - * @return True if the channel is blocked, false if not. - */ + @Override public boolean isBlocked(int channelIndex) { return blockedChannels[channelIndex]; } - /** - * @return true if some blocked data should be unblocked/rolled over. - */ + @Override public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception { final long barrierId = receivedBarrier.getId(); @@ -121,7 +110,7 @@ public class CheckpointBarrierAligner { if (barrierId > currentCheckpointId) { // new checkpoint currentCheckpointId = barrierId; - notifyCheckpoint(receivedBarrier, bufferedBytes); + notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos); } return false; } @@ -185,7 +174,7 @@ public class CheckpointBarrierAligner { } releaseBlocksAndResetBarriers(); - notifyCheckpoint(receivedBarrier, bufferedBytes); + notifyCheckpoint(receivedBarrier, bufferedBytes, latestAlignmentDurationNanos); return true; } return checkpointAborted; @@ -222,9 +211,7 @@ public class CheckpointBarrierAligner { } } - /** - * @return true if some blocked data should be unblocked/rolled over. - */ + @Override public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { final long barrierId = cancelBarrier.getCheckpointId(); @@ -300,9 +287,7 @@ public class CheckpointBarrierAligner { return false; } - /** - * @return true if some blocked data should be unblocked/rolled over. - */ + @Override public boolean processEndOfPartition() throws Exception { numClosedChannels++; @@ -317,37 +302,12 @@ public class CheckpointBarrierAligner { return false; } - private void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes) throws Exception { - if (toNotifyOnCheckpoint != null) { - CheckpointMetaData checkpointMetaData = - new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp()); - - CheckpointMetrics checkpointMetrics = new CheckpointMetrics() - .setBytesBufferedInAlignment(bufferedBytes) - .setAlignmentDurationNanos(latestAlignmentDurationNanos); - - toNotifyOnCheckpoint.triggerCheckpointOnBarrier( - checkpointMetaData, - checkpointBarrier.getCheckpointOptions(), - checkpointMetrics); - } - } - - private void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception { - notifyAbort(checkpointId, - new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)); - } - - private void notifyAbort(long checkpointId, CheckpointException cause) throws Exception { - if (toNotifyOnCheckpoint != null) { - toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause); - } - } - - public long getCurrentCheckpointId() { + @Override + public long getLatestCheckpointId() { return currentCheckpointId; } + @Override public long getAlignmentDurationNanos() { if (startOfAlignmentTimestamp <= 0) { return latestAlignmentDurationNanos; @@ -365,6 +325,7 @@ public class CheckpointBarrierAligner { numClosedChannels); } + @Override public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception { releaseBlocksAndResetBarriers(); notifyAbort(currentCheckpointId, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java new file mode 100644 index 0000000..4c6cdab --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierDiscarder.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; + +import java.io.IOException; + +/** + * The {@link CheckpointBarrierDiscarder} discards checkpoint barriers have been received from which input channels. + */ +@Internal +public class CheckpointBarrierDiscarder extends CheckpointBarrierHandler { + public CheckpointBarrierDiscarder() { + super(null); + } + + @Override + public void releaseBlocksAndResetBarriers() throws IOException { + } + + @Override + public boolean isBlocked(int channelIndex) { + return false; + } + + @Override + public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception { + return false; + } + + @Override + public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { + return false; + } + + @Override + public boolean processEndOfPartition() throws Exception { + return false; + } + + @Override + public long getLatestCheckpointId() { + return 0; + } + + @Override + public long getAlignmentDurationNanos() { + return 0; + } + + @Override + public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception { + + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java index 2ee1a97..41e043e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java @@ -18,43 +18,88 @@ package org.apache.flink.streaming.runtime.io; -import org.apache.flink.annotation.Internal; -import org.apache.flink.runtime.io.AsyncDataInput; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; + +import javax.annotation.Nullable; import java.io.IOException; /** - * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels. + * The {@link CheckpointBarrierHandler} reacts to checkpoint barrier arriving from the input channels. * Different implementations may either simply track barriers, or block certain inputs on * barriers. */ -@Internal -public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> { +public abstract class CheckpointBarrierHandler { + + /** The listener to be notified on complete checkpoints. */ + @Nullable + private final AbstractInvokable toNotifyOnCheckpoint; + + public CheckpointBarrierHandler(@Nullable AbstractInvokable toNotifyOnCheckpoint) { + this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; + } + + public abstract void releaseBlocksAndResetBarriers() throws IOException; + /** - * Cleans up all internally held resources. + * Checks whether the channel with the given index is blocked. * - * @throws IOException Thrown if the cleanup of I/O resources failed. + * @param channelIndex The channel index to check. + * @return True if the channel is blocked, false if not. */ - void cleanup() throws IOException; + public abstract boolean isBlocked(int channelIndex); /** - * Checks if the barrier handler has buffered any data internally. - * @return {@code True}, if no data is buffered internally, {@code false} otherwise. + * @return true if some blocked data should be unblocked/rolled over. */ - boolean isEmpty(); + public abstract boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception; /** - * Gets the time that the latest alignment took, in nanoseconds. - * If there is currently an alignment in progress, it will return the time spent in the - * current alignment so far. - * - * @return The duration in nanoseconds + * @return true if some blocked data should be unblocked/rolled over. */ - long getAlignmentDurationNanos(); + public abstract boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception; /** - * @return number of underlying input channels. + * @return true if some blocked data should be unblocked/rolled over. */ - int getNumberOfInputChannels(); + public abstract boolean processEndOfPartition() throws Exception; + + public abstract long getLatestCheckpointId(); + + public abstract long getAlignmentDurationNanos(); + + public abstract void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception; + + protected void notifyCheckpoint(CheckpointBarrier checkpointBarrier, long bufferedBytes, long alignmentDurationNanos) throws Exception { + if (toNotifyOnCheckpoint != null) { + CheckpointMetaData checkpointMetaData = + new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp()); + + CheckpointMetrics checkpointMetrics = new CheckpointMetrics() + .setBytesBufferedInAlignment(bufferedBytes) + .setAlignmentDurationNanos(alignmentDurationNanos); + + toNotifyOnCheckpoint.triggerCheckpointOnBarrier( + checkpointMetaData, + checkpointBarrier.getCheckpointOptions(), + checkpointMetrics); + } + } + + protected void notifyAbortOnCancellationBarrier(long checkpointId) throws Exception { + notifyAbort(checkpointId, + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)); + } + + protected void notifyAbort(long checkpointId, CheckpointException cause) throws Exception { + if (toNotifyOnCheckpoint != null) { + toNotifyOnCheckpoint.abortCheckpointOnBarrier(checkpointId, cause); + } + } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java similarity index 61% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java index f7629bb..0ec9004 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BarrierTracker.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierTracker.java @@ -21,13 +21,8 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.annotation.Internal; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; -import org.apache.flink.runtime.checkpoint.CheckpointMetaData; -import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; -import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; -import org.apache.flink.runtime.io.network.partition.consumer.InputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.slf4j.Logger; @@ -36,24 +31,22 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; import java.util.ArrayDeque; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; /** - * The BarrierTracker keeps track of what checkpoint barriers have been received from + * The {@link CheckpointBarrierTracker} keeps track of what checkpoint barriers have been received from * which input channels. Once it has observed all checkpoint barriers for a checkpoint ID, * it notifies its listener of a completed checkpoint. * - * <p>Unlike the {@link BarrierBuffer}, the BarrierTracker does not block the input + * <p>Unlike the {@link CheckpointBarrierAligner}, the BarrierTracker does not block the input * channels that have sent barriers, so it cannot be used to gain "exactly-once" processing * guarantees. It can, however, be used to gain "at least once" processing guarantees. * * <p>NOTE: This implementation strictly assumes that newer checkpoints have higher checkpoint IDs. */ @Internal -public class BarrierTracker implements CheckpointBarrierHandler { +public class CheckpointBarrierTracker extends CheckpointBarrierHandler { - private static final Logger LOG = LoggerFactory.getLogger(BarrierTracker.class); + private static final Logger LOG = LoggerFactory.getLogger(CheckpointBarrierTracker.class); /** * The tracker tracks a maximum number of checkpoints, for which some, but not all barriers @@ -63,9 +56,6 @@ public class BarrierTracker implements CheckpointBarrierHandler { // ------------------------------------------------------------------------ - /** The input gate, to draw the buffers and events from. */ - private final InputGate inputGate; - /** * The number of channels. Once that many barriers have been received for a checkpoint, the * checkpoint is considered complete. @@ -78,89 +68,36 @@ public class BarrierTracker implements CheckpointBarrierHandler { */ private final ArrayDeque<CheckpointBarrierCount> pendingCheckpoints; - /** The listener to be notified on complete checkpoints. */ - private final AbstractInvokable toNotifyOnCheckpoint; - /** The highest checkpoint ID encountered so far. */ private long latestPendingCheckpointID = -1; - // ------------------------------------------------------------------------ - - public BarrierTracker(InputGate inputGate) { - this(inputGate, null); + public CheckpointBarrierTracker(int totalNumberOfInputChannels) { + this(totalNumberOfInputChannels, null); } - public BarrierTracker(InputGate inputGate, @Nullable AbstractInvokable toNotifyOnCheckpoint) { - this.inputGate = inputGate; - this.totalNumberOfInputChannels = inputGate.getNumberOfInputChannels(); + public CheckpointBarrierTracker(int totalNumberOfInputChannels, @Nullable AbstractInvokable toNotifyOnCheckpoint) { + super(toNotifyOnCheckpoint); + this.totalNumberOfInputChannels = totalNumberOfInputChannels; this.pendingCheckpoints = new ArrayDeque<>(); - this.toNotifyOnCheckpoint = toNotifyOnCheckpoint; } @Override - public CompletableFuture<?> isAvailable() { - return inputGate.isAvailable(); + public void releaseBlocksAndResetBarriers() { } @Override - public boolean isFinished() { - return inputGate.isFinished(); + public boolean isBlocked(int channelIndex) { + return false; } @Override - public Optional<BufferOrEvent> pollNext() throws Exception { - while (true) { - Optional<BufferOrEvent> next = inputGate.pollNext(); - if (!next.isPresent()) { - // buffer or input exhausted - return next; - } - - BufferOrEvent bufferOrEvent = next.get(); - if (bufferOrEvent.isBuffer()) { - return next; - } - else if (bufferOrEvent.getEvent().getClass() == CheckpointBarrier.class) { - processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex()); - } - else if (bufferOrEvent.getEvent().getClass() == CancelCheckpointMarker.class) { - processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex()); - } - else { - // some other event - return next; - } - } - } - - @Override - public void cleanup() { - pendingCheckpoints.clear(); - } - - @Override - public boolean isEmpty() { - return pendingCheckpoints.isEmpty(); - } - - @Override - public long getAlignmentDurationNanos() { - // this one does not do alignment at all - return 0L; - } - - @Override - public int getNumberOfInputChannels() { - return totalNumberOfInputChannels; - } - - private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception { + public boolean processBarrier(CheckpointBarrier receivedBarrier, int channelIndex, long bufferedBytes) throws Exception { final long barrierId = receivedBarrier.getId(); // fast path for single channel trackers if (totalNumberOfInputChannels == 1) { - notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions()); - return; + notifyCheckpoint(receivedBarrier, 0, 0); + return false; } // general path for multiple input channels @@ -169,20 +106,20 @@ public class BarrierTracker implements CheckpointBarrierHandler { } // find the checkpoint barrier in the queue of pending barriers - CheckpointBarrierCount cbc = null; + CheckpointBarrierCount barrierCount = null; int pos = 0; for (CheckpointBarrierCount next : pendingCheckpoints) { if (next.checkpointId == barrierId) { - cbc = next; + barrierCount = next; break; } pos++; } - if (cbc != null) { + if (barrierCount != null) { // add one to the count to that barrier and check for completion - int numBarriersNew = cbc.incrementBarrierCount(); + int numBarriersNew = barrierCount.incrementBarrierCount(); if (numBarriersNew == totalNumberOfInputChannels) { // checkpoint can be triggered (or is aborted and all barriers have been seen) // first, remove this checkpoint and all all prior pending @@ -192,12 +129,12 @@ public class BarrierTracker implements CheckpointBarrierHandler { } // notify the listener - if (!cbc.isAborted()) { + if (!barrierCount.isAborted()) { if (LOG.isDebugEnabled()) { LOG.debug("Received all barriers for checkpoint {}", barrierId); } - notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions()); + notifyCheckpoint(receivedBarrier, 0, 0); } } } @@ -216,19 +153,21 @@ public class BarrierTracker implements CheckpointBarrierHandler { } } } + return false; } - private void processCheckpointAbortBarrier(CancelCheckpointMarker barrier, int channelIndex) throws Exception { - final long checkpointId = barrier.getCheckpointId(); + @Override + public boolean processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { + final long checkpointId = cancelBarrier.getCheckpointId(); if (LOG.isDebugEnabled()) { - LOG.debug("Received cancellation barrier for checkpoint {} from channel {}", checkpointId, channelIndex); + LOG.debug("Received cancellation barrier for checkpoint {}", checkpointId); } // fast path for single channel trackers if (totalNumberOfInputChannels == 1) { - notifyAbort(checkpointId); - return; + notifyAbortOnCancellationBarrier(checkpointId); + return false; } // -- general path for multiple input channels -- @@ -241,7 +180,7 @@ public class BarrierTracker implements CheckpointBarrierHandler { if (cbc.markAborted()) { // abort the subsumed checkpoints if not already done - notifyAbort(cbc.checkpointId()); + notifyAbortOnCancellationBarrier(cbc.checkpointId()); } } @@ -249,7 +188,7 @@ public class BarrierTracker implements CheckpointBarrierHandler { // make sure the checkpoint is remembered as aborted if (cbc.markAborted()) { // this was the first time the checkpoint was aborted - notify - notifyAbort(checkpointId); + notifyAbortOnCancellationBarrier(checkpointId); } // we still count the barriers to be able to remove the entry once all barriers have been seen @@ -259,7 +198,7 @@ public class BarrierTracker implements CheckpointBarrierHandler { } } else if (checkpointId > latestPendingCheckpointID) { - notifyAbort(checkpointId); + notifyAbortOnCancellationBarrier(checkpointId); latestPendingCheckpointID = checkpointId; @@ -272,28 +211,33 @@ public class BarrierTracker implements CheckpointBarrierHandler { } else { // trailing cancellation barrier which was already cancelled } + return false; } - private void notifyCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception { - if (toNotifyOnCheckpoint != null) { - CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp); - CheckpointMetrics checkpointMetrics = new CheckpointMetrics() - .setBytesBufferedInAlignment(0L) - .setAlignmentDurationNanos(0L); - - toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics); + @Override + public boolean processEndOfPartition() throws Exception { + while (!pendingCheckpoints.isEmpty()) { + CheckpointBarrierCount barrierCount = pendingCheckpoints.removeFirst(); + if (barrierCount.markAborted()) { + notifyAbort(barrierCount.checkpointId(), + new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); + } } + return false; } - private void notifyAbort(long checkpointId) throws Exception { - if (toNotifyOnCheckpoint != null) { - toNotifyOnCheckpoint.abortCheckpointOnBarrier( - checkpointId, - new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)); - } + public long getLatestCheckpointId() { + return pendingCheckpoints.isEmpty() ? -1 : pendingCheckpoints.peekLast().checkpointId(); } - // ------------------------------------------------------------------------ + public long getAlignmentDurationNanos() { + return 0; + } + + @Override + public void checkpointSizeLimitExceeded(long maxBufferedBytes) throws Exception { + throw new UnsupportedOperationException("This should never happened as this class doesn't block any data"); + } /** * Simple class for a checkpoint ID with a barrier counter. diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java similarity index 84% copy from flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java copy to flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java index 2ee1a97..cdbbfbc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointedInputGate.java @@ -25,12 +25,11 @@ import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; import java.io.IOException; /** - * The CheckpointBarrierHandler reacts to checkpoint barrier arriving from the input channels. - * Different implementations may either simply track barriers, or block certain inputs on - * barriers. + * The {@link CheckpointedInputGate} uses {@link CheckpointBarrierHandler} to handle incoming + * {@link org.apache.flink.runtime.io.network.api.CheckpointBarrier} from the {@link org.apache.flink.runtime.io.network.partition.consumer.InputGate}. */ @Internal -public interface CheckpointBarrierHandler extends AsyncDataInput<BufferOrEvent> { +public interface CheckpointedInputGate extends AsyncDataInput<BufferOrEvent> { /** * Cleans up all internally held resources. * diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java new file mode 100644 index 0000000..5535c49 --- /dev/null +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/EmptyBufferStorage.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.io; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent; + +import java.io.IOException; +import java.util.Optional; + +/** + * Always empty implementation of {@link BufferStorage}. It doesn't allow for adding any data. + */ +@Internal +public class EmptyBufferStorage implements BufferStorage { + @Override + public void add(BufferOrEvent boe) throws IOException { + throw new UnsupportedOperationException("Adding to EmptyBufferStorage is unsupported"); + } + + @Override + public boolean isFull() { + return false; + } + + @Override + public void rollOver() throws IOException { + } + + @Override + public long getPendingBytes() { + return 0; + } + + @Override + public long getRolledBytes() { + return 0; + } + + @Override + public boolean isEmpty() { + return true; + } + + @Override + public Optional<BufferOrEvent> pollNext() throws IOException { + return Optional.empty(); + } + + @Override + public long getMaxBufferedBytes() { + return -1; + } + + @Override + public void close() throws IOException { + } +} diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java index c9ec6bf..75926b9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java @@ -30,13 +30,13 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask; import java.io.IOException; /** - * Utility for creating {@link CheckpointBarrierHandler} based on checkpoint mode + * Utility for creating {@link CheckpointedInputGate} based on checkpoint mode * for {@link StreamInputProcessor} and {@link StreamTwoInputProcessor}. */ @Internal public class InputProcessorUtil { - public static CheckpointBarrierHandler createCheckpointBarrierHandler( + public static CheckpointedInputGate createCheckpointBarrierHandler( StreamTask<?, ?> checkpointedTask, CheckpointingMode checkpointMode, IOManager ioManager, @@ -44,7 +44,7 @@ public class InputProcessorUtil { Configuration taskManagerConfig, String taskName) throws IOException { - CheckpointBarrierHandler barrierHandler; + CheckpointedInputGate barrierHandler; if (checkpointMode == CheckpointingMode.EXACTLY_ONCE) { long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT); if (!(maxAlign == -1 || maxAlign > 0)) { @@ -67,7 +67,10 @@ public class InputProcessorUtil { checkpointedTask); } } else if (checkpointMode == CheckpointingMode.AT_LEAST_ONCE) { - barrierHandler = new BarrierTracker(inputGate); + barrierHandler = new BarrierBuffer( + inputGate, + new EmptyBufferStorage(), + new CheckpointBarrierTracker(inputGate.getNumberOfInputChannels(), checkpointedTask)); } else { throw new IllegalArgumentException("Unrecognized Checkpointing Mode: " + checkpointMode); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 742dbe8..58b2051 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -104,7 +104,7 @@ public class StreamInputProcessor<IN> { InputGate inputGate = InputGateUtil.createInputGate(inputGates); - CheckpointBarrierHandler barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( + CheckpointedInputGate barrierHandler = InputProcessorUtil.createCheckpointBarrierHandler( checkpointedTask, checkpointMode, ioManager, diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java index 85e7f46..ecf88e2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskNetworkInput.java @@ -42,12 +42,12 @@ import java.util.concurrent.CompletableFuture; import static org.apache.flink.util.Preconditions.checkState; /** - * Implementation of {@link StreamTaskInput} that wraps an input from network taken from {@link CheckpointBarrierHandler}. + * Implementation of {@link StreamTaskInput} that wraps an input from network taken from {@link CheckpointedInputGate}. */ @Internal public final class StreamTaskNetworkInput implements StreamTaskInput { - private final CheckpointBarrierHandler barrierHandler; + private final CheckpointedInputGate barrierHandler; private final DeserializationDelegate<StreamElement> deserializationDelegate; @@ -63,7 +63,7 @@ public final class StreamTaskNetworkInput implements StreamTaskInput { @SuppressWarnings("unchecked") public StreamTaskNetworkInput( - CheckpointBarrierHandler barrierHandler, + CheckpointedInputGate barrierHandler, TypeSerializer<?> inputSerializer, IOManager ioManager, int inputIndex) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index 4efbc7f..aa6354d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -86,7 +86,7 @@ public class StreamTwoInputProcessor<IN1, IN2> { private final DeserializationDelegate<StreamElement> deserializationDelegate1; private final DeserializationDelegate<StreamElement> deserializationDelegate2; - private final CheckpointBarrierHandler barrierHandler; + private final CheckpointedInputGate barrierHandler; private final Object lock; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java index 54ce749..d5ebf29 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputSelectableProcessor.java @@ -120,8 +120,9 @@ public class StreamTwoInputSelectableProcessor<IN1, IN2> { InputGate unionedInputGate2 = InputGateUtil.createInputGate(inputGates2.toArray(new InputGate[0])); // create a Input instance for each input - this.input1 = new StreamTaskNetworkInput(new BarrierDiscarder(unionedInputGate1), inputSerializer1, ioManager, 0); - this.input2 = new StreamTaskNetworkInput(new BarrierDiscarder(unionedInputGate2), inputSerializer2, ioManager, 1); + CachedBufferStorage bufferStorage = new CachedBufferStorage(unionedInputGate1.getPageSize()); + this.input1 = new StreamTaskNetworkInput(new BarrierBuffer(unionedInputGate1, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer1, ioManager, 0); + this.input2 = new StreamTaskNetworkInput(new BarrierBuffer(unionedInputGate2, bufferStorage, new CheckpointBarrierDiscarder()), inputSerializer2, ioManager, 1); this.statusWatermarkValve1 = new StatusWatermarkValve( unionedInputGate1.getNumberOfInputChannels(), diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java index e6e48a8..13c4aad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierBufferTestBase.java @@ -483,7 +483,7 @@ public abstract class BarrierBufferTestBase { // align checkpoint 1 startTs = System.nanoTime(); check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(1L, buffer.getCurrentCheckpointId()); + assertEquals(1L, buffer.getLatestCheckpointId()); // checkpoint done - replay buffered check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); @@ -501,7 +501,7 @@ public abstract class BarrierBufferTestBase { // checkpoint 2 aborted, checkpoint 3 started check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(3L, buffer.getCurrentCheckpointId()); + assertEquals(3L, buffer.getLatestCheckpointId()); validateAlignmentTime(startTs, buffer.getAlignmentDurationNanos()); verify(toNotify).abortCheckpointOnBarrier(eq(2L), argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED))); @@ -565,7 +565,7 @@ public abstract class BarrierBufferTestBase { check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(1L, buffer.getCurrentCheckpointId()); + assertEquals(1L, buffer.getLatestCheckpointId()); check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); @@ -574,7 +574,7 @@ public abstract class BarrierBufferTestBase { // alignment of checkpoint 2 check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(2L, buffer.getCurrentCheckpointId()); + assertEquals(2L, buffer.getLatestCheckpointId()); check(sequence[15], buffer.pollNext().get(), PAGE_SIZE); check(sequence[19], buffer.pollNext().get(), PAGE_SIZE); check(sequence[21], buffer.pollNext().get(), PAGE_SIZE); @@ -583,7 +583,7 @@ public abstract class BarrierBufferTestBase { // checkpoint 2 aborted, checkpoint 4 started. replay buffered check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(4L, buffer.getCurrentCheckpointId()); + assertEquals(4L, buffer.getLatestCheckpointId()); check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); check(sequence[22], buffer.pollNext().get(), PAGE_SIZE); @@ -651,7 +651,7 @@ public abstract class BarrierBufferTestBase { check(sequence[1], buffer.pollNext().get(), PAGE_SIZE); check(sequence[2], buffer.pollNext().get(), PAGE_SIZE); check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(1L, buffer.getCurrentCheckpointId()); + assertEquals(1L, buffer.getLatestCheckpointId()); check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); check(sequence[9], buffer.pollNext().get(), PAGE_SIZE); @@ -660,7 +660,7 @@ public abstract class BarrierBufferTestBase { // alignment of checkpoint 2 check(sequence[13], buffer.pollNext().get(), PAGE_SIZE); check(sequence[22], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(2L, buffer.getCurrentCheckpointId()); + assertEquals(2L, buffer.getLatestCheckpointId()); // checkpoint 2 completed check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); @@ -669,7 +669,7 @@ public abstract class BarrierBufferTestBase { // checkpoint 3 skipped, alignment for 4 started check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(4L, buffer.getCurrentCheckpointId()); + assertEquals(4L, buffer.getLatestCheckpointId()); check(sequence[21], buffer.pollNext().get(), PAGE_SIZE); check(sequence[24], buffer.pollNext().get(), PAGE_SIZE); check(sequence[26], buffer.pollNext().get(), PAGE_SIZE); @@ -790,24 +790,24 @@ public abstract class BarrierBufferTestBase { // checkpoint 3 alignment check(sequence[7], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(2L, buffer.getCurrentCheckpointId()); + assertEquals(2L, buffer.getLatestCheckpointId()); check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); check(sequence[11], buffer.pollNext().get(), PAGE_SIZE); // checkpoint 3 buffered check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(3L, buffer.getCurrentCheckpointId()); + assertEquals(3L, buffer.getLatestCheckpointId()); // after checkpoint 4 check(sequence[15], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(4L, buffer.getCurrentCheckpointId()); + assertEquals(4L, buffer.getLatestCheckpointId()); check(sequence[16], buffer.pollNext().get(), PAGE_SIZE); check(sequence[17], buffer.pollNext().get(), PAGE_SIZE); check(sequence[18], buffer.pollNext().get(), PAGE_SIZE); check(sequence[19], buffer.pollNext().get(), PAGE_SIZE); check(sequence[21], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(5L, buffer.getCurrentCheckpointId()); + assertEquals(5L, buffer.getLatestCheckpointId()); check(sequence[22], buffer.pollNext().get(), PAGE_SIZE); } @@ -837,11 +837,11 @@ public abstract class BarrierBufferTestBase { check(sequence[3], buffer.pollNext().get(), PAGE_SIZE); check(sequence[4], buffer.pollNext().get(), PAGE_SIZE); check(sequence[5], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(1L, buffer.getCurrentCheckpointId()); + assertEquals(1L, buffer.getLatestCheckpointId()); // alignment of second checkpoint check(sequence[10], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(2L, buffer.getCurrentCheckpointId()); + assertEquals(2L, buffer.getLatestCheckpointId()); // first end-of-partition encountered: checkpoint will not be completed check(sequence[12], buffer.pollNext().get(), PAGE_SIZE); @@ -874,7 +874,7 @@ public abstract class BarrierBufferTestBase { assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[6], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(5L, buffer.getCurrentCheckpointId()); + assertEquals(5L, buffer.getLatestCheckpointId()); verify(toNotify, times(1)).triggerCheckpointOnBarrier(argThat(new CheckpointMatcher(2L)), any(CheckpointOptions.class), any(CheckpointMetrics.class)); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(4L), argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); @@ -882,7 +882,7 @@ public abstract class BarrierBufferTestBase { assertEquals(0L, buffer.getAlignmentDurationNanos()); check(sequence[8], buffer.pollNext().get(), PAGE_SIZE); - assertEquals(6L, buffer.getCurrentCheckpointId()); + assertEquals(6L, buffer.getLatestCheckpointId()); verify(toNotify, times(1)).abortCheckpointOnBarrier(eq(6L), argThat(new CheckpointExceptionMatcher(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER))); assertEquals(0L, buffer.getAlignmentDurationNanos()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java index a8e7727..5112f63 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/BarrierTrackerTest.java @@ -46,7 +46,7 @@ public class BarrierTrackerTest { private static final int PAGE_SIZE = 512; - private BarrierTracker tracker; + private CheckpointedInputGate tracker; @After public void ensureEmpty() throws Exception { @@ -365,16 +365,19 @@ public class BarrierTrackerTest { // ------------------------------------------------------------------------ // Utils // ------------------------------------------------------------------------ - private static BarrierTracker createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) { + private static CheckpointedInputGate createBarrierTracker(int numberOfChannels, BufferOrEvent[] sequence) { return createBarrierTracker(numberOfChannels, sequence, null); } - private static BarrierTracker createBarrierTracker( + private static CheckpointedInputGate createBarrierTracker( int numberOfChannels, BufferOrEvent[] sequence, @Nullable AbstractInvokable toNotifyOnCheckpoint) { MockInputGate gate = new MockInputGate(PAGE_SIZE, numberOfChannels, Arrays.asList(sequence)); - return new BarrierTracker(gate, toNotifyOnCheckpoint); + return new BarrierBuffer( + gate, + new CachedBufferStorage(PAGE_SIZE, -1, "Testing"), + new CheckpointBarrierTracker(gate.getNumberOfInputChannels(), toNotifyOnCheckpoint)); } private static BufferOrEvent createBarrier(long id, int channel) {