Haibo Sun created FLINK-16174:
---------------------------------
Summary: Add a better tryYield() method to MailboxExecutor to
return the lowest priority of the remaining mails
Key: FLINK-16174
URL: https://issues.apache.org/jira/browse/FLINK-16174
Project: Flink
Issue Type: Improvement
Components: Runtime / Task
Affects Versions: 1.10.0
Reporter: Haibo Sun
Currently, each MailboxExecutor is created with the operator's chainIndex as its
priority and uses it to select the mails it processes. When MailboxExecutor#tryYield is called,
it will take the mails of this operator and all downstream operators in the
chain. But sometimes, after calling MailboxExecutor#tryYield, we need to know
whether there is any mail of the current operator in the mailbox, which can
simplify some operations.
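The priority filtering described above can be sketched with a toy model. This is not Flink's implementation: ToyMailbox, Mail and put are hypothetical names, and the only behaviour assumed from the description is that a yield with priority p runs the oldest mail whose priority is at least p.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of a priority-filtered mailbox; hypothetical, not Flink's classes. */
public class ToyMailbox {
    /** A mail pairs a priority (the chainIndex of the owning operator) with an action. */
    static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    private final List<Mail> mails = new ArrayList<>();

    void put(int priority, Runnable action) {
        mails.add(new Mail(priority, action));
    }

    /** Runs the oldest mail with priority >= minPriority; returns false if there is none. */
    boolean tryYield(int minPriority) {
        for (Iterator<Mail> it = mails.iterator(); it.hasNext();) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                mail.action.run();
                return true;
            }
        }
        return false;
    }

    /** Returns the priorities of the mails run while the operator with chainIndex 1 drains. */
    static List<Integer> demo() {
        ToyMailbox mailbox = new ToyMailbox();
        List<Integer> executed = new ArrayList<>();
        for (int p : new int[] {0, 2, 1}) {
            mailbox.put(p, () -> executed.add(p));
        }
        while (mailbox.tryYield(1)) {
            // keep yielding until no eligible mail is left
        }
        return executed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 1]
    }
}
```

In this model the operator with priority 1 also runs the mail of the downstream operator with priority 2, and the boolean result does not say whose mail was run or whose mail is still queued.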
For example, when we close an operator at runtime, after quiescing the
processing time service and waiting for its running timers to finish, if there
is no mail of the current operator in the mailbox, we call
StreamOperator#close to close the operator. Then the runtime code for closing an
operator can be simplified as follows.
{code:java}
quiesceProcessingTimeService().get();
while (mailboxExecutor.betterTryYield() <= self.priority) {}
closeOperator(actionExecutor);
{code}
With the existing #tryYield method, if the following simplified code is used to
close an operator, then when a downstream operator is implemented like
MailboxOperatorTest.ReplicatingMail, the replicated mail keeps tryYield()
returning true, so the loop never exits, which results in a deadlock.
{code:java}
quiesceProcessingTimeService().get();
while (mailboxExecutor.tryYield()) {}
closeOperator(actionExecutor);
{code}
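The difference between a boolean yield and a priority-returning yield can be reproduced in a toy model. The sketch below is not Flink code: YieldLoopSketch, its methods, the iteration cap and the choice to return Integer.MAX_VALUE for an empty mailbox are all assumptions made for illustration, and the proposed method is modelled as "run at most one eligible mail, then report the lowest priority still queued". The operator being closed has priority 1, and a downstream operator with priority 2 re-enqueues its mail every time it runs, in the spirit of MailboxOperatorTest.ReplicatingMail.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy comparison of the two closing loops; hypothetical, not Flink's classes. */
public class YieldLoopSketch {
    static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    private final List<Mail> mails = new ArrayList<>();

    void put(int priority, Runnable action) {
        mails.add(new Mail(priority, action));
    }

    /** Runs the oldest mail with priority >= minPriority; returns false if there is none. */
    boolean tryYield(int minPriority) {
        for (Iterator<Mail> it = mails.iterator(); it.hasNext();) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                mail.action.run();
                return true;
            }
        }
        return false;
    }

    /** Assumed semantics: run at most one eligible mail, then return the lowest remaining priority. */
    int betterTryYield(int minPriority) {
        tryYield(minPriority);
        int lowest = Integer.MAX_VALUE; // assumption: empty mailbox reports "nothing left"
        for (Mail mail : mails) {
            lowest = Math.min(lowest, mail.priority);
        }
        return lowest;
    }

    /** Two mails of operator 1 plus a self-replicating mail of downstream operator 2. */
    static YieldLoopSketch newMailbox() {
        YieldLoopSketch mailbox = new YieldLoopSketch();
        mailbox.put(1, () -> { });
        mailbox.put(2, new Runnable() {
            @Override
            public void run() {
                mailbox.put(2, this); // re-enqueue itself on every run
            }
        });
        mailbox.put(1, () -> { });
        return mailbox;
    }

    /** Counts iterations of the boolean loop, giving up after cap iterations. */
    static int iterationsWithTryYield(int cap) {
        YieldLoopSketch mailbox = newMailbox();
        int n = 0;
        while (n < cap && mailbox.tryYield(1)) {
            n++;
        }
        return n;
    }

    /** Counts iterations of the priority-returning loop, giving up after cap iterations. */
    static int iterationsWithBetterTryYield(int cap) {
        YieldLoopSketch mailbox = newMailbox();
        int n = 0;
        while (n < cap && mailbox.betterTryYield(1) <= 1) {
            n++;
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(iterationsWithTryYield(1000));       // prints 1000 (cap reached)
        System.out.println(iterationsWithBetterTryYield(1000)); // prints 2
    }
}
```

In this model the boolean loop stops only because of the artificial cap, while the priority-returning loop exits as soon as the only mail left belongs to the downstream operator.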