rkhachatryan commented on a change in pull request #13465:
URL: https://github.com/apache/flink/pull/13465#discussion_r494181467
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/AvailabilityProvider.java
##########
@@ -66,6 +66,32 @@ default boolean isApproximatelyAvailable() {
         return getAvailableFuture() == AVAILABLE;
     }
+    static CompletableFuture<?> and(CompletableFuture<?> first, CompletableFuture<?> second) {
+        if (first == AVAILABLE) {
+            if (second == AVAILABLE) {

Review comment:
I think less `if` nesting would be more readable:
```
if (first == AVAILABLE && second == AVAILABLE) return AVAILABLE;
else if (first == AVAILABLE) return second;
else if (second == AVAILABLE) return first;
else return allOf(first, second);
```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -101,10 +101,8 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
             allBarriersReceivedFuture = new CompletableFuture<>();
             checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
-            for (final InputGate gate : inputGates) {
-                for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) {
-                    gate.getChannel(index).checkpointStarted(barrier);
-                }
+            for (final BlockableInput input : inputs) {
+                input.checkpointStarted(barrier);

Review comment:
Why do we block source inputs when we receive a barrier from a non-source input? (Maybe a comment is missing here or for `StreamTaskSourceInput.checkpointStarted`.)

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -249,25 +259,34 @@ private void sendControlMail(RunnableWithException mail, String descriptionForma
     }
     /**
-     * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should
-     * continue running, false if it should stop. In the current design, this method also evaluates all control flag
-     * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
+     * This helper method handles all special actions from the mailbox.
+     * In the current design, this method also evaluates all control flag changes.
+     * This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
      * that all flag changes must make sure that the mailbox signals mailbox#hasMail.
+     *
+     * @return true if a mail has been processed.
      */
-    private boolean processMail(TaskMailbox mailbox) throws Exception {
-
+    private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
+        boolean processed = false;
         // Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only
         // acquired after this point.
         if (!mailbox.createBatch()) {
             // We can also directly return true because all changes to #isMailboxLoopRunning must be connected to
             // mailbox.hasMail() == true.
-            return true;
+            return processed;

Review comment:
Why not just return false here?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
##########
@@ -34,30 +40,75 @@
  * unavailable or finished.
  */
 @Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public final class StreamTaskSourceInput<T> implements StreamTaskInput<T>, BlockableInput {
     private final SourceOperator<T, ?> operator;
+    private final int inputGateIndex;
+    private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
+    private final List<InputChannelInfo> inputChannelInfos;
-    public StreamTaskSourceInput(SourceOperator<T, ?> operator) {
+    public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex) {
         this.operator = checkNotNull(operator);
+        this.inputGateIndex = inputGateIndex;
+        inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
+        isBlockedAvailability.resetAvailable();
     }
     @Override
     public InputStatus emitNext(DataOutput<T> output) throws Exception {
+        if (!isBlockedAvailability.isApproximatelyAvailable()) {

Review comment:
nit: this is a bit difficult to read (maybe just invert the condition?)

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -56,12 +59,13 @@ public static CheckpointedInputGate createCheckpointedInputGate(
                 taskIOMetricGroup,
                 taskName,
                 mailboxExecutor,
-                Arrays.asList(inputGates));
+                new List[]{ Arrays.asList(inputGates) },

Review comment:
How about using a vararg parameter and passing just `Arrays.asList(inputGates)` here?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -249,25 +259,34 @@ private void sendControlMail(RunnableWithException mail, String descriptionForma
     }
     /**
-     * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should
-     * continue running, false if it should stop. In the current design, this method also evaluates all control flag
-     * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
+     * This helper method handles all special actions from the mailbox.
+     * In the current design, this method also evaluates all control flag changes.
+     * This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
      * that all flag changes must make sure that the mailbox signals mailbox#hasMail.
+     *
+     * @return true if a mail has been processed.
      */
-    private boolean processMail(TaskMailbox mailbox) throws Exception {
-
+    private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
+        boolean processed = false;
         // Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only
         // acquired after this point.
         if (!mailbox.createBatch()) {
             // We can also directly return true because all changes to #isMailboxLoopRunning must be connected to
             // mailbox.hasMail() == true.
-            return true;
+            return processed;
         }
         // Take mails in a non-blockingly and execute them.
         Optional<Mail> maybeMail;
         while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
             maybeMail.get().run();
+            processed = true;
+            if (singleStep) {
+                break;
+            }
+        }

Review comment:
Why not just return `true` here? And then, after the loop, if `singleStep == true` we can also return false.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
##########
@@ -34,30 +40,75 @@
  * unavailable or finished.
  */
 @Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public final class StreamTaskSourceInput<T> implements StreamTaskInput<T>, BlockableInput {
     private final SourceOperator<T, ?> operator;
+    private final int inputGateIndex;
+    private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
+    private final List<InputChannelInfo> inputChannelInfos;
-    public StreamTaskSourceInput(SourceOperator<T, ?> operator) {
+    public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex) {
         this.operator = checkNotNull(operator);
+        this.inputGateIndex = inputGateIndex;
+        inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
+        isBlockedAvailability.resetAvailable();
     }
     @Override
     public InputStatus emitNext(DataOutput<T> output) throws Exception {
+        if (!isBlockedAvailability.isApproximatelyAvailable()) {
+            // Safe guard

Review comment:
nit: this comment doesn't say much to me

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
                 operatorChain,
                 setupNumRecordsInCounter(mainOperator));
     }
+
+    @Override
+    public Future<Boolean> triggerCheckpointAsync(
+            CheckpointMetaData metadata,
+            CheckpointOptions options,
+            boolean advanceToEndOfEventTime) {
+
+        CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+        mainMailboxExecutor.execute(
+                () -> {
+                    try {

Review comment:
Shouldn't we also update `super.latestAsyncCheckpointStartDelayNanos`?
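The `and` nesting comment on `AvailabilityProvider` and the two `emitNext` nits on `StreamTaskSourceInput` can be illustrated together with a small, self-contained sketch. It is a hypothetical stand-in, not Flink code: `AVAILABLE` mimics `AvailabilityProvider.AVAILABLE` as one shared completed future, `Status` mimics two `InputStatus` values, and `checkpointStarted`/`resume`/`getAvailableFuture(upstream)` are assumed names for blocking, unblocking, and combining availability. It shows the flattened `and` and an `emitNext` that tests the positive condition `isBlocked()` with an early return, which reads without needing a "Safe guard" comment.

```java
import java.util.concurrent.CompletableFuture;

class BlockableInputSketch {
    // Stand-in for AvailabilityProvider.AVAILABLE: one shared, already completed future.
    static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    // Stand-in for two of the InputStatus values.
    enum Status { MORE_AVAILABLE, NOTHING_AVAILABLE }

    // The flattened `and` suggested in the review: no nested ifs.
    static CompletableFuture<?> and(CompletableFuture<?> first, CompletableFuture<?> second) {
        if (first == AVAILABLE && second == AVAILABLE) return AVAILABLE;
        else if (first == AVAILABLE) return second;
        else if (second == AVAILABLE) return first;
        else return CompletableFuture.allOf(first, second);
    }

    // AVAILABLE while unblocked; a pending future while a checkpoint blocks this input.
    private CompletableFuture<?> unblocked = AVAILABLE;
    private int emitted;

    void checkpointStarted() {
        unblocked = new CompletableFuture<>();
    }

    void resume() {
        CompletableFuture<?> pending = unblocked;
        unblocked = AVAILABLE;
        pending.complete(null); // completes the pending future so waiters are notified
    }

    boolean isBlocked() {
        return !unblocked.isDone();
    }

    Status emitNext() {
        // Positive condition with an early return instead of a negated availability check.
        if (isBlocked()) {
            return Status.NOTHING_AVAILABLE;
        }
        emitted++;
        return Status.MORE_AVAILABLE;
    }

    int emitted() {
        return emitted;
    }

    // Available only when both the upstream source and this input are available.
    CompletableFuture<?> getAvailableFuture(CompletableFuture<?> upstream) {
        return and(upstream, unblocked);
    }
}
```

Returning the non-`AVAILABLE` argument directly (instead of always calling `allOf`) avoids allocating a new future on the common path where one side is already available.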
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,48 @@
 package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 /**
  * A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
  * the {@link MultipleInputStreamOperator} to select input for reading.
  */
 @Internal
 public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+    private final HashMap<Long, CompletableFuture<Boolean>> pendingCheckpointCompletedFutures = new HashMap<>();

Review comment:
I'm concerned about the cleanup of this map. From the code, I see it's assumed that triggerAsync is followed by at least one triggerOnBarrier or abortOnBarrier, right? But can abort come after triggerAsync? Should we state these ordering assumptions?
Or maybe we can just remove the map? I see the future result is only used by `SourceStreamTask`, which is irrelevant here.

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -178,17 +178,27 @@ public void runMailboxLoop() throws Exception {
         final MailboxController defaultActionContext = new MailboxController(this);
-        while (runMailboxStep(localMailbox, defaultActionContext)) {
+        while (isMailboxLoopRunning()) {

Review comment:
This is a change in the production code, so I think it's better not to mark it as `[test]` in the commit message (even though the motivation is to fix tests).
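The two `processMail` comments on `MailboxProcessor` (return `false` directly when there is no batch, and return `true` directly in single-step mode) can be sketched on a simplified mailbox. This is a hypothetical stand-in, not `MailboxProcessor`: a plain `ArrayDeque` replaces `TaskMailbox` and its batch, and the `loopRunning` field replaces `isMailboxLoopRunning()`. The `processed` flag remains only for the multi-mail case.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class MailboxProcessorSketch {
    // Simplified stand-in for the TaskMailbox batch.
    private final Queue<Runnable> batch = new ArrayDeque<>();
    // Simplified stand-in for isMailboxLoopRunning().
    private boolean loopRunning = true;

    void put(Runnable mail) {
        batch.add(mail);
    }

    void stop() {
        loopRunning = false;
    }

    /** @return true if at least one mail was processed. */
    boolean processMail(boolean singleStep) {
        if (batch.isEmpty()) {
            // Nothing to process: return false directly instead of a pre-initialized flag.
            return false;
        }
        boolean processed = false;
        Runnable mail;
        while (loopRunning && (mail = batch.poll()) != null) {
            mail.run();
            if (singleStep) {
                // Exactly one mail was processed: no flag update and no break needed.
                return true;
            }
            processed = true;
        }
        // In single-step mode we only get here if no mail ran, so processed is false.
        return processed;
    }
}
```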
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
                 operatorChain,
                 setupNumRecordsInCounter(mainOperator));
     }
+
+    @Override
+    public Future<Boolean> triggerCheckpointAsync(
+            CheckpointMetaData metadata,
+            CheckpointOptions options,
+            boolean advanceToEndOfEventTime) {
+
+        CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+        mainMailboxExecutor.execute(
+                () -> {
+                    try {
+                        pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+                        triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
+                    }
+                    catch (Exception ex) {
+                        // Report the failure both via the Future result but also to the mailbox
+                        pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+                        resultFuture.completeExceptionally(ex);
+                        throw ex;
+                    }
+                },
+                "checkpoint %s with %s",
+                metadata,
+                options);
+        return resultFuture;
+    }
+
+    private void triggerSourcesCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
+        for (StreamTaskSourceInput<?> sourceInput : operatorChain.getSourceTaskInputs()) {

Review comment:
Shouldn't we determine which `sourceInput` the current barrier is for (and call `processBarrier` only for it)?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -56,12 +59,13 @@ public static CheckpointedInputGate createCheckpointedInputGate(
                 taskIOMetricGroup,
                 taskName,
                 mailboxExecutor,
-                Arrays.asList(inputGates));
+                new List[]{ Arrays.asList(inputGates) },

Review comment:
Currently, they are an array of lists and a list:
```
List<IndexedInputGate>[] inputGates,
List<StreamTaskSourceInput<?>> sourceInputs)
```
So changing one array to a vararg doesn't change consistency:
```
List<StreamTaskSourceInput<?>> sourceInputs,
List<IndexedInputGate>... inputGates)
```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,48 @@
 package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 /**
  * A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
  * the {@link MultipleInputStreamOperator} to select input for reading.
  */
 @Internal
 public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+    private final HashMap<Long, CompletableFuture<Boolean>> pendingCheckpointCompletedFutures = new HashMap<>();

Review comment:
Without changing the signature, I think the only way is to rely on the mailbox: after triggering, enqueue a mail and wait for its completion. That is far from ideal, but at least it wouldn't affect the production code path, and it is correct. WDYT?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -101,10 +101,8 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
             allBarriersReceivedFuture = new CompletableFuture<>();
             checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
-            for (final InputGate gate : inputGates) {
-                for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) {
-                    gate.getChannel(index).checkpointStarted(barrier);
-                }
+            for (final BlockableInput input : inputs) {
+                input.checkpointStarted(barrier);

Review comment:
Thanks!
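The mailbox-based alternative from the `MultipleInputStreamTask` comment ("after triggering, enqueue a mail and wait for its completion") might look like the following sketch. It is hypothetical: a single-threaded `ExecutorService` plays the role of the FIFO task mailbox, and `triggerSources` is an invented placeholder for the real trigger logic, which may enqueue further mails. The completion mail is enqueued from inside the trigger mail, after the trigger has run, so it executes only after the mails the trigger enqueued, and no map of pending futures is needed. Mails enqueued later than that (for example, by barriers that arrive afterwards) are not waited for.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class MailboxTriggerSketch {
    // A single-threaded executor stands in for the task mailbox: mails run one at a time, in FIFO order.
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    final List<String> log = Collections.synchronizedList(new ArrayList<>());

    // Hypothetical trigger action: records itself and enqueues follow-up work (e.g. barrier handling).
    private void triggerSources(long checkpointId) {
        log.add("trigger " + checkpointId);
        mailbox.execute(() -> log.add("barrier " + checkpointId));
    }

    CompletableFuture<Boolean> triggerCheckpointAsync(long checkpointId) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        mailbox.execute(() -> {
            try {
                triggerSources(checkpointId);
                // Enqueued after everything triggerSources() enqueued, so it runs once that work is done.
                mailbox.execute(() -> result.complete(true));
            } catch (RuntimeException e) {
                result.completeExceptionally(e);
            }
        });
        return result;
    }

    void shutdown() {
        mailbox.shutdown();
    }
}
```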
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,51 @@
 package org.apache.flink.streaming.runtime.tasks;
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 /**
  * A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
  * the {@link MultipleInputStreamOperator} to select input for reading.
  */
 @Internal
 public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+    public static final int MAX_TRACKED_CHECKPOINTS = 100_000;

Review comment:
Can this be private?

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +137,81 @@ protected void createInputProcessor(
                 operatorChain,
                 setupNumRecordsInCounter(mainOperator));
     }
+
+    @Override
+    public Future<Boolean> triggerCheckpointAsync(
+            CheckpointMetaData metadata,
+            CheckpointOptions options,
+            boolean advanceToEndOfEventTime) {
+
+        CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+        mainMailboxExecutor.execute(
+                () -> {
+                    try {
+                        /**
+                         * Contrary to {@link SourceStreamTask}, we are not using here
+                         * {@link StreamTask#latestAsyncCheckpointStartDelayNanos} to measure the start delay
+                         * metric, but we will be using {@link CheckpointBarrierHandler#getCheckpointStartDelayNanos()}
+                         * instead.
+                         */
+                        pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+                        checkPendingCheckpointCompletedFuturesSize();
+                        triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
+                    }
+                    catch (Exception ex) {
+                        // Report the failure both via the Future result but also to the mailbox
+                        pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+                        resultFuture.completeExceptionally(ex);
+                        throw ex;
+                    }
+                },
+                "checkpoint %s with %s",
+                metadata,
+                options);
+        return resultFuture;
+    }
+
+    private void checkPendingCheckpointCompletedFuturesSize() {
+        while (pendingCheckpointCompletedFutures.size() > MAX_TRACKED_CHECKPOINTS) {
+            Long minCheckpointID = Collections.min(pendingCheckpointCompletedFutures.keySet());

Review comment:
I think we can end up with `O(MAX_TRACKED_CHECKPOINTS ^ 2)` here, right? Even though it shouldn't happen normally (if the map is cleared by triggerOnBarrier/abort), I think it's better to cap it by using either `LinkedHashMap` or `TreeMap`, or by just reducing `MAX_TRACKED_CHECKPOINTS` to something very small.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
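Regarding the `checkPendingCheckpointCompletedFuturesSize` comment: a `TreeMap` keeps checkpoint ids sorted, so each eviction of the smallest id costs O(log n) via `pollFirstEntry()` instead of an O(n) `Collections.min` scan per loop iteration. The sketch below is hypothetical: the class name and `maxTracked` parameter are invented, and completing evicted futures with `false` is our own assumption (the PR does not say what should happen to them). A `LinkedHashMap` with `removeEldestEntry` would also work, but it evicts by insertion order, which matches "smallest id" only if checkpoints are tracked in increasing id order.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

class PendingCheckpointsSketch {
    private final int maxTracked;
    // Sorted by checkpoint id, so the smallest id is always the first entry.
    private final TreeMap<Long, CompletableFuture<Boolean>> pending = new TreeMap<>();

    PendingCheckpointsSketch(int maxTracked) {
        this.maxTracked = maxTracked;
    }

    void track(long checkpointId, CompletableFuture<Boolean> future) {
        pending.put(checkpointId, future);
        while (pending.size() > maxTracked) {
            // O(log n) per eviction instead of an O(n) Collections.min scan.
            Map.Entry<Long, CompletableFuture<Boolean>> oldest = pending.pollFirstEntry();
            // Assumption: evicted (untracked) checkpoints are reported as not completed.
            oldest.getValue().complete(false);
        }
    }

    void complete(long checkpointId) {
        CompletableFuture<Boolean> future = pending.remove(checkpointId);
        if (future != null) {
            future.complete(true);
        }
    }

    int size() {
        return pending.size();
    }
}
```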