Piotr Nowojski created FLINK-37670:
--------------------------------------
Summary: Watermark alignment can deadlock job if there are no more splits to be assigned
Key: FLINK-37670
URL: https://issues.apache.org/jira/browse/FLINK-37670
Project: Flink
Issue Type: Bug
Components: API / DataStream, Runtime / Task
Affects Versions: 1.20.1, 1.19.2, 2.0.0
Reporter: Piotr Nowojski
{{SourceCoordinator#announceCombinedWatermark}} stops announcing the combined watermark to a subtask once {{context.hasNoMoreSplits(subtaskId)}} returns true for that subtask:
{noformat}
for (Integer subtaskId : subTaskIds) {
    // when subtask have been finished, do not send event.
    if (!context.hasNoMoreSplits(subtaskId)) {
        // Subtask maybe during deploying or restarting, so we only send
        // WatermarkAlignmentEvent to ready task to avoid period task fail
        // (Java-ThreadPoolExecutor will not schedule the period task if it throws an
        // exception).
        context.sendEventToSourceOperatorIfTaskReady(
                subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
    }
}
{noformat}
As a result, a new max allowed watermark is no longer announced to such subtasks, which prevents sources from making progress. The intention was to prevent failures caused by sending events to tasks that are no longer running:
{noformat}
Caused by: org.apache.flink.runtime.operators.coordination.TaskNotRunningException: Task is not running, but in state FINISHED
	at org.apache.flink.runtime.taskmanager.Task.deliverOperatorEvent(Task.java:1613) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.taskexecutor.TaskExecutor.sendOperatorEventToTask(TaskExecutor.java:1478) ~[flink-runtime-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at jdk.internal.reflect.GeneratedMethodAccessor6.invoke(Unknown Source) ~[?:?]
	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-core-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) ~[flink-rpc-akka8c031913-bb13-4662-a216-58134efdb2d9.jar:1.19-SNAPSHOT]
{noformat}
but:
# This check is incorrect. Contrary to the comment, it doesn't check whether all subtasks have finished, only whether the source coordinator has no more splits to assign.
# Even if it did check whether all subtasks have finished, that wouldn't work, because:
* one subtask can be finished while others are still running;
* there can be race conditions between sending an operator event to a running operator and that operator switching to FINISHED.
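The deadlock can be illustrated with a simplified, hypothetical model (not Flink code). Two source subtasks each advance their watermark by one unit per round, but never past the last max allowed watermark they received; the coordinator recomputes the max allowed watermark as the minimum subtask watermark plus a drift. Subtask 0 is assumed to have already received its last split (so {{hasNoMoreSplits}} is true for it) while it is still reading that split. The names {{simulate}}, {{MAX_DRIFT}} and {{END_OF_INPUT}}, and all numbers, are made up for illustration.

```java
import java.util.Arrays;

/** Simplified, hypothetical model of watermark alignment (not Flink code). */
public class AlignmentDeadlockSketch {
    // Hypothetical max allowed drift between the combined watermark and any subtask.
    static final long MAX_DRIFT = 10;
    // Hypothetical end of each subtask's input, in event time.
    static final long END_OF_INPUT = 100;

    /** Runs the model for the given number of rounds and returns the final per-subtask watermarks. */
    static long[] simulate(boolean skipSubtasksWithNoMoreSplits, int rounds) {
        long[] watermark = {0, 0};
        long[] maxAllowed = {MAX_DRIFT, MAX_DRIFT};
        // Subtask 0 already got its last split (hasNoMoreSplits == true) but is still reading it.
        boolean[] noMoreSplits = {true, false};

        for (int round = 0; round < rounds; round++) {
            // Each subtask advances by one unit unless that would exceed its last announced limit.
            for (int i = 0; i < watermark.length; i++) {
                if (watermark[i] < END_OF_INPUT && watermark[i] + 1 <= maxAllowed[i]) {
                    watermark[i]++;
                }
            }
            // Coordinator: combined watermark is the minimum over all subtasks.
            long combined = Arrays.stream(watermark).min().getAsLong();
            long newMaxAllowed = combined + MAX_DRIFT;
            for (int i = 0; i < watermark.length; i++) {
                if (skipSubtasksWithNoMoreSplits && noMoreSplits[i]) {
                    continue; // mirrors the hasNoMoreSplits(subtaskId) gate
                }
                maxAllowed[i] = newMaxAllowed;
            }
        }
        return watermark;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(simulate(true, 200)));  // [10, 20]: both subtasks stuck
        System.out.println(Arrays.toString(simulate(false, 200))); // [100, 100]: end of input reached
    }
}
```

With the gate, subtask 0 stalls at its last announced limit, which pins the combined watermark, so subtask 1 stalls shortly afterwards as well; without the gate, both subtasks reach the end of their input.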
I think that, at least for some non-critical events, we should simply ignore {{TaskNotRunningException}}.
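One possible shape of that change, sketched under assumptions: the announcement goes to every subtask without the {{hasNoMoreSplits}} gate, a delivery failure caused by the target task no longer running is ignored, and any other failure is still propagated. Because of the race described above, the sketch handles the exception on the asynchronous result of the send instead of trying to rule it out with a check beforehand. {{EventSender}}, {{announce}} and the nested {{TaskNotRunningException}} are hypothetical stand-ins, not the real Flink classes (the real exception is {{org.apache.flink.runtime.operators.coordination.TaskNotRunningException}}). Requires Java 9+ for {{List.of}} and {{CompletableFuture.failedFuture}}.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical sketch of tolerant watermark alignment announcements (not Flink code). */
public class TolerantAnnouncementSketch {

    /** Hypothetical stand-in for Flink's TaskNotRunningException. */
    static class TaskNotRunningException extends Exception {
        TaskNotRunningException(String message) {
            super(message);
        }
    }

    /** Hypothetical asynchronous event gateway; the returned future fails if delivery fails. */
    interface EventSender {
        CompletableFuture<Void> send(int subtaskId, long maxAllowedWatermark);
    }

    /** Returns true if the failure only says that the target task is no longer running. */
    static boolean isTaskNotRunning(Throwable t) {
        Throwable cause = (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
        return cause instanceof TaskNotRunningException;
    }

    /**
     * Sends the max allowed watermark to every subtask (no "no more splits" gate).
     * TaskNotRunningException is swallowed for this best-effort event; any other
     * failure makes the returned future fail.
     */
    static CompletableFuture<Void> announce(
            List<Integer> subtaskIds, long maxAllowedWatermark, EventSender sender) {
        List<CompletableFuture<Void>> acks = new ArrayList<>();
        for (int subtaskId : subtaskIds) {
            acks.add(sender.send(subtaskId, maxAllowedWatermark).exceptionally(t -> {
                if (isTaskNotRunning(t)) {
                    return null; // task finished in the meantime: nothing left to align
                }
                throw t instanceof CompletionException ? (CompletionException) t : new CompletionException(t);
            }));
        }
        return CompletableFuture.allOf(acks.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        // Subtask 0 has already switched to FINISHED; subtask 1 is still running.
        EventSender sender = (subtaskId, watermark) -> {
            if (subtaskId == 0) {
                return CompletableFuture.failedFuture(
                        new TaskNotRunningException("Task is not running, but in state FINISHED"));
            }
            return CompletableFuture.completedFuture(null);
        };
        announce(List.of(0, 1), 42L, sender).join(); // completes normally
        System.out.println("announcement completed");
    }
}
```

The sketch applies the relaxed handling only to the watermark announcement, in line with ignoring the exception for non-critical events rather than for all operator events.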