[ 
https://issues.apache.org/jira/browse/FLINK-12887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16869118#comment-16869118
 ] 

Xiaogang Shi commented on FLINK-12887:
--------------------------------------

[~till.rohrmann] I am also very curious on the method we implement delayed 
{{runAsync}} operations. Now we first send the {{runAsync}} message into the 
actor and then schedule the operation with Akka dispatcher. There are two 
questions in the implementation:
1. It seems unnecessary to send the {{runAsync}} message to the actor at first. 
Can we simply schedule the message with Akka dispatcher?
2. The token is enveloped again by the actor. Rarely but possibily, the token 
at submit time is different from the one at envelope time:
       t1. A rpc endpoint submit a {{runAsync}} operation
       t2. The rpc endpoint loses its leadership
       t3. The rpc endpoint grants its leadership, creating a new fencing token 
       t4. The {{runAsync}} operation is executed by the actor. It's enveloped 
with the new fencing token, and is scheduled by the Akka dispatcher.
   In such cases, an operation in previous session will be executed. That may 
lead to unexpected results.
     


> Schedule UnfencedMessage would lost envelope info 
> --------------------------------------------------
>
>                 Key: FLINK-12887
>                 URL: https://issues.apache.org/jira/browse/FLINK-12887
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>    Affects Versions: 1.9.0
>            Reporter: TisonKun
>            Priority: Major
>
> We provide {{runAsync}}, {{callAsync}} and {{scheduleRunAsync}} for 
> {{MainThreadExecutable}}, while providing {{runAsyncWithoutFencing}} and 
> {{callAsyncWithoutFencing}} additionally for {{FencedMainThreadExecutable}}.
> Let's think about a case when we want to schedule a unfenced runnable or any 
> other unfenced message(currently, we don't have such code path but it's 
> semantically valid.). 
> 1. {{FencedAkkaRpcActor}} received an unfenced runnable with delay
> 2. It extracted the runnable from unfenced message and call 
> {{super.handleRpcMessage}}.
> 3. {{AkkaRpcActor}} enveloped the message and schedule it by 
> {{AkkaRpcActor#L410}}.
> However, {{FencedAkkaRpcActor#envelopeSelfMessage}} was called for envelope. 
> Thus the unfenced message now become a fenced message.
> We can anyway implement {{scheduleRunAsyncWithoutFencing}} to schedule 
> unfenced message directly by {{actorsystem.scheduler.scheduleOnce(..., 
> dispatcher)}}, but with current codebase I notice that {{RunAsync}} has a 
> wried {{atTimeNanos}}(i.e., delay) property. Ideally how to schedule a 
> message is shown on what params ScheduleExecutorService called with, at least 
> we cannot extract an unfenced message and envelop it into a fence message and 
> then schedule it, which goes into wrong semantic.
> cc [~till.rohrmann]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to