pnowojski commented on a change in pull request #13465: URL: https://github.com/apache/flink/pull/13465#discussion_r494376371
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java ########## @@ -114,4 +137,81 @@ protected void createInputProcessor( operatorChain, setupNumRecordsInCounter(mainOperator)); } + + @Override + public Future<Boolean> triggerCheckpointAsync( + CheckpointMetaData metadata, + CheckpointOptions options, + boolean advanceToEndOfEventTime) { + + CompletableFuture<Boolean> resultFuture = new CompletableFuture<>(); + mainMailboxExecutor.execute( + () -> { + try { + /** + * Contrary to {@link SourceStreamTask}, we are not using here + * {@link StreamTask#latestAsyncCheckpointStartDelayNanos} to measure the start delay + * metric, but we will be using {@link CheckpointBarrierHandler#getCheckpointStartDelayNanos()} + * instead. + */ + pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture); + checkPendingCheckpointCompletedFuturesSize(); + triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options)); + } + catch (Exception ex) { + // Report the failure both via the Future result but also to the mailbox + pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId()); + resultFuture.completeExceptionally(ex); + throw ex; + } + }, + "checkpoint %s with %s", + metadata, + options); + return resultFuture; + } + + private void checkPendingCheckpointCompletedFuturesSize() { + while (pendingCheckpointCompletedFutures.size() > MAX_TRACKED_CHECKPOINTS) { + Long minCheckpointID = Collections.min(pendingCheckpointCompletedFutures.keySet()); Review comment: Ehhhh... ok, but that's overengineering a bit :) ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org