Aljoscha Krettek created BEAM-11251:
---------------------------------------

             Summary: Unfair lock/monitor in UnboundedSourceWrapper can starve 
checkpointing
                 Key: BEAM-11251
                 URL: https://issues.apache.org/jira/browse/BEAM-11251
             Project: Beam
          Issue Type: Bug
          Components: runner-flink
            Reporter: Aljoscha Krettek


From my email on this Flink ML thread: 
https://lists.apache.org/thread.html/rca7eb85183ac17be478b4d6c59c5ca348077689e5ba4a42c87b71bf4%40%3Cuser.flink.apache.org%3E:

We need the synchronized block in the source because the call to 
{{reader.advance()}} (via the invoker) and the call to {{reader.getCurrent()}} 
(via {{emitElement()}}) must be atomic with respect to state. Otherwise we could 
advance the reader state, not yet emit that record, and still checkpoint the new 
reader state, which would lose that record on restore. The monitor ensures that 
no checkpoint can happen in between those two calls.
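
To make the atomicity requirement concrete, here is a minimal, dependency-free sketch. It is not the actual {{UnboundedSourceWrapper}} code: {{SketchReader}}, {{SourceLoopSketch}}, {{runOnce()}} and {{snapshot()}} are hypothetical names, and the "checkpoint" is reduced to reading two counters under the same monitor.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a reader: advance() moves the position, getCurrent() reads it. */
final class SketchReader {
    private final int limit;
    private int position = -1; // index of the current record, -1 before the first advance

    SketchReader(int limit) {
        this.limit = limit;
    }

    boolean advance() {
        if (position + 1 >= limit) {
            return false; // no more records
        }
        position++;
        return true;
    }

    int getCurrent() {
        return position;
    }

    int getPosition() {
        return position;
    }
}

/** Hypothetical source loop: advance and emit happen under one monitor. */
final class SourceLoopSketch {
    private final Object checkpointLock = new Object();
    private final SketchReader reader;
    private final List<Integer> emitted = new ArrayList<>();

    SourceLoopSketch(int limit) {
        this.reader = new SketchReader(limit);
    }

    /** One loop iteration; returns false once the reader is exhausted. */
    boolean runOnce() {
        synchronized (checkpointLock) {
            if (!reader.advance()) {
                return false;
            }
            // Still holding the monitor: a snapshot cannot observe the advanced
            // position without the matching emitted record.
            emitted.add(reader.getCurrent());
            return true;
        }
    }

    /** Simplified checkpoint: returns {reader position, number of emitted records}. */
    int[] snapshot() {
        synchronized (checkpointLock) {
            return new int[] {reader.getPosition(), emitted.size()};
        }
    }
}
```

In this sketch every snapshot satisfies position + 1 == emitted count. If {{advance()}} and the emit were done under separate lock acquisitions, a snapshot taken in between could break that invariant.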

The basic problem is now that we can starve checkpointing because the 
monitor/lock is not fair. This could be solved by using a fair lock but that 
would require Flink proper to be changed to use a fair lock instead of a 
monitor/synchronized. I don't see this as an immediate solution.
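
For illustration only, this is roughly what a fair lock looks like in plain Java. It is a sketch under the assumption that both the emitting thread and the checkpointing thread go through the same {{ReentrantLock}}; {{FairLockSourceSketch}} and its methods are hypothetical names, and nothing here is something Beam could adopt without the Flink-side change described above.

```java
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch: emit and checkpoint contend on a fair ReentrantLock. */
final class FairLockSourceSketch {
    // Passing true requests a fair ordering policy: under contention the
    // longest-waiting thread is favored, unlike a synchronized monitor.
    private final ReentrantLock checkpointLock = new ReentrantLock(true);
    private long emitted = 0;
    private long lastCheckpointed = 0;

    /** Stand-in for one advance-and-emit step in the source loop. */
    void emitOne() {
        checkpointLock.lock();
        try {
            emitted++;
        } finally {
            checkpointLock.unlock();
        }
    }

    /** Stand-in for a checkpoint: records how many elements were emitted so far. */
    long checkpoint() {
        checkpointLock.lock();
        try {
            lastCheckpointed = emitted;
            return lastCheckpointed;
        } finally {
            checkpointLock.unlock();
        }
    }

    boolean isFair() {
        return checkpointLock.isFair();
    }
}
```

Fair locks typically have lower throughput than unfair ones, so even where such a change is possible it trades raw emit speed for more predictable checkpoint latency.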

One thing that exacerbates this problem is that too many things are happening 
"under" the synchronized block. All the transforms before a 
shuffle/rebalance/keyBy are chained to the source, which means that they are 
invoked from the {{emitElement()}} call.

A possible mitigation would be to disable chaining globally by inserting a 
{{flinkStreamEnv.disableOperatorChaining()}} in [1].

A more surgical version would be to only disable chaining for sources but this 
can also have an impact on performance since without chaining we potentially 
have more serialization between tasks/operators.
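
The effect of chaining on the time spent under the lock can be modeled with a small toy example. This is not Flink code: {{ChainingSketch}}, {{emitChained()}}, {{emitUnchained()}} and {{runDownstream()}} are hypothetical names, and the in-memory queue merely stands in for the serialization and buffer hand-off that happens between unchained operators.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.IntUnaryOperator;

/** Toy model of chained versus unchained emission from a source. */
final class ChainingSketch {
    final Object checkpointLock = new Object();
    private final Queue<Integer> handoff = new ConcurrentLinkedQueue<>();

    /** Chained: the downstream transform is called directly from the emit, lock held. */
    int emitChained(int record, IntUnaryOperator downstream) {
        synchronized (checkpointLock) {
            return downstream.applyAsInt(record);
        }
    }

    /** Unchained: only the hand-off happens while the lock is held. */
    void emitUnchained(int record) {
        synchronized (checkpointLock) {
            handoff.add(record);
        }
    }

    /** The downstream transform runs later, without the source's checkpoint lock. */
    int runDownstream(IntUnaryOperator downstream) {
        int record = handoff.remove(); // throws NoSuchElementException if nothing was handed off
        return downstream.applyAsInt(record);
    }
}
```

A downstream function that calls {{Thread.holdsLock(sketch.checkpointLock)}} would observe {{true}} when invoked through {{emitChained()}} and {{false}} when invoked through {{runDownstream()}}, which is the difference that disabling chaining is meant to exploit.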



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
