[ 
https://issues.apache.org/jira/browse/BEAM-4521?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16506228#comment-16506228
 ] 

Raghu Angadi commented on BEAM-4521:
------------------------------------

This is really a Flink bug. All the Beam API methods are supposed to be called 
with the required context (such as the metrics container) in place. A similar 
issue with the Flink runner came up in another context, where it accesses 
readers concurrently even though the spec explicitly disallows that ([PR 
3985|https://github.com/apache/beam/pull/3985#issuecomment-338051096]). We 
could report the backlog in advance(), but that would add unnecessary overhead 
for every single record. I would like to hear from the Flink runner authors 
about this. Note that these issues affect all unbounded sources, not just 
KafkaIO. 
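To illustrate why an update from the wrong thread is lost: the ERROR line in the report suggests that the metrics container is looked up per thread. The sketch below is a simplified, hypothetical model of that pattern using a ThreadLocal. MetricsContainerSketch and MetricsEnvSketch are made-up names, not Beam classes, and this is not Beam's actual MetricsEnvironment code. An update from the thread that installed the container lands; an update from a fresh thread (standing in for the thread that calls getCheckpointMark()) finds no container and is dropped.

{code:java}
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a per-step metrics container (not a Beam class).
class MetricsContainerSketch {
  final AtomicLong backlogBytes = new AtomicLong(-1);
}

// Simplified model of a thread-bound metrics environment (not Beam's actual code).
class MetricsEnvSketch {
  // Each thread sees only the container that was installed on that thread.
  private static final ThreadLocal<MetricsContainerSketch> CURRENT = new ThreadLocal<>();

  static void setCurrent(MetricsContainerSketch container) {
    CURRENT.set(container);
  }

  static void clear() {
    CURRENT.remove();
  }

  // Returns true if the update reached a container, false if it was dropped.
  static boolean setBacklogBytes(long value) {
    MetricsContainerSketch container = CURRENT.get();
    if (container == null) {
      // Models the situation behind the ERROR line: no container on this thread.
      System.err.println("Unable to update metrics on the current thread.");
      return false;
    }
    container.backlogBytes.set(value);
    return true;
  }

  // Performs the update on a fresh thread (standing in for the thread that
  // calls getCheckpointMark()) and reports whether it reached a container.
  static boolean setBacklogBytesFromNewThread(long value) {
    final boolean[] landed = new boolean[1];
    Thread other = new Thread(() -> landed[0] = setBacklogBytes(value));
    other.start();
    try {
      other.join(); // join() makes landed[0] visible to this thread
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return landed[0];
  }
}
{code}

Because a plain ThreadLocal is not inherited by new threads, the second thread sees no container, which is the same symptom as backlog metrics reported from getCheckpointMark() never showing up.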

> Backlog metrics not showing up
> ------------------------------
>
>                 Key: BEAM-4521
>                 URL: https://issues.apache.org/jira/browse/BEAM-4521
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.4.0
>            Reporter: Jozef Vilcek
>            Assignee: Aljoscha Krettek
>            Priority: Minor
>
> Hello,
> I wanted to track the _backlog_bytes_ and _backlog_elements_ metrics from 
> SourceMetrics for Kafka. I see in the code that KafkaUnboundedReader 
> reports them, but I was not able to make them visible (running on Flink).
> Metrics are reported here:
> [https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L668]
> In the Flink runtime I noticed this message:
> {code:java}
> 2018-06-07 08:53:53,216 ERROR org.apache.beam.sdk.metrics.MetricsEnvironment - Unable to update metrics on the current thread. Most likely caused by using metrics outside the managed work-execution thread.
> {code}
> I see that the backlog is reported from getCheckpointMark(), which is called 
> by a different thread. I'm not sure why it is done there. 
> I tested locally moving it to the advance() method, where bytes_read is 
> reported, and it worked. 
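The workaround tested in the report (reporting from advance()) and the per-record overhead concern in the comment can both be accommodated by throttling. A minimal sketch, assuming a hypothetical BacklogGauge interface in place of a Beam metrics gauge and a hypothetical ThrottledBacklogReporter helper (neither is part of Beam): advance() would call maybeReport() for every record, so updates happen on the thread that owns the metrics context, but the backlog is only recomputed and published once per interval.

{code:java}
import java.util.function.LongSupplier;

// Hypothetical gauge interface standing in for a Beam metrics gauge.
interface BacklogGauge {
  void set(long value);
}

// Hypothetical helper: meant to be called from advance() on every record, but it
// only recomputes and publishes the backlog once per interval.
class ThrottledBacklogReporter {
  private final BacklogGauge gauge;
  private final LongSupplier backlogBytes; // may be costly to compute
  private final LongSupplier clockMillis;  // injectable clock, e.g. System::currentTimeMillis
  private final long intervalMillis;
  private boolean reportedOnce = false;
  private long lastReportMillis = 0L;
  private int reports = 0;

  ThrottledBacklogReporter(
      BacklogGauge gauge, LongSupplier backlogBytes, LongSupplier clockMillis, long intervalMillis) {
    this.gauge = gauge;
    this.backlogBytes = backlogBytes;
    this.clockMillis = clockMillis;
    this.intervalMillis = intervalMillis;
  }

  // Common path costs one clock read and one comparison per record.
  boolean maybeReport() {
    long now = clockMillis.getAsLong();
    if (reportedOnce && now - lastReportMillis < intervalMillis) {
      return false; // still inside the interval: skip the expensive work
    }
    reportedOnce = true;
    lastReportMillis = now;
    gauge.set(backlogBytes.getAsLong());
    reports++;
    return true;
  }

  int reportCount() {
    return reports;
  }
}
{code}

The interval (a few seconds, say) is an illustrative choice, not something the issue prescribes; the point is that the expensive backlog computation no longer runs for every single record.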



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
