[ https://issues.apache.org/jira/browse/FLINK-14375?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Zhu Zhu updated FLINK-14375:
----------------------------
Description:
The DefaultScheduler triggers a failover whenever it is notified that a task
is FAILED. However, when multiple tasks in the same region fail together, it
triggers multiple failovers. The later failovers are useless, lead to
concurrent failovers, and increase the restart attempt count.

I think the root cause of this issue is that some fake state changes are
notified to the DefaultScheduler.

In the case above, a FAILED state change from the TM turns a CANCELING vertex
into CANCELED, so the actual state transition is to CANCELED, but a FAILED
state is notified to the DefaultScheduler.

The same cause can lead to another issue: when a FINISHED state change is
notified from the TM while a vertex is CANCELING, the vertex becomes CANCELED,
but the FINISHED state change is still notified to the DefaultScheduler, which
may trigger downstream task scheduling.

I'd propose to translate a state update into the actual resulting state before
notifying the DefaultScheduler, namely by changing SchedulerBase as follows:
    @Override
    public final boolean updateTaskExecutionState(final TaskExecutionState taskExecutionState) {
        final Optional<ExecutionVertexID> executionVertexId =
            getExecutionVertexId(taskExecutionState.getID());
        if (executionVertexId.isPresent()) {
            // Apply the reported update to the ExecutionGraph first.
            executionGraph.updateState(taskExecutionState);
            // Notify the scheduler with the vertex's actual state, not the reported one.
            updateTaskExecutionStateInternal(executionVertexId.get(), new TaskExecutionState(
                taskExecutionState.getJobID(),
                taskExecutionState.getID(),
                getExecutionVertex(executionVertexId.get()).getExecutionState(),
                taskExecutionState.getError(userCodeLoader)
            ));
            return true;
        }
        return false;
    }
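
To illustrate the effect of the proposed translation, here is a minimal
standalone sketch. It is not Flink code: the class StateTranslationSketch, its
ExecutionState enum, and the methods effectiveState and countFailovers are all
hypothetical, and the transition rule is reduced to the one case described
above (a terminal report on a CANCELING vertex results in CANCELED). It only
counts how many failovers would be triggered when all tasks of one region
report FAILED one after another.

```java
import java.util.Arrays;

// Hypothetical, simplified model; none of these types are Flink's real classes.
class StateTranslationSketch {

    enum ExecutionState { RUNNING, CANCELING, CANCELED, FAILED, FINISHED }

    // Assumed rule: a terminal report on a CANCELING vertex ends in CANCELED.
    static ExecutionState effectiveState(ExecutionState current, ExecutionState reported) {
        boolean terminal = reported == ExecutionState.FAILED
                || reported == ExecutionState.FINISHED
                || reported == ExecutionState.CANCELED;
        if (current == ExecutionState.CANCELING && terminal) {
            return ExecutionState.CANCELED;
        }
        return reported;
    }

    // Every task of one region reports FAILED in turn. Returns the number of
    // failovers triggered. With translate == false the scheduler sees the
    // reported state; with translate == true it sees the actual vertex state.
    static int countFailovers(int tasks, boolean translate) {
        ExecutionState[] states = new ExecutionState[tasks];
        Arrays.fill(states, ExecutionState.RUNNING);
        int failovers = 0;
        for (int i = 0; i < tasks; i++) {
            ExecutionState reported = ExecutionState.FAILED;
            ExecutionState actual = effectiveState(states[i], reported);
            states[i] = actual;
            ExecutionState notified = translate ? actual : reported;
            if (notified == ExecutionState.FAILED) {
                failovers++;
                // A failover cancels the still-running tasks of the region.
                for (int j = 0; j < tasks; j++) {
                    if (states[j] == ExecutionState.RUNNING) {
                        states[j] = ExecutionState.CANCELING;
                    }
                }
            }
        }
        return failovers;
    }

    public static void main(String[] args) {
        System.out.println("untranslated: " + countFailovers(3, false));
        System.out.println("translated:   " + countFailovers(3, true));
    }
}
```

Under these assumptions, three tasks failing together trigger three failovers
when the reported state is forwarded as is, and only one when the state is
translated first.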
was:
The DefaultScheduler triggers a failover whenever it is notified that a task
is FAILED. However, when multiple tasks in the same region fail together, it
triggers multiple failovers. The later failovers are useless, lead to
concurrent failovers, and increase the restart attempt count.
To avoid that, I'd propose to check whether a task failure is effective before
deciding to trigger a failover, namely checking in
{{DefaultScheduler#maybeHandleTaskFailure()}} whether the vertex state is
really FAILED rather than CANCELED.
> Avoid to trigger failover on a non-effective task failure notification
> ----------------------------------------------------------------------
>
> Key: FLINK-14375
> URL: https://issues.apache.org/jira/browse/FLINK-14375
> Project: Flink
> Issue Type: Sub-task
> Components: Runtime / Coordination
> Affects Versions: 1.10.0
> Reporter: Zhu Zhu
> Priority: Major
> Fix For: 1.10.0
>