akalash commented on a change in pull request #15375:
URL: https://github.com/apache/flink/pull/15375#discussion_r604760303



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -535,41 +545,82 @@ protected void beforeInvoke() throws Exception {
         // -------- Invoke --------
         LOG.debug("Invoking {}", getName());
 
+        SuspendableMailboxLoop suspendableLoop = mailboxProcessor.getSuspendableLoop();
+
         // we need to make sure that any triggers scheduled in open() cannot be
         // executed before all operators are opened
-        actionExecutor.runThrowing(
-                () -> {
-                    SequentialChannelStateReader reader =
-                            getEnvironment()
-                                    .getTaskStateManager()
-                                    .getSequentialChannelStateReader();
-                    reader.readOutputData(
-                            getEnvironment().getAllWriters(),
-                            !configuration.isGraphContainingLoops());
-
-                    operatorChain.initializeStateAndOpenOperators(
-                            createStreamTaskStateInitializer());
-
-                    channelIOExecutor.execute(
-                            () -> {
-                                try {
-                                    reader.readInputData(getEnvironment().getAllInputGates());
-                                } catch (Exception e) {
-                                    asyncExceptionHandler.handleAsyncException(
-                                            "Unable to read channel state", e);
-                                }
-                            });
+        CompletableFuture<Void> allGatesRecoveredFuture =
+                actionExecutor.call(
+                        () -> {
+                            SequentialChannelStateReader reader =
+                                    getEnvironment()
+                                            .getTaskStateManager()
+                                            .getSequentialChannelStateReader();
+                            reader.readOutputData(
+                                    getEnvironment().getAllWriters(),
+                                    !configuration.isGraphContainingLoops());
+
+                            operatorChain.initializeStateAndOpenOperators(
+                                    createStreamTaskStateInitializer());
+
+                            IndexedInputGate[] inputGates = getEnvironment().getAllInputGates();
+                            channelIOExecutor.execute(
+                                    () -> {
+                                        try {
+                                            reader.readInputData(inputGates);
+                                        } catch (Exception e) {
+                                            asyncExceptionHandler.handleAsyncException(
+                                                    "Unable to read channel state", e);
+                                        }
+                                    });
+
+                            List<CompletableFuture<?>> recoveredFutures =
+                                    new ArrayList<>(inputGates.length);
+                            for (InputGate inputGate : inputGates) {
+                                recoveredFutures.add(inputGate.getStateConsumedFuture());
+
+                                inputGate
+                                        .getStateConsumedFuture()
+                                        .thenRun(
+                                                () ->
+                                                        mainMailboxExecutor.execute(
+                                                                inputGate::requestPartitions,
+                                                                "Input gate request partitions"));
+                            }
 
-                    for (InputGate inputGate : getEnvironment().getAllInputGates()) {
-                        inputGate
-                                .getStateConsumedFuture()
-                                .thenRun(
-                                        () ->
-                                                mainMailboxExecutor.execute(
-                                                        inputGate::requestPartitions,
-                                                        "Input gate request partitions"));
-                    }
-                });
+                            return gatesRecoveredFuture(suspendableLoop, recoveredFutures);
+                        });
+
+        // Run mailbox until all gates will be recovered.
+        do {
+            suspendableLoop.run();
+        } while (isMailboxLoopRunning() && !allGatesRecoveredFuture.isDone());

Review comment:
   I agree that we could do something like this:
   ```
   do {
     mailboxProcessor.runMailboxStep(singleStep = true);
   } while (isMailboxLoopRunning() && !allGatesRecoveredFuture.isDone());
   ```
   But I am not fully sure about **singleStep = false**. Couldn't it lead to a potentially infinite loop in `processMailsWhenDefaultActionUnavailable`? For example, suppose there is no default action available until the state is no longer running, but the state only stops running once we have waited for at least one default action after `allGatesRecoveredFuture` has completed. Or does my example not make sense?
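
To make the single-step variant concrete, here is a minimal sketch with a toy mailbox. `ToyMailbox`, `runSingleStep`, and `runUntilRecovered` are hypothetical names used only for illustration; they are not Flink APIs and do not model the default action or `processMailsWhenDefaultActionUnavailable`. The only point is that a step which handles at most one mail and then returns lets the caller re-check the future after every mail.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Toy stand-in for a mailbox processor (hypothetical, NOT Flink's MailboxProcessor). */
class ToyMailbox {
    private final Queue<Runnable> mails = new ArrayDeque<>();
    private boolean running = true;
    int steps = 0;

    void execute(Runnable mail) {
        mails.add(mail);
    }

    boolean isRunning() {
        return running;
    }

    void stop() {
        running = false;
    }

    /** Processes at most one mail and returns, so the caller can re-check its own condition. */
    void runSingleStep() {
        steps++;
        Runnable mail = mails.poll();
        if (mail != null) {
            mail.run();
        }
    }
}

class MailboxSketch {
    /** Drives the toy mailbox one step at a time until the future completes or the loop stops. */
    static int runUntilRecovered(ToyMailbox mailbox, CompletableFuture<Void> recovered) {
        do {
            mailbox.runSingleStep();
        } while (mailbox.isRunning() && !recovered.isDone());
        return mailbox.steps;
    }

    public static void main(String[] args) {
        ToyMailbox mailbox = new ToyMailbox();
        CompletableFuture<Void> recovered = new CompletableFuture<>();
        mailbox.execute(() -> {});                       // unrelated mail
        mailbox.execute(() -> recovered.complete(null)); // stands in for "all gates recovered"
        mailbox.execute(() -> {});                       // stays queued for the regular loop
        int steps = runUntilRecovered(mailbox, recovered);
        System.out.println("steps=" + steps + ", done=" + recovered.isDone());
    }
}
```

In this toy the loop exits right after the mail that completes the future, leaving the third mail queued. Note that the sketch spins when the queue is empty instead of blocking; whether a real step may block while waiting for the default action is exactly the open question above.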



