[ https://issues.apache.org/jira/browse/FLINK-21996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17326587#comment-17326587 ]
Dawid Wysakowicz commented on FLINK-21996:
------------------------------------------

Sorry, I should've posted that I am aware of the issue. I actually synced offline and created FLINK-22397 to track the test failure. Let's keep this issue closed.

Transient RPC failure without TaskManager failure can lead to split assignment loss
------------------------------------------------------------------------------------

                Key: FLINK-21996
                URL: https://issues.apache.org/jira/browse/FLINK-21996
            Project: Flink
         Issue Type: Bug
         Components: Runtime / Checkpointing
   Affects Versions: 1.12.2
           Reporter: Stephan Ewen
           Assignee: Stephan Ewen
           Priority: Blocker
             Labels: pull-request-available
            Fix For: 1.13.0, 1.12.3

NOTE: This bug has not actually been observed. It is based on a review of the current implementation. I would expect it to be a pretty rare case, but at scale even the rare cases happen often enough.

h2. Problem

Intermediate RPC messages from JM to TM can get dropped, even when the TM is not marked as failed. That can happen when the connection recovers before the heartbeat times out.

So RPCs generally retry or handle failures: for example, the Deploy-Task RPC retries, and the Trigger-Checkpoint RPC aborts the checkpoint on failure and triggers a new checkpoint.

The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a Future with the acknowledgement. But if that Future fails, we are in a situation where we do not know whether sending the event was successful or not (only the ack failed).

This is especially tricky for split assignments and checkpoints. Consider this sequence of actions:

1. Coordinator assigns a split. Ack not yet received.
2. Coordinator takes a checkpoint. The split was sent before the checkpoint, so it is not included on the Coordinator.
3. The split assignment RPC response is "failed".
4. The checkpoint completes.

Now we don't know whether the split was in the checkpoint on the Operator (TaskManager) or not, and with that we don't know whether we should add it back to the Coordinator. We need to do something to make sure the split is now either on the Coordinator or on the Operator. Currently, the split is implicitly assumed to be on the Operator; if it isn't, then that split is lost.

Now, it is worth pointing out that this is a pretty rare situation, because it means that the RPC with the split assignment fails while the one for the checkpoint succeeds, even though they are in close proximity. The way the Akka-based RPC transport works (with retries, etc.), this can happen, but it isn't very likely. That is why we haven't seen this bug in practice so far and haven't gotten a report for it yet.

h2. Proposed solution

The solution has two components (rough sketches of both follow below):

1. Fall back to a consistent point: if the system doesn't know whether two parts are still consistent with each other (here, Coordinator and Operator), fall back to a consistent point. That is the case when the Ack-Future for the "Send Operator Event" RPC fails or times out. We then call the scheduler to trigger a failover of the target operator to the latest checkpoint and signal the same to the Coordinator. That restores consistency. We can later optimize this (see below).

2. We cannot trigger checkpoints while we are "in limbo" concerning our knowledge about splits. Concretely, that means the Coordinator can only acknowledge a checkpoint once the Acks for pending Operator Event RPCs (Assign-Splits) have arrived. The checkpoint future is conditional on all pending RPC futures. If the RPC futures fail (or time out), then the checkpoint cannot complete (and the target operator goes through a failover anyway). In the common case, the RPC round trip time is milliseconds, which would be added to the checkpoint latency if the checkpoint happens to overlap with a split assignment (most won't).
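For illustration, a minimal sketch of component (1), using plain Java CompletableFutures instead of Flink's actual coordinator and scheduler interfaces. The interface and method names (SubtaskGateway, FailoverTrigger, failAndRestoreSubtask) and the 10-second timeout are invented for the sketch and are not the real Flink API:

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative only: the interfaces below are simplified stand-ins,
 * not Flink's real OperatorCoordinator / scheduler APIs.
 */
public class EventSendingSketch {

    /** Stand-in for the RPC gateway that delivers an operator event to one subtask. */
    interface SubtaskGateway {
        CompletableFuture<Void> sendEvent(Object event);
    }

    /** Stand-in for the scheduler hook that restores a subtask to the latest checkpoint. */
    interface FailoverTrigger {
        void failAndRestoreSubtask(int subtask, Throwable cause);
    }

    private static final long RPC_TIMEOUT_SECONDS = 10; // assumed timeout, not a Flink default

    private final SubtaskGateway gateway;
    private final FailoverTrigger failover;

    EventSendingSketch(SubtaskGateway gateway, FailoverTrigger failover) {
        this.gateway = gateway;
        this.failover = failover;
    }

    /**
     * Component (1): if the ack for the "send operator event" RPC fails or times out,
     * we no longer know whether the target subtask received the split. Instead of
     * guessing, restore consistency by failing the subtask over to the latest
     * checkpoint; the coordinator is reset to that same checkpoint.
     */
    CompletableFuture<Void> sendSplitAssignment(int subtask, Object assignSplitsEvent) {
        return gateway.sendEvent(assignSplitsEvent)
                .orTimeout(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                .whenComplete((ack, failure) -> {
                    if (failure != null) {
                        // The event may or may not have reached the subtask;
                        // fall back to the last consistent point.
                        failover.failAndRestoreSubtask(subtask, failure);
                    }
                });
    }
}
{code}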
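Component (2) could look roughly like the following, again with invented names and plain CompletableFutures: the coordinator's checkpoint acknowledgement is gated on the acks of all operator events that were still in flight when the checkpoint was triggered.

{code:java}
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * Illustrative only: gates the coordinator's checkpoint acknowledgement on the acks
 * of all operator events that were still in flight when the checkpoint was triggered.
 * Names are invented; assumes all methods are called from a single coordinator thread.
 */
public class CoordinatorCheckpointSketch {

    /** Acks of operator events (e.g. split assignments) that have not returned yet. */
    private final List<CompletableFuture<Void>> pendingEventAcks = new ArrayList<>();

    /** Called whenever an operator event is sent; tracks its ack until it completes. */
    void registerPendingEventAck(CompletableFuture<Void> ack) {
        pendingEventAcks.add(ack);
        ack.whenComplete((ok, err) -> pendingEventAcks.remove(ack));
    }

    /**
     * The checkpoint future completes only once every pending event ack has arrived.
     * If any ack fails (or times out), the checkpoint cannot complete, and the affected
     * subtask goes through a failover anyway (component 1).
     */
    CompletableFuture<byte[]> checkpointCoordinator(CompletableFuture<byte[]> coordinatorState) {
        CompletableFuture<Void> allAcks =
                CompletableFuture.allOf(pendingEventAcks.toArray(new CompletableFuture[0]));

        // In the common case this adds at most one RPC round trip (milliseconds)
        // to the checkpoint latency, and only if a split assignment is in flight.
        return allAcks.thenCombine(coordinatorState, (ignored, state) -> state);
    }
}
{code}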
h2. Possible Future Improvements

Step (1) above can be optimized by going with retries first, using sequence numbers to deduplicate the calls. That can help reduce the number of cases where a failover is needed. However, the number of situations where the RPC would need a retry and has a chance of succeeding (the TM is not down) should be very few to begin with, so whether this optimization is worth it remains to be seen.
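If that optimization were pursued, the deduplication could be sketched as follows; the names (SequencedEvent, Sender, Receiver) are invented and this is not the actual Flink implementation. The sender tags each event with a monotonically increasing sequence number and retries with the same number on a failed ack; the receiver drops anything it has already applied.

{code:java}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/**
 * Illustrative only: sequence-number based retry and deduplication for operator events,
 * as hinted at under "Possible Future Improvements". Names are invented; this is not
 * Flink's actual implementation.
 */
public class SequencedEventSketch {

    /** An operator event wrapped with a per-subtask sequence number. */
    record SequencedEvent(long sequenceNumber, Object payload) {}

    /** Sender side: assigns sequence numbers and retries a bounded number of times. */
    static class Sender {
        private final AtomicLong nextSequenceNumber = new AtomicLong();

        CompletableFuture<Void> sendWithRetry(
                Function<SequencedEvent, CompletableFuture<Void>> rpc, Object payload, int retries) {
            // The sequence number is assigned once; retries resend the identical event,
            // so the receiver can recognize and drop duplicates.
            SequencedEvent event = new SequencedEvent(nextSequenceNumber.getAndIncrement(), payload);
            return attempt(rpc, event, retries);
        }

        private CompletableFuture<Void> attempt(
                Function<SequencedEvent, CompletableFuture<Void>> rpc, SequencedEvent event, int retriesLeft) {
            return rpc.apply(event)
                    .handle((ok, err) -> {
                        if (err == null) {
                            return CompletableFuture.<Void>completedFuture(null);
                        } else if (retriesLeft > 0) {
                            return attempt(rpc, event, retriesLeft - 1);
                        } else {
                            // Out of retries: fall back to the failover path from component (1).
                            return CompletableFuture.<Void>failedFuture(err);
                        }
                    })
                    .thenCompose(next -> next);
        }
    }

    /** Receiver side: applies each event at most once, ignoring duplicate deliveries. */
    static class Receiver {
        private long lastAppliedSequenceNumber = -1;

        void onEvent(SequencedEvent event) {
            if (event.sequenceNumber() <= lastAppliedSequenceNumber) {
                return; // duplicate of an event that already arrived on an earlier attempt
            }
            lastAppliedSequenceNumber = event.sequenceNumber();
            // ... apply the payload, e.g. hand the assigned splits to the source reader ...
        }
    }
}
{code}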