Haibo Sun created FLINK-16174:
---------------------------------

             Summary: Add a better tryYield() method to MailboxExecutor to 
return the lowest priority of the remaining mails
                 Key: FLINK-16174
                 URL: https://issues.apache.org/jira/browse/FLINK-16174
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Task
    Affects Versions: 1.10.0
            Reporter: Haibo Sun


Currently, we use chainIndex as the priority when creating the MailboxExecutor 
that processes an operator's mails. When MailboxExecutor#tryYield is called, 
it takes the mails of this operator and of all downstream operators in the 
chain. But sometimes, after calling MailboxExecutor#tryYield, we need to know 
whether any mail of the current operator is still in the mailbox, which would 
simplify some operations. 
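
To illustrate the behaviour described above, here is a minimal toy model of a priority-filtered mailbox. It is only a sketch: ToyMailbox, Mail, put and demo are hypothetical names, not Flink classes, and it assumes that downstream operators have larger chainIndex values than upstream ones. It shows that a boolean tryYield cannot tell the caller whether the mail it ran belonged to the calling operator or to a downstream one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Toy model of a priority-filtered mailbox; hypothetical, not Flink's implementation. */
class ToyMailbox {
    /** A mail tagged with the chainIndex-based priority of the operator that owns it. */
    static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    void put(int priority, Runnable action) {
        queue.addLast(new Mail(priority, action));
    }

    /**
     * Runs the oldest mail whose priority is >= minPriority, i.e. a mail of the
     * calling operator or of an operator downstream of it (assumed ordering).
     * The boolean result does not say whose mail was run.
     */
    boolean tryYield(int minPriority) {
        for (Iterator<Mail> it = queue.iterator(); it.hasNext(); ) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                mail.action.run();
                return true;
            }
        }
        return false;
    }

    /** The operator with priority 1 yields until no eligible mail is left. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToyMailbox mailbox = new ToyMailbox();
        mailbox.put(0, () -> log.add("upstream"));
        mailbox.put(2, () -> log.add("downstream"));
        mailbox.put(1, () -> log.add("self"));
        while (mailbox.tryYield(1)) {
            // keep yielding; the upstream mail (priority 0) is never eligible
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the yielding operator runs both its own mail and the downstream mail, and in both cases tryYield simply returns true.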

For example, when we close an operator at runtime, after quiescing the 
processing time service and waiting for its running timers to finish, we call 
StreamOperator#close once there is no mail of the current operator left in the 
mailbox. The runtime code for closing an operator could then be simplified as 
follows.
{code:java}
                quiesceProcessingTimeService().get();
                while (mailboxExecutor.betterTryYield() <= self.priority) {}
                closeOperator(actionExecutor);
{code}

With the existing #tryYield method, if the following simplified code is used to 
close an operator, then when a downstream operator behaves like 
MailboxOperatorTest.ReplicatingMail and keeps enqueuing new mails, the 
tryYield() loop never exits, which results in a deadlock.

{code:java}
                quiesceProcessingTimeService().get();
                while (mailboxExecutor.tryYield()) {}
                closeOperator(actionExecutor);
{code}
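
The difference between the two loops can be sketched with a small toy model. Everything here is hypothetical and not Flink API: YieldLoopSketch, putReplicatingMail, tryYieldReportingLowest and the two drain helpers are names invented for the sketch. It assumes that downstream operators have larger priorities, that the proposed method reports the lowest priority among the remaining mails eligible for the caller, and that it returns an empty OptionalInt when none remain. An iteration cap stands in for the loop that would otherwise never exit.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.OptionalInt;

/** Toy model only; all names are hypothetical and not part of Flink's API. */
class YieldLoopSketch {
    static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    private final Deque<Mail> queue = new ArrayDeque<>();

    void put(int priority, Runnable action) {
        queue.addLast(new Mail(priority, action));
    }

    /** A mail that re-enqueues itself every time it runs. */
    void putReplicatingMail(int priority) {
        put(priority, () -> putReplicatingMail(priority));
    }

    /** Existing style: runs the oldest mail with priority >= minPriority, if any. */
    boolean tryYield(int minPriority) {
        for (Iterator<Mail> it = queue.iterator(); it.hasNext(); ) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                mail.action.run();
                return true;
            }
        }
        return false;
    }

    /** Proposed style (assumed semantics): yield once, then report the lowest eligible priority left. */
    OptionalInt tryYieldReportingLowest(int minPriority) {
        tryYield(minPriority);
        return queue.stream()
                .mapToInt(mail -> mail.priority)
                .filter(priority -> priority >= minPriority)
                .min();
    }

    /** Loop from the issue using tryYield; returns the yield count, capped at maxIterations. */
    static int drainWithTryYield(YieldLoopSketch mailbox, int self, int maxIterations) {
        int yields = 0;
        while (yields < maxIterations && mailbox.tryYield(self)) {
            yields++;
        }
        return yields;
    }

    /** Loop using the priority-reporting variant; stops once no mail of 'self' remains. */
    static int drainWithLowestPriority(YieldLoopSketch mailbox, int self, int maxIterations) {
        int yields = 0;
        while (yields < maxIterations) {
            OptionalInt lowest = mailbox.tryYieldReportingLowest(self);
            yields++;
            if (!lowest.isPresent() || lowest.getAsInt() > self) {
                break;
            }
        }
        return yields;
    }

    public static void main(String[] args) {
        YieldLoopSketch first = new YieldLoopSketch();
        first.put(1, () -> {});
        first.putReplicatingMail(2);
        System.out.println("tryYield loop yields: " + drainWithTryYield(first, 1, 1000));

        YieldLoopSketch second = new YieldLoopSketch();
        second.put(1, () -> {});
        second.putReplicatingMail(2);
        System.out.println("priority-aware loop yields: " + drainWithLowestPriority(second, 1, 1000));
    }
}
```

With one mail of the closing operator (priority 1) and one replicating downstream mail (priority 2), the tryYield loop runs until it hits the cap, while the priority-aware loop stops as soon as only the downstream mail is left.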





--
This message was sent by Atlassian Jira
(v8.3.4#803005)
