rkhachatryan commented on a change in pull request #13465:
URL: https://github.com/apache/flink/pull/13465#discussion_r494181467



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/AvailabilityProvider.java
##########
@@ -66,6 +66,32 @@ default boolean isApproximatelyAvailable() {
                return getAvailableFuture() == AVAILABLE;
        }
 
+       static CompletableFuture<?> and(CompletableFuture<?> first, CompletableFuture<?> second) {
+               if (first == AVAILABLE) {
+                       if (second == AVAILABLE) {

Review comment:
       I think less `if` nesting would be more readable:
   ```
   if (first == AVAILABLE && second == AVAILABLE) return AVAILABLE;
   else if (first == AVAILABLE) return second;
   else if (second == AVAILABLE) return first;
   else return allOf(first, second);
   ```
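For illustration, here is a minimal self-contained sketch of the flattened `and`. It assumes `AVAILABLE` is a shared, already-completed future, as in `AvailabilityProvider`, and uses `CompletableFuture.allOf` for the `allOf` above. The holder class `AvailabilityFutures` is hypothetical:

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical holder for the flattened {@code and} helper. */
final class AvailabilityFutures {
    // Stand-in for AvailabilityProvider.AVAILABLE: a shared, already-completed future.
    static final CompletableFuture<?> AVAILABLE = CompletableFuture.completedFuture(null);

    private AvailabilityFutures() {}

    /** Returns a future that completes once both {@code first} and {@code second} are complete. */
    static CompletableFuture<?> and(CompletableFuture<?> first, CompletableFuture<?> second) {
        if (first == AVAILABLE && second == AVAILABLE) {
            return AVAILABLE;
        } else if (first == AVAILABLE) {
            return second;
        } else if (second == AVAILABLE) {
            return first;
        } else {
            // Neither side is known to be available: wait for both.
            return CompletableFuture.allOf(first, second);
        }
    }
}
```

When both futures are `AVAILABLE`, the `first == AVAILABLE` branch already returns `second`, which is `AVAILABLE`. The first branch could therefore be dropped without changing behavior.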

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -101,10 +101,8 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
                        allBarriersReceivedFuture = new CompletableFuture<>();
                        checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
 
-                       for (final InputGate gate : inputGates) {
-                               for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) {
-                                       gate.getChannel(index).checkpointStarted(barrier);
-                               }
+                       for (final BlockableInput input : inputs) {
+                               input.checkpointStarted(barrier);

Review comment:
       Why do we block source inputs when we receive a barrier from a non-source input?
   (Maybe a comment is missing here or for `StreamTaskSourceInput.checkpointStarted`.)

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -249,25 +259,34 @@ private void sendControlMail(RunnableWithException mail, String descriptionForma
        }
 
        /**
-        * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should
-        * continue running, false if it should stop. In the current design, this method also evaluates all control flag
-        * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
+        * This helper method handles all special actions from the mailbox.
+        * In the current design, this method also evaluates all control flag changes.
+        * This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
         * that all flag changes must make sure that the mailbox signals mailbox#hasMail.
+        *
+        * @return true if a mail has been processed.
         */
-       private boolean processMail(TaskMailbox mailbox) throws Exception {
-
+       private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
+               boolean processed = false;
                // Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only
                // acquired after this point.
                if (!mailbox.createBatch()) {
                        // We can also directly return true because all changes to #isMailboxLoopRunning must be connected to
                        // mailbox.hasMail() == true.
-                       return true;
+                       return processed;

Review comment:
       Why not just return false here?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
##########
@@ -34,30 +40,75 @@
  * unavailable or finished.
  */
 @Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public final class StreamTaskSourceInput<T> implements StreamTaskInput<T>, BlockableInput {
 
        private final SourceOperator<T, ?> operator;
+       private final int inputGateIndex;
+       private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
+       private final List<InputChannelInfo> inputChannelInfos;
 
-       public StreamTaskSourceInput(SourceOperator<T, ?> operator) {
+       public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex) {
                this.operator = checkNotNull(operator);
+               this.inputGateIndex = inputGateIndex;
+               inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
+               isBlockedAvailability.resetAvailable();
        }
 
        @Override
        public InputStatus emitNext(DataOutput<T> output) throws Exception {
+               if (!isBlockedAvailability.isApproximatelyAvailable()) {

Review comment:
       nit: this is a bit difficult to read (maybe just invert the condition?)
   
   

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -56,12 +59,13 @@ public static CheckpointedInputGate createCheckpointedInputGate(
                        taskIOMetricGroup,
                        taskName,
                        mailboxExecutor,
-                       Arrays.asList(inputGates));
+                       new List[]{ Arrays.asList(inputGates) },

Review comment:
       How about using a vararg parameter and replacing this with just `Arrays.asList(inputGates)`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -249,25 +259,34 @@ private void sendControlMail(RunnableWithException mail, String descriptionForma
        }
 
        /**
-        * This helper method handles all special actions from the mailbox. It returns true if the mailbox loop should
-        * continue running, false if it should stop. In the current design, this method also evaluates all control flag
-        * changes. This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
+        * This helper method handles all special actions from the mailbox.
+        * In the current design, this method also evaluates all control flag changes.
+        * This keeps the hot path in {@link #runMailboxLoop()} free from any other flag checking, at the cost
         * that all flag changes must make sure that the mailbox signals mailbox#hasMail.
+        *
+        * @return true if a mail has been processed.
         */
-       private boolean processMail(TaskMailbox mailbox) throws Exception {
-
+       private boolean processMail(TaskMailbox mailbox, boolean singleStep) throws Exception {
+               boolean processed = false;
                // Doing this check is an optimization to only have a volatile read in the expected hot path, locks are only
                // acquired after this point.
                if (!mailbox.createBatch()) {
                        // We can also directly return true because all changes to #isMailboxLoopRunning must be connected to
                        // mailbox.hasMail() == true.
-                       return true;
+                       return processed;
                }
 
                // Take mails in a non-blockingly and execute them.
                Optional<Mail> maybeMail;
                while (isMailboxLoopRunning() && (maybeMail = mailbox.tryTakeFromBatch()).isPresent()) {
                        maybeMail.get().run();
+                       processed = true;
+                       if (singleStep) {
+                               break;
+                       }
+               }

Review comment:
       Why not just return `true` here?
   Then, after the loop, if `singleStep` is true, we can simply return false.
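A simplified sketch of the early-return shape for `processMail`. It makes several assumptions: a plain `Queue<Runnable>` stands in for `TaskMailbox`'s batch, an empty queue plays the role of `!mailbox.createBatch()`, and a `BooleanSupplier` replaces `isMailboxLoopRunning()`. The class `MailProcessingSketch` is hypothetical:

```java
import java.util.Queue;
import java.util.function.BooleanSupplier;

/** Hypothetical, simplified stand-in for MailboxProcessor#processMail. */
final class MailProcessingSketch {
    private MailProcessingSketch() {}

    /**
     * Runs mails from the current batch.
     *
     * @return true if at least one mail has been processed.
     */
    static boolean processMail(Queue<Runnable> batch, BooleanSupplier isLoopRunning, boolean singleStep) {
        if (batch.isEmpty()) {
            // Plays the role of !mailbox.createBatch(): nothing was processed.
            return false;
        }
        boolean processed = false;
        Runnable mail;
        while (isLoopRunning.getAsBoolean() && (mail = batch.poll()) != null) {
            mail.run();
            if (singleStep) {
                // Exactly one mail has been run, so we can return right away.
                return true;
            }
            processed = true;
        }
        // In single-step mode we only get here if no mail was run, so this is false.
        return processed;
    }
}
```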

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
##########
@@ -34,30 +40,75 @@
  * unavailable or finished.
  */
 @Internal
-public final class StreamTaskSourceInput<T> implements StreamTaskInput<T> {
+public final class StreamTaskSourceInput<T> implements StreamTaskInput<T>, BlockableInput {
 
        private final SourceOperator<T, ?> operator;
+       private final int inputGateIndex;
+       private final AvailabilityHelper isBlockedAvailability = new AvailabilityHelper();
+       private final List<InputChannelInfo> inputChannelInfos;
 
-       public StreamTaskSourceInput(SourceOperator<T, ?> operator) {
+       public StreamTaskSourceInput(SourceOperator<T, ?> operator, int inputGateIndex) {
                this.operator = checkNotNull(operator);
+               this.inputGateIndex = inputGateIndex;
+               inputChannelInfos = Collections.singletonList(new InputChannelInfo(inputGateIndex, 0));
+               isBlockedAvailability.resetAvailable();
        }
 
        @Override
        public InputStatus emitNext(DataOutput<T> output) throws Exception {
+               if (!isBlockedAvailability.isApproximatelyAvailable()) {
+                       // Safe guard

Review comment:
       nit: this comment doesn't say much to me

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
                        operatorChain,
                        setupNumRecordsInCounter(mainOperator));
        }
+
+       @Override
+       public Future<Boolean> triggerCheckpointAsync(
+                       CheckpointMetaData metadata,
+                       CheckpointOptions options,
+                       boolean advanceToEndOfEventTime) {
+
+               CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+               mainMailboxExecutor.execute(
+                       () -> {
+                               try {

Review comment:
       Shouldn't we also update `super.latestAsyncCheckpointStartDelayNanos`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,48 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 /**
  * A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
  * the {@link MultipleInputStreamOperator} to select input for reading.
  */
 @Internal
 public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+       private final HashMap<Long, CompletableFuture<Boolean>> pendingCheckpointCompletedFutures = new HashMap<>();

Review comment:
       I'm concerned about the cleanup of this map.
   From the code, I see it's assumed that at least one triggerOnBarrier or abortOnBarrier call follows triggerAsync, right?
   But can abort come after triggerAsync?
   Should we state these ordering assumptions?
   
   Or maybe we can just remove the map? I see the future result is only used by `SourceStreamTask`, which is irrelevant here.

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/mailbox/MailboxProcessor.java
##########
@@ -178,17 +178,27 @@ public void runMailboxLoop() throws Exception {
 
                final MailboxController defaultActionContext = new 
MailboxController(this);
 
-               while (runMailboxStep(localMailbox, defaultActionContext)) {
+               while (isMailboxLoopRunning()) {

Review comment:
       This is a change in the production code, so I think it's better not to mark it as `[test]` in the commit message (even though the motivation is to fix tests).

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +134,67 @@ protected void createInputProcessor(
                        operatorChain,
                        setupNumRecordsInCounter(mainOperator));
        }
+
+       @Override
+       public Future<Boolean> triggerCheckpointAsync(
+                       CheckpointMetaData metadata,
+                       CheckpointOptions options,
+                       boolean advanceToEndOfEventTime) {
+
+               CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+               mainMailboxExecutor.execute(
+                       () -> {
+                               try {
+                                       pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+                                       triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
+                               }
+                               catch (Exception ex) {
+                                       // Report the failure both via the Future result but also to the mailbox
+                                       pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+                                       resultFuture.completeExceptionally(ex);
+                                       throw ex;
+                               }
+                       },
+                       "checkpoint %s with %s",
+                       metadata,
+                       options);
+               return resultFuture;
+       }
+
+       private void triggerSourcesCheckpoint(CheckpointBarrier checkpointBarrier) throws IOException {
+               for (StreamTaskSourceInput<?> sourceInput : operatorChain.getSourceTaskInputs()) {

Review comment:
       Shouldn't we determine which `sourceInput` the current barrier is for (and call `processBarrier` only for that one)?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/InputProcessorUtil.java
##########
@@ -56,12 +59,13 @@ public static CheckpointedInputGate createCheckpointedInputGate(
                        taskIOMetricGroup,
                        taskName,
                        mailboxExecutor,
-                       Arrays.asList(inputGates));
+                       new List[]{ Arrays.asList(inputGates) },

Review comment:
       Currently, they are an array of lists and a list:
   ```
   List<IndexedInputGate>[] inputGates,
   List<StreamTaskSourceInput<?>> sourceInputs)
   ```
   So changing the array to a vararg doesn't make them any less consistent:
   ```
   List<StreamTaskSourceInput<?>> sourceInputs,
   List<IndexedInputGate>... inputGates)
   ```
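A sketch of the vararg variant. `String` placeholders stand in for `StreamTaskSourceInput<?>` and `IndexedInputGate`, and the method names are hypothetical. Because `List<...>` is a non-reifiable type, the generic vararg triggers a possible-heap-pollution warning, which `@SafeVarargs` suppresses on a static method:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch; String stands in for StreamTaskSourceInput<?> and IndexedInputGate. */
final class VarargsSketch {
    private VarargsSketch() {}

    // Source inputs first, then one list of gates per network input as the trailing vararg.
    // List<String>... is a generic (non-reifiable) vararg, hence @SafeVarargs.
    @SafeVarargs
    static List<String> collectInputs(List<String> sourceInputs, List<String>... inputGates) {
        List<String> all = new ArrayList<>(sourceInputs);
        for (List<String> gates : inputGates) {
            all.addAll(gates);
        }
        return all;
    }

    // Single-input caller: no explicit array creation is needed any more.
    static List<String> singleInputExample(String... gates) {
        return collectInputs(new ArrayList<>(), Arrays.asList(gates));
    }
}
```

With this signature, the single-gate caller can pass `Arrays.asList(inputGates)` directly instead of `new List[]{ Arrays.asList(inputGates) }`.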

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,48 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 /**
  * A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
  * the {@link MultipleInputStreamOperator} to select input for reading.
  */
 @Internal
 public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+       private final HashMap<Long, CompletableFuture<Boolean>> pendingCheckpointCompletedFutures = new HashMap<>();

Review comment:
       Without changing the signature, I think the only way is to rely on the mailbox: after triggering, enqueue a mail and wait for its completion.
   This is far from ideal, but at least it wouldn't affect the production code path, and it is correct.
   WDYT?
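A rough sketch of the mailbox-based approach. A single-threaded FIFO `Executor` stands in for the task mailbox, so the real `MailboxExecutor` and the trigger's checked exceptions are not modelled. `MailboxFenceSketch` and `triggerAndAwait` are hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch: a single-threaded FIFO Executor stands in for the task mailbox. */
final class MailboxFenceSketch {
    private MailboxFenceSketch() {}

    /**
     * Runs {@code trigger} as a mail and returns a future that completes when a marker mail,
     * enqueued right after the trigger, has run. With a FIFO mailbox, every mail the trigger
     * enqueued has been processed by then.
     */
    static CompletableFuture<Void> triggerAndAwait(Executor mailbox, Runnable trigger) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        mailbox.execute(() -> {
            try {
                trigger.run();
                // Marker mail: queued behind anything the trigger itself enqueued.
                mailbox.execute(() -> done.complete(null));
            } catch (RuntimeException e) {
                // Report the failure via the future as well as to the mailbox thread.
                done.completeExceptionally(e);
                throw e;
            }
        });
        return done;
    }
}
```

The marker mail is enqueued only after the trigger has run. Any mails the trigger scheduled before that point are therefore processed before the returned future completes.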

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
##########
@@ -101,10 +101,8 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
                        allBarriersReceivedFuture = new CompletableFuture<>();
                        checkpointCoordinator.initCheckpoint(barrierId, barrier.getCheckpointOptions());
 
-                       for (final InputGate gate : inputGates) {
-                               for (int index = 0, numChannels = gate.getNumberOfInputChannels(); index < numChannels; index++) {
-                                       gate.getChannel(index).checkpointStarted(barrier);
-                               }
+                       for (final BlockableInput input : inputs) {
+                               input.checkpointStarted(barrier);

Review comment:
       Thanks!

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -18,32 +18,51 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.InputSelectable;
 import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
+import org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler;
 import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
 import org.apache.flink.streaming.runtime.io.InputProcessorUtil;
 import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
 import org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor;
+import org.apache.flink.streaming.runtime.io.StreamTaskSourceInput;
 import org.apache.flink.streaming.runtime.metrics.MinWatermarkGauge;
 import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
-
-import static org.apache.flink.util.Preconditions.checkState;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
 
 /**
  * A {@link StreamTask} for executing a {@link MultipleInputStreamOperator} and supporting
  * the {@link MultipleInputStreamOperator} to select input for reading.
  */
 @Internal
 public class MultipleInputStreamTask<OUT> extends StreamTask<OUT, MultipleInputStreamOperator<OUT>> {
+       public static final int MAX_TRACKED_CHECKPOINTS = 100_000;

Review comment:
       Can this be private?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -114,4 +137,81 @@ protected void createInputProcessor(
                        operatorChain,
                        setupNumRecordsInCounter(mainOperator));
        }
+
+       @Override
+       public Future<Boolean> triggerCheckpointAsync(
+                       CheckpointMetaData metadata,
+                       CheckpointOptions options,
+                       boolean advanceToEndOfEventTime) {
+
+               CompletableFuture<Boolean> resultFuture = new CompletableFuture<>();
+               mainMailboxExecutor.execute(
+                       () -> {
+                               try {
+                                       /**
+                                        * Contrary to {@link SourceStreamTask}, we are not using here
+                                        * {@link StreamTask#latestAsyncCheckpointStartDelayNanos} to measure the start delay
+                                        * metric, but we will be using {@link CheckpointBarrierHandler#getCheckpointStartDelayNanos()}
+                                        * instead.
+                                        */
+                                       pendingCheckpointCompletedFutures.put(metadata.getCheckpointId(), resultFuture);
+                                       checkPendingCheckpointCompletedFuturesSize();
+                                       triggerSourcesCheckpoint(new CheckpointBarrier(metadata.getCheckpointId(), metadata.getTimestamp(), options));
+                               }
+                               catch (Exception ex) {
+                                       // Report the failure both via the Future result but also to the mailbox
+                                       pendingCheckpointCompletedFutures.remove(metadata.getCheckpointId());
+                                       resultFuture.completeExceptionally(ex);
+                                       throw ex;
+                               }
+                       },
+                       "checkpoint %s with %s",
+                       metadata,
+                       options);
+               return resultFuture;
+       }
+
+       private void checkPendingCheckpointCompletedFuturesSize() {
+               while (pendingCheckpointCompletedFutures.size() > MAX_TRACKED_CHECKPOINTS) {
+                       Long minCheckpointID = Collections.min(pendingCheckpointCompletedFutures.keySet());

Review comment:
       I think we can end up with `O(MAX_TRACKED_CHECKPOINTS ^ 2)` here, right?
   Even though it shouldn't happen normally (if the map is cleared by triggerOnBarrier/abort), I think it's better to cap it by using `LinkedHashMap` or `TreeMap`, or by just reducing `MAX_TRACKED_CHECKPOINTS` to something very small.
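One way to make eviction O(1) per insertion is a `LinkedHashMap` that overrides `removeEldestEntry`. This sketch assumes checkpoint ids are put in increasing order, so the eldest entry is also the one with the smallest id. The class name is hypothetical, and an evicted future is simply dropped here, because this hunk doesn't show how the original loop treats it:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical sketch: pending-checkpoint futures keyed by checkpoint id, capped at
 * {@code maxTracked} entries. Eviction is O(1) per insertion.
 */
final class BoundedPendingCheckpoints extends LinkedHashMap<Long, CompletableFuture<Boolean>> {
    private static final long serialVersionUID = 1L;

    private final int maxTracked;

    BoundedPendingCheckpoints(int maxTracked) {
        this.maxTracked = maxTracked;
    }

    // Called by LinkedHashMap after each put; returning true drops the eldest (first-inserted)
    // entry, which is the smallest checkpoint id only if ids are inserted in increasing order.
    @Override
    protected boolean removeEldestEntry(Map.Entry<Long, CompletableFuture<Boolean>> eldest) {
        return size() > maxTracked;
    }
}
```

Alternatively, a `TreeMap` with `pollFirstEntry()` would evict the true minimum id regardless of insertion order, at O(log n) per operation.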



