rkhachatryan commented on a change in pull request #13465:
URL: https://github.com/apache/flink/pull/13465#discussion_r494366315



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,51 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 /**
  * A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} 
and supporting
  * the {@link MultipleInputStreamOperator} to select input for reading.
  */
 @Internal
 public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, 
MultipleInputStreamOperator<OUT>> {
+       public static final int MAX_TRACKED_CHECKPOINTS = 100_000;

Review comment:
       Can this constant (`MAX_TRACKED_CHECKPOINTS`) be private?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +137,81 @@ protected void createInputProcessor(
                        operatorChain,
                        setupNumRecordsInCounter(mainOperator));
        }
+
+       @Override
+       public Future<Boolean> triggerCheckpointAsync(
+                       CheckpointMetaData metadata,
+                       CheckpointOptions options,
+                       boolean advanceToEndOfEventTime) {
+
+               CompletableFuture<Boolean> resultFuture = new 
CompletableFuture<>();
+               mainMailboxExecutor.execute(
+                       () -> {
+                               try {
+                                       /**
+                                        * Contrary to {@link 
SourceStreamTask}, we are not using here
+                                        * {@link 
StreamTask#latestAsyncCheckpointStartDelayNanos} to measure the start delay
+                                        * metric, but we will be using {@link 
CheckpointBarrierHandler#getCheckpointStartDelayNanos()}
+                                        * instead.
+                                        */
+                                       
pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+                                       
checkPendingCheckpointCompletedFuturesSize();
+                                       triggerSourcesCheckpoint(new 
CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), 
options));
+                               }
+                               catch (Exception ex) {
+                                       // Report the failure both via the 
Future result but also to the mailbox
+                                       
pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+                                       resultFuture.completeExceptionally(ex);
+                                       throw ex;
+                               }
+                       },
+                       "checkpoint %s with %s",
+                       metadata,
+                       options);
+               return resultFuture;
+       }
+
+       private void checkPendingCheckpointCompletedFuturesSize() {
+               while (pendingCheckpointCompletedFutures.size() > 
MAX_TRACKED_CHECKPOINTS) {
+                       Long minCheckpointID = 
Collections.min(pendingCheckpointCompletedFutures.keySet());

Review comment:
       I think we can end up with `O(MAX_TRACKED_CHECKPOINTS ^ 2)` here, right?
   `Collections.min` scans the whole key set every time it is called.
   Even though it shouldn't happen normally (if the map is cleared by 
triggerOnBarrier/abort), I think it's better to bound the cost by either using 
an ordered map (`LinkedHashMap` or `TreeMap`), or just reducing 
   `MAX_TRACKED_CHECKPOINTS` to something very small.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to