Hi Zhanghao Chen,

Sure, I can give some context.
My team's Flink application runs as a Kinesis Data Analytics streaming application [1] in AWS. Our source receives events from Amazon Simple Queue Service (SQS) [2], and our application then uses a property of each SQS event to download an object from Amazon S3 [3]. The external metrics system for our counters is Amazon CloudWatch metrics [4].

For both the SQS consumer source and our S3 downloader operator, we have counters for the number of received items, the number of successfully processed items, and the number of items that failed to process. However, during testing we found that the counts of SQS events received and S3 downloads are much too high: the values of our counters in CloudWatch are much higher than the number of records reported in the Flink dashboard.

The goal is for our CloudWatch metrics to accurately reflect the number of SQS events received and successfully or unsuccessfully processed, and the number of S3 downloads that were attempted and that succeeded or failed. I am looking for help understanding why our counter values are inaccurate.

[1] https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html
[2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html
[3] https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html
[4] https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html

________________________________
From: Zhanghao Chen <zhanghao.c...@outlook.com>
Sent: March 5, 2022 11:11 PM
To: Shane Bishop <shane.bis...@outlook.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Question about Flink counters

Hi Shane,

Could you share more information on what you would like to use the counter for? The counter discussed here is primarily designed for exposing counts to external metric systems. Usually, each task counts on its own, and aggregation is left to the external metric system (usually a time series database).
Also, you cannot reference a counter from a different machine. I'm not sure if this is what you expected.

Best,
Zhanghao Chen

________________________________
From: Shane Bishop <shane.bis...@outlook.com>
Sent: Saturday, March 5, 2022 23:22
To: Zhanghao Chen <zhanghao.c...@outlook.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Question about Flink counters

If I used a thread-safe counter implementation, would that be enough to make the count correct for a Flink cluster with multiple machines?

Best,
Shane

________________________________
From: Zhanghao Chen <zhanghao.c...@outlook.com>
Sent: March 4, 2022 11:08 PM
To: Shane Bishop <shane.bis...@outlook.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Question about Flink counters

Hi Shane,

Flink provides a generic counter interface with a few implementations. The default implementation, SimpleCounter, which is not thread-safe, is used when you call counter(String name) on a MetricGroup. Therefore, you'll need to use your own thread-safe implementation; see the second example of Metrics | Apache Flink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter> for reference.

Best,
Zhanghao Chen

________________________________
From: Shane Bishop <shane.bis...@outlook.com>
Sent: Saturday, March 5, 2022 5:24
To: user@flink.apache.org <user@flink.apache.org>
Subject: Question about Flink counters

Hi all,

For Flink counters [1], are increment operations guaranteed to be atomic across all parallel tasks? That is, is there a guarantee that the counter values will not be higher than expected?

Thanks,
Shane

---
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter
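The two answers in this thread (use a thread-safe counter, and expect each task to count on its own) can be illustrated with a minimal sketch in plain Java, written so it runs without Flink on the classpath. Assumptions: the `Counter` interface below is a local stand-in that mirrors the methods of Flink's `org.apache.flink.metrics.Counter` (`inc()`, `inc(long)`, `dec()`, `dec(long)`, `getCount()`); in a real job you would implement Flink's interface instead and register the instance with something like `getRuntimeContext().getMetricGroup().counter("received", new AtomicCounter())`, as in the second example of the linked Flink documentation. `AtomicCounter`, `incrementConcurrently`, and `aggregate` are hypothetical names for illustration, and the summing step only simulates what an external metric system might do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class CounterSketch {

    // Local stand-in mirroring org.apache.flink.metrics.Counter (assumption),
    // declared here only so the sketch compiles without Flink dependencies.
    interface Counter {
        void inc();
        void inc(long n);
        void dec();
        void dec(long n);
        long getCount();
    }

    // Hypothetical thread-safe counter: AtomicLong makes each update atomic,
    // so threads sharing one instance never lose increments.
    static final class AtomicCounter implements Counter {
        private final AtomicLong count = new AtomicLong();

        @Override public void inc() { count.incrementAndGet(); }
        @Override public void inc(long n) { count.addAndGet(n); }
        @Override public void dec() { count.decrementAndGet(); }
        @Override public void dec(long n) { count.addAndGet(-n); }
        @Override public long getCount() { return count.get(); }
    }

    // Increments one shared counter from several threads, then returns its value.
    static long incrementConcurrently(Counter counter, int threads, int incrementsPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    counter.inc();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return counter.getCount();
    }

    // Each parallel subtask owns its own counter instance; a job-wide total exists
    // only after something outside the tasks sums them (simulated here).
    static long aggregate(List<Counter> perSubtaskCounters) {
        long total = 0;
        for (Counter c : perSubtaskCounters) {
            total += c.getCount();
        }
        return total;
    }

    public static void main(String[] args) {
        // Within one task: 4 threads x 10,000 increments on one shared instance.
        System.out.println("shared counter: " + incrementConcurrently(new AtomicCounter(), 4, 10_000));

        // Across the job: 3 subtasks, each counting only the records it processed.
        List<Counter> subtasks = new ArrayList<>();
        long[] recordsPerSubtask = {120, 95, 130};
        for (long records : recordsPerSubtask) {
            Counter c = new AtomicCounter();
            c.inc(records);
            subtasks.add(c);
        }
        System.out.println("job-wide total: " + aggregate(subtasks));
    }
}
```

Running `main` should print `shared counter: 40000` and `job-wide total: 345`. The design point the sketch is meant to show: thread safety only prevents lost updates on a single shared instance; it does not merge counts from different machines, which is why the per-subtask values still have to be combined by the external metric system.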