[ 
https://issues.apache.org/jira/browse/FLINK-21996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326587#comment-17326587
 ] 

Dawid Wysakowicz commented on FLINK-21996:
------------------------------------------

Sorry, I should've posted that I am aware of the issue. I actually synced 
offline and created FLINK-22397 to track the test failure. Let's keep this 
issue closed.

> Transient RPC failure without TaskManager failure can lead to split 
> assignment loss
> -----------------------------------------------------------------------------------
>
>                 Key: FLINK-21996
>                 URL: https://issues.apache.org/jira/browse/FLINK-21996
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.12.2
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.13.0, 1.12.3
>
>
> NOTE: This bug has not been actually observed. It is based on reviews of the 
> current implementation.
> I would expect it to be a pretty rare case, bu at scale, even the rare cases 
> happen often enough.
> h2. Problem
> Intermediate RPC messages from JM to TM can get dropped, even when the TM is 
> not marked as failed.
> That can happen when the connection can be recovered before the heartbeat 
> times out.
> So RPCs generally retry, or handle failures: For example Deploy-Task-RPC 
> retries, Trigger-Checkpoint RPC aborts the checkpoint on failure and triggers 
> a new checkpoint.
> The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a 
> Future with the acknowledgement. But if that one fails, we are in the 
> situation where we do not know whether the event sending was successful or 
> not (only the ack failed).
> This is especially tricky for split assignments and checkpoints. Consider 
> this sequence of actions:
>   1. Coordinator assigns a split. Ack not yet received.
>   2. Coordinator takes a checkpoint. Split was sent before the checkpoint, so 
> is not included on the Coordinator.
>   3. Split assignment RPC response is "failed".
>   4. Checkpoint completes.
> Now we don't know whether the split was in the checkpoint on the Operator 
> (TaskManager) or not, and with that we don't know whether we should add it 
> back to the coordinator. We need to do something to make sure the split is 
> now either on the coordinator or on the Operator. Currently, the split is 
> implicitly assumed to be on the Operator; if it isn't, then that split is 
> lost.
> Not, it is worth pointing out that this is a pretty rare situation, because 
> it means that the RPC with the split assignment fails and the one for the 
> checkpoint succeeds, even though they are in close proximity. The way the 
> Akka-based RPC transport works (with retries, etc.), this can happen, but 
> isn't very likely. That why we haven't so far seen this bug in practice or 
> haven't gotten a report for it, yet.
> h2. Proposed solution
> The solution has two components:
>   1. Fallback to consistent point: If the system doesn't know whether two 
> parts are still consistent with each other (here coordinator and Operator), 
> fall back to a consistent point. Here that is the case when the Ack-Future 
> for the "Send Operator Event" RPC fails or times out. Then we call the 
> scheduler to trigger a failover of the target operator to latest checkpoint 
> and signaling the coordinator the same. That restores consistency. We can 
> later optimize this (see below).
>   2. We cannot trigger checkpoints while we are "in limbo" concerning our 
> knowledge about splits. Concretely that means that the Coordinator can only 
> acknowledge the checkpoint once the Acks for pending Operator Event RPCs 
> (Assign-Splits) have arrived. The checkpoint future is conditional on all 
> pending RPC futures. If the RPC futures fail (or time out) then the 
> checkpoint cannot complete (and the target operator will anyways go through a 
> failover). In the common case, RPC round trip time is milliseconds, which 
> would be added to the checkpoint latency if the checkpoint happends to 
> overlap with a split assignment (most won't).
> h2. Possible Future Improvements
> Step (1) above can be optimized by going with retries first and sequence 
> numbers to deduplicate the calls. That can help reduce the number of cases 
> were a failover is needed. However, the number of situations where the RPC 
> would need a retry and has a chance of succeeding (the TM is not down) should 
> be very few to begin with, so whether this optimization is worth it remains 
> to be seen.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to