Aljoscha Krettek created BEAM-11251:
---------------------------------------
Summary: Unfair lock/monitor in UnboundedSourceWrapper can starve
checkpointing
Key: BEAM-11251
URL: https://issues.apache.org/jira/browse/BEAM-11251
Project: Beam
Issue Type: Bug
Components: runner-flink
Reporter: Aljoscha Krettek
From my email on this Flink ML thread:
https://lists.apache.org/thread.html/rca7eb85183ac17be478b4d6c59c5ca348077689e5ba4a42c87b71bf4%40%3Cuser.flink.apache.org%3E:
We need the synchronized block in the source because the call to
{{reader.advance()}} (via the invoker) and {{reader.getCurrent()}} (via
{{emitElement()}}) must be atomic with respect to state. We must not advance the
reader state, fail to emit that record, and still checkpoint the new reader state.
The monitor ensures that no checkpoint can happen in between those two calls.
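To illustrate the atomicity requirement, here is a minimal, self-contained sketch. It is not the actual UnboundedSourceWrapper code: the class, the {{checkpointLock}} field, and the {{snapshot()}} method are hypothetical stand-ins, and the reader is reduced to an index into an array. The point is only that advancing and emitting happen under the same monitor that a checkpoint would take, so a snapshot never sees an advanced reader whose record was not emitted.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the source wrapper; not Beam's actual class. */
class CheckpointAtomicitySketch {
    // Stand-in for the checkpoint lock (a plain monitor).
    private final Object checkpointLock = new Object();
    private final int[] input = {10, 20, 30};
    // Reader state: index of the current record, -1 before the first advance.
    private int position = -1;
    private final List<Integer> emitted = new ArrayList<>();

    /** Advances the reader and emits the record as one atomic step. */
    boolean advanceAndEmit() {
        synchronized (checkpointLock) {
            if (position + 1 >= input.length) {
                return false;
            }
            position++;                     // plays the role of reader.advance()
            emitted.add(input[position]);   // plays the role of getCurrent() + emitElement()
            return true;
        }
    }

    /** Simulated checkpoint: returns {position, emittedCount} under the same monitor. */
    int[] snapshot() {
        synchronized (checkpointLock) {
            return new int[] {position, emitted.size()};
        }
    }

    /** A snapshot is consistent when every record up to the reader position was emitted. */
    static boolean isConsistent(int[] snapshot) {
        return snapshot[1] == snapshot[0] + 1;
    }

    public static void main(String[] args) {
        CheckpointAtomicitySketch source = new CheckpointAtomicitySketch();
        while (source.advanceAndEmit()) {
            System.out.println("consistent: " + isConsistent(source.snapshot()));
        }
    }
}
```

If the two statements inside {{advanceAndEmit()}} were not guarded together, a snapshot taken between them would record the new position without the matching emitted record.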
The basic problem is now that we can starve checkpointing because the
monitor/lock is not fair. This could be solved by using a fair lock but that
would require Flink proper to be changed to use a fair lock instead of a
monitor/synchronized. I don't see this as an immediate solution.
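For readers unfamiliar with the distinction, the following sketch shows what a fair lock looks like in plain Java. It is an illustration only and not a proposed patch: as stated above, the lock in question is a monitor owned by Flink, so this cannot simply be swapped in. The class and method names are hypothetical. An "emitter" thread repeatedly takes and releases the lock, while the calling thread plays the checkpoint and requests the lock once. With {{new ReentrantLock(true)}}, a queued waiter is served before the emitter can re-acquire.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration of a fair lock; not Flink or Beam code. */
class FairLockSketch {

    /**
     * Runs an emitter loop that hammers the lock, then acquires the lock once
     * from the calling thread (the simulated checkpoint). Returns how many
     * emitter iterations completed between the request and the acquisition.
     */
    static long emitsWhileCheckpointWaits(boolean fair) throws InterruptedException {
        ReentrantLock lock = new ReentrantLock(fair);
        AtomicBoolean running = new AtomicBoolean(true);
        AtomicLong emits = new AtomicLong();
        CountDownLatch emitterStarted = new CountDownLatch(1);

        Thread emitter = new Thread(() -> {
            while (running.get()) {
                lock.lock();
                try {
                    emits.incrementAndGet();   // stand-in for advance + emit
                    emitterStarted.countDown();
                } finally {
                    lock.unlock();
                }
            }
        });
        emitter.start();
        emitterStarted.await();

        long before = emits.get();
        lock.lock();                           // simulated checkpoint request
        long after;
        try {
            after = emits.get();
        } finally {
            lock.unlock();
        }

        running.set(false);
        emitter.join();
        return after - before;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("emits while waiting (fair lock): "
                + emitsWhileCheckpointWaits(true));
    }
}
```

The returned count is not deterministic, because the emitter can complete iterations before the requesting thread is actually enqueued. What the fair lock provides is that, once the checkpoint thread is queued, the emitter has to line up behind it.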
One thing that exacerbates this problem is that too many things are happening
"under" the synchronized block. All the transforms before a
shuffle/rebalance/keyBy are chained to the source, which means that they are
invoked from the {{emitElement()}} call.
A possible mitigation would be to disable chaining globally by inserting a
{{flinkStreamEnv.disableOperatorChaining()}} in [1].
A more surgical version would be to disable chaining only for sources, but this
can also hurt performance: without chaining we potentially
have more serialization between tasks/operators.