pnowojski commented on a change in pull request #13465:
URL: https://github.com/apache/flink/pull/13465#discussion_r494376371



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +137,81 @@ protected void createInputProcessor(
                        operatorChain,
                        setupNumRecordsInCounter(mainOperator));
        }
+
+       @Override
+       public Future<Boolean> triggerCheckpointAsync(
+                       CheckpointMetaData metadata,
+                       CheckpointOptions options,
+                       boolean advanceToEndOfEventTime) {
+
+               CompletableFuture<Boolean> resultFuture = new 
CompletableFuture<>();
+               mainMailboxExecutor.execute(
+                       () -> {
+                               try {
+                                       /**
+                                        * Contrary to {@link 
SourceStreamTask}, we are not using here
+                                        * {@link 
StreamTask#latestAsyncCheckpointStartDelayNanos} to measure the start delay
+                                        * metric, but we will be using {@link 
CheckpointBarrierHandler#getCheckpointStartDelayNanos()}
+                                        * instead.
+                                        */
+                                       
pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+                                       
checkPendingCheckpointCompletedFuturesSize();
+                                       triggerSourcesCheckpoint(new 
CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), 
options));
+                               }
+                               catch (Exception ex) {
+                                       // Report the failure both via the 
Future result but also to the mailbox
+                                       
pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+                                       resultFuture.completeExceptionally(ex);
+                                       throw ex;
+                               }
+                       },
+                       "checkpoint %s with %s",
+                       metadata,
+                       options);
+               return resultFuture;
+       }
+
+       private void checkPendingCheckpointCompletedFuturesSize() {
+               while (pendingCheckpointCompletedFutures.size() > 
MAX_TRACKED_CHECKPOINTS) {
+                       Long minCheckpointID = 
Collections.min(pendingCheckpointCompletedFutures.keySet());

Review comment:
       Ehhhh... ok, but that's overengineering a bit :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to