rkhachatryan commented on a change in pull request #13465: URL: https://github.com/apache/flink/pull/13465#discussion_r494366315
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java ########## @@ -18,32 +18,51 @@ package org.apache.flink.streaming.runtime.tasks; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig; import org.apache.flink.streaming.api.graph.StreamEdge; import org.apache.flink.streaming.api.operators.InputSelectable; import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator; +import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler; import org.apache.flink.streaming.runtime.io.CheckpointedInputGate; import org.apache.flink.streaming.runtime.io.InputProcessorUtil; import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler; import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor; +import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput; import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge; import org.apache.flink.streaming.runtime.metrics.WatermarkGauge; +import javax.annotation.Nullable; + +import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.List; - -import static org.apache.flink.util.Preconditions.checkState; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; /** * A {@link 
StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting * the {@link MultipleInputStreamOperator} to select input for reading. */ @Internal public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> { + public static final int MAX_TRACKED_CHECKPOINTS = 100_000; Review comment: Can this be private? ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java ########## @@ -114,4 +137,81 @@ protected void createInputProcessor( operatorChain, setupNumRecordsInCounter(mainOperator)); } + + @Override + public Future<Boolean> triggerCheckpointAsync( + CheckpointMetaData metadata, + CheckpointOptions options, + boolean advanceToEndOfEventTime) { + + CompletableFuture<Boolean> resultFuture = new CompletableFuture<>(); + mainMailboxExecutor.execute( + () -> { + try { + /** + * Contrary to {@link SourceStreamTask}, we are not using here + * {@link StreamTask#latestAsyncCheckpointStartDelayNanos} to measure the start delay + * metric, but we will be using {@link CheckpointBarrierHandler#getCheckpointStartDelayNanos()} + * instead. 
+ */ + pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture); + checkPendingCheckpointCompletedFuturesSize(); + triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options)); + } + catch (Exception ex) { + // Report the failure both via the Future result but also to the mailbox + pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId()); + resultFuture.completeExceptionally(ex); + throw ex; + } + }, + "checkpoint %s with %s", + metadata, + options); + return resultFuture; + } + + private void checkPendingCheckpointCompletedFuturesSize() { + while (pendingCheckpointCompletedFutures.size() > MAX_TRACKED_CHECKPOINTS) { + Long minCheckpointID = Collections.min(pendingCheckpointCompletedFutures.keySet()); Review comment: I think we can end up with `O(MAX_TRACKED_CHECKPOINTS ^ 2)` here, right? Each `Collections.min` call scans the whole key set (O(n)), so evicting entries one by one this way costs O(n^2) overall. Even though this shouldn't happen normally (as long as the map is cleared by triggerOnBarrier/abort), I think it's better to bound the cost, either by using a `LinkedHashMap` or `TreeMap` (so the oldest checkpoint can be found without a full scan), or by just reducing `MAX_TRACKED_CHECKPOINTS` to something very small. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org