[ https://issues.apache.org/jira/browse/BEAM-9439?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17071204#comment-17071204 ]
Sebastian Graca commented on BEAM-9439:
---------------------------------------

[~aromanenko] The problem with the current implementation is that it does not return the total backlog bytes. Each reader tracks only its own shards and does not know the progress of the other readers' shards. As a result, a reader can report a much smaller backlog (even 0) while the actual backlog is hundreds of MB, which is what happened in our case. So the current value behaves more like {{getSplitBacklogBytes()}} than {{getTotalBacklogBytes()}}.

The only correct solution would be to use shard-level metrics, as [~pawel.kaczmarczyk] suggested. However, these metrics add extra cost and have to be enabled for each stream, which is an extra action users must take before the feature works. For that reason, such a feature should be opt-in, and there should be a way to use KinesisIO without enabling it.

I can see two ways of improving backlog handling without resorting to shard-level metrics:
* correctly implementing {{getTotalBacklogBytes()}}: this is hard, because it needs a way to know the stream watermark across all shards, so the readers would need a way to share that information;
* implementing {{getSplitBacklogBytes()}} instead of {{getTotalBacklogBytes()}}: our implementation reuses the current implementation of {{getTotalBacklogBytes()}}. Yes, if a reader has shards that are progressing slowly and are behind other shards, its reported backlog will be bigger than it should be. On the other hand, if some shards are progressing quickly, only the reader that owns them will report a small backlog. This can potentially be improved, but so far we see that for the Dataflow autoscaling algorithm the exact backlog value is not that important. What matters much more is how it changes over time: whether it goes up, goes down, or stays at the same level.

If you can think of anything else, please let me know.
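The difference between a per-reader "total" backlog and the real total can be illustrated with a small self-contained Java model. This is a sketch under simplifying assumptions, not Beam or KinesisIO code. All class and method names are hypothetical. {{streamBytesSince}} stands in for a query that is assumed to count bytes arriving anywhere in the stream after a given time, issued with a single reader's watermark. Each watermark is simplified to the minimum read position over a set of shards instead of a real WatermarkPolicy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Toy model (hypothetical, not Beam code): shows why computing a "total"
 * backlog from one reader's own watermark can miss bytes on shards owned
 * by other readers, and can over-count for a lagging reader.
 */
public class BacklogSketch {

    /** One record: the shard it landed on, its arrival time, and its size. */
    static final class Rec {
        final String shard;
        final long arrivalMillis;
        final long bytes;

        Rec(String shard, long arrivalMillis, long bytes) {
            this.shard = shard;
            this.arrivalMillis = arrivalMillis;
            this.bytes = bytes;
        }
    }

    /** Two shards, each receiving 100 bytes at t = 10, 20, ..., 100. */
    static List<Rec> sampleStream() {
        List<Rec> stream = new ArrayList<>();
        for (long t = 10; t <= 100; t += 10) {
            stream.add(new Rec("shard-0", t, 100));
            stream.add(new Rec("shard-1", t, 100));
        }
        return stream;
    }

    /** Assumed stream-level query: bytes that arrived on ANY shard after {@code since}. */
    static long streamBytesSince(List<Rec> stream, long since) {
        long total = 0;
        for (Rec r : stream) {
            if (r.arrivalMillis > since) {
                total += r.bytes;
            }
        }
        return total;
    }

    /** Exact backlog: for each shard, bytes that arrived after that shard's read position. */
    static long trueTotalBacklog(List<Rec> stream, Map<String, Long> position) {
        long total = 0;
        for (Rec r : stream) {
            if (r.arrivalMillis > position.get(r.shard)) {
                total += r.bytes;
            }
        }
        return total;
    }

    /** Simplified watermark: the minimum read position over the given shards. */
    static long watermark(List<String> shards, Map<String, Long> position) {
        long min = Long.MAX_VALUE;
        for (String s : shards) {
            min = Math.min(min, position.get(s));
        }
        return min;
    }

    public static void main(String[] args) {
        List<Rec> stream = sampleStream();
        // Reader A owns shard-0 and is caught up; reader B owns shard-1 and lags behind.
        Map<String, Long> position = Map.of("shard-0", 100L, "shard-1", 20L);

        long wmA = watermark(List.of("shard-0"), position);
        long wmB = watermark(List.of("shard-1"), position);
        long wmGlobal = watermark(List.of("shard-0", "shard-1"), position);

        System.out.println("true total backlog:       " + trueTotalBacklog(stream, position));
        System.out.println("reader A (own watermark): " + streamBytesSince(stream, wmA));
        System.out.println("reader B (own watermark): " + streamBytesSince(stream, wmB));
        System.out.println("global watermark:         " + streamBytesSince(stream, wmGlobal));
    }
}
```

Running main prints a true total backlog of 800 bytes, while reader A, which is caught up on its own shard, reports 0 and reader B, which lags, reports 1600. Querying with the global (minimum) watermark never misses bytes but still over-counts (1600), because bytes already read from the fast shard are included. In this model only per-shard positions give the exact 800. Read as split backlogs, reader A's 0 is accurate for its own shard, which is the trade-off described above.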
So far we have had no issues with our fix, and the pipelines have been running smoothly and scaling both up and down.

> KinesisReader does not report correct backlog statistics
> ---------------------------------------------------------
>
>                 Key: BEAM-9439
>                 URL: https://issues.apache.org/jira/browse/BEAM-9439
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>            Reporter: Sam Whittle
>            Priority: Major
>
> The KinesisReader implementing KinesisIO reports backlog by implementing the
> UnboundedSource.getTotalBacklogBytes() method as opposed to
> UnboundedSource.getSplitBacklogBytes().
> This value is supposed to represent the total backlog across all shards.
> This function is implemented by calling
> SimplifiedKinesisClient.getBacklogBytes with the watermark of the Kinesis
> shards managed within the UnboundedReader instance. As this watermark may be
> further ahead than the watermark across all shards, this may miss backlog
> bytes.
> An additional concern is that the watermark is calculated using a
> WatermarkPolicy, which means that the watermark may be inconsistent with the
> Kinesis timestamp used for querying backlog.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)