zhijiangW commented on a change in pull request #11687:
URL: https://github.com/apache/flink/pull/11687#discussion_r420178858



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -464,6 +472,20 @@ protected void beforeInvoke() throws Exception {
                                        writer.readRecoveredState(getEnvironment().getTaskStateManager().getChannelStateReader());
                                }
                        }
+
+                       // It would get possible benefits to recovery input side after output side, which guarantees the
+                       // output can request more floating buffers from global firstly.
+                       InputGate[] inputGates = getEnvironment().getAllInputGates();
+                       if (inputGates != null) {
+                               for (InputGate inputGate : inputGates) {
+                                       inputGate.readRecoveredState(channelIOExecutor, getEnvironment().getTaskStateManager().getChannelStateReader());
+                               }
+
+                               // Note that we must request partition after all the single gate finishes recovery.
+                               for (InputGate inputGate : inputGates) {
+                                       inputGate.requestPartitions(channelIOExecutor);
+                               }

Review comment:
Yeah, I also considered having the mailbox thread request the partitions once all the futures returned by `inputGate.readRecoveredState` have completed.

But there is already an existing case where the partition request is executed by a thread other than the task main thread. In `SingleInputGate#updateInputChannel`, when an unknown channel is transformed into a local or remote channel, the partition is requested directly by the RPC thread. Since that case is considered valid, my assumption was that a partition request can be executed by any thread without race conditions, so I took the current approach to save some effort.
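
For comparison, the mailbox-thread alternative could be sketched roughly as below. This is not Flink code: `FakeGate`, `recoverThenRequest`, and the single-threaded executor named "mailbox" are hypothetical stand-ins, and the sketch only assumes that `readRecoveredState` returns a future per gate, as discussed in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class RecoverThenRequestSketch {

    /** Hypothetical stand-in for an input gate; this is not Flink's InputGate. */
    static final class FakeGate {
        volatile boolean recovered;
        volatile String requestedBy;

        /** Pretends to read recovered channel state on the IO executor. */
        CompletableFuture<Void> readRecoveredState(ExecutorService ioExecutor) {
            return CompletableFuture.runAsync(() -> recovered = true, ioExecutor);
        }

        /** Records which thread issued the request; fails if recovery has not finished. */
        void requestPartitions() {
            if (!recovered) {
                throw new IllegalStateException("partition requested before recovery finished");
            }
            requestedBy = Thread.currentThread().getName();
        }
    }

    /** Recovers all gates on IO threads, then requests partitions on the "mailbox" thread. */
    static List<String> recoverThenRequest(List<FakeGate> gates) throws Exception {
        ExecutorService ioExecutor = Executors.newFixedThreadPool(2);
        // Assumption: a single-threaded executor approximates the task's mailbox thread.
        ExecutorService mailbox = Executors.newSingleThreadExecutor(r -> new Thread(r, "mailbox"));
        try {
            List<CompletableFuture<Void>> recoveries = new ArrayList<>();
            for (FakeGate gate : gates) {
                recoveries.add(gate.readRecoveredState(ioExecutor));
            }
            // Only after every gate has finished recovery, hand the requests to the mailbox.
            CompletableFuture
                .allOf(recoveries.toArray(new CompletableFuture[0]))
                .thenRunAsync(() -> gates.forEach(FakeGate::requestPartitions), mailbox)
                .get();

            List<String> requestingThreads = new ArrayList<>();
            for (FakeGate gate : gates) {
                requestingThreads.add(gate.requestedBy);
            }
            return requestingThreads;
        } finally {
            ioExecutor.shutdown();
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        List<FakeGate> gates = List.of(new FakeGate(), new FakeGate());
        System.out.println(recoverThenRequest(gates));
    }
}
```

Compared with the current approach, this keeps every partition request on one thread, at the cost of the extra future plumbing.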





