[ https://issues.apache.org/jira/browse/BEAM-3087?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16398841#comment-16398841 ]
Alexey Romanenko commented on BEAM-3087:
----------------------------------------

Is anyone working on this?

It looks related to [BEAM-3726|https://issues.apache.org/jira/browse/BEAM-3726], in particular to Pawel's [comment|https://issues.apache.org/jira/browse/BEAM-3726?focusedCommentId=16383882&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16383882].

This call (actually, two calls inside this method) should be synchronized with the other reader calls:

{code}
public class UnboundedSourceWrapper ...

  @Override
  public void run(SourceContext<WindowedValue<ValueWithRecordId<OutputT>>> ctx) throws Exception {
    ...
    while (isRunning) {
      dataAvailable = readerInvoker.invokeAdvance(reader);
      ...
{code}

Does this assumption make sense?

> Extend lock scope in Flink UnboundedSourceWrapper
> -------------------------------------------------
>
>                 Key: BEAM-3087
>                 URL: https://issues.apache.org/jira/browse/BEAM-3087
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>            Reporter: Aljoscha Krettek
>            Priority: Major
>
> In {{UnboundedSourceWrapper}} the lock scope is not big enough: we
> synchronise in {{emitElement()}} but should instead synchronise inside the
> reader loop in {{run()}} because the {{Source}} interface does not allow
> concurrent calls.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
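
For illustration only, here is a minimal, self-contained Java sketch of the wider lock scope the issue asks for: the advance call and the emission of the element happen under one lock, and any other thread that touches the reader takes the same lock. Everything in it is hypothetical and not taken from the Beam code base: {{LockScopeSketch}}, {{CountingReader}}, {{runLoop}}, {{demo}} and the plain {{Object}} lock are stand-ins. In the real wrapper the lock would presumably be the one returned by Flink's {{SourceContext#getCheckpointLock()}}, but that is an assumption about the eventual fix.

```java
import java.util.ArrayList;
import java.util.List;

public class LockScopeSketch {

    /** Hypothetical stand-in for a reader that must never be called concurrently. */
    public static final class CountingReader {
        private final int limit;
        private int current = 0;
        private boolean inCall = false;        // true while some call is in progress
        public boolean overlapDetected = false;

        public CountingReader(int limit) {
            this.limit = limit;
        }

        private void enter() {
            if (inCall) {
                overlapDetected = true;        // another call was already running
            }
            inCall = true;
        }

        private void exit() {
            inCall = false;
        }

        public boolean advance() {
            enter();
            try {
                if (current >= limit) {
                    return false;
                }
                current++;
                return true;
            } finally {
                exit();
            }
        }

        public int getCurrent() {
            enter();
            try {
                return current;
            } finally {
                exit();
            }
        }

        /** Plays the role of a checkpoint-time call made from another thread. */
        public int snapshot() {
            enter();
            try {
                return current;
            } finally {
                exit();
            }
        }
    }

    /** Reader loop: advance and emit are performed under the same lock. */
    public static List<Integer> runLoop(CountingReader reader, Object lock) {
        List<Integer> emitted = new ArrayList<>();
        boolean dataAvailable;
        do {
            synchronized (lock) {
                dataAvailable = reader.advance();
                if (dataAvailable) {
                    emitted.add(reader.getCurrent());
                }
            }
        } while (dataAvailable);
        return emitted;
    }

    /** Runs the loop while a second thread takes snapshots under the same lock. */
    public static boolean demo(int limit) {
        CountingReader reader = new CountingReader(limit);
        Object lock = new Object();
        Thread snapshotter = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                synchronized (lock) {
                    reader.snapshot();
                }
            }
        });
        snapshotter.start();
        List<Integer> out = runLoop(reader, lock);
        try {
            snapshotter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !reader.overlapDetected && out.size() == limit;
    }

    public static void main(String[] args) {
        System.out.println("no overlapping reader calls: " + demo(1000));
    }
}
```

The lock is released between iterations, so the second thread can still get in between two elements. If only the emission were synchronized, as the issue describes for {{emitElement()}}, the {{advance()}} call could overlap with the snapshot call.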