Hi Zhanghao Chen,

Sure, I can give some context.

My team's Flink application runs as a Kinesis Data Analytics streaming 
application [1] in AWS.

Our application's source receives events from Amazon Simple Queue Service
(SQS) [2], and the application then uses a property of each SQS event to
download an object from Amazon S3 [3]. The external metrics system for our
counters is Amazon CloudWatch metrics [4].

For both the SQS consumer source and our S3 downloader operator, we have
counters for the number of received items, the number of successfully
processed items, and the number of items that failed to process.
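For illustration, here is a minimal plain-Java sketch of that counting pattern. It is not Flink code: `ItemCounters` and `handle` are hypothetical names, and in a real Flink operator the three counters would be registered on the operator's metric group instead. Each item increments `received` exactly once and then exactly one of `succeeded` or `failed`, with an exception in the processing step treated as a failure.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;

// Hypothetical holder for the three per-operator counts (received, succeeded, failed).
class ItemCounters {
    final AtomicLong received = new AtomicLong();
    final AtomicLong succeeded = new AtomicLong();
    final AtomicLong failed = new AtomicLong();

    // Counts one item: "received" once, then exactly one of "succeeded" or "failed".
    // The processing step returns true on success; a thrown RuntimeException counts as failure.
    <T> boolean handle(T item, Predicate<T> process) {
        received.incrementAndGet();
        boolean ok;
        try {
            ok = process.test(item);
        } catch (RuntimeException e) {
            ok = false;
        }
        if (ok) {
            succeeded.incrementAndGet();
        } else {
            failed.incrementAndGet();
        }
        return ok;
    }
}

class ItemCountersDemo {
    public static void main(String[] args) {
        ItemCounters counters = new ItemCounters();
        // Hypothetical processing step: keys ending in ".bad" fail to download.
        Predicate<String> download = key -> {
            if (key.endsWith(".bad")) {
                throw new IllegalStateException("download failed");
            }
            return true;
        };
        for (String key : new String[] {"a.json", "b.bad", "c.json"}) {
            counters.handle(key, download);
        }
        // Prints: received=3 succeeded=2 failed=1
        System.out.printf("received=%d succeeded=%d failed=%d%n",
            counters.received.get(), counters.succeeded.get(), counters.failed.get());
    }
}
```

Keeping the invariant received == succeeded + failed within each subtask makes double counting easier to spot when totals are compared against the record counts shown in the Flink dashboard.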

However, during testing we have found that the counts for SQS events received
and S3 downloads are much too high. The values of our counters in CloudWatch
are much higher than the number of records reported in the Flink dashboard.

The goal is that our metrics in CloudWatch should accurately reflect the number 
of SQS events received and successfully or unsuccessfully processed, and the 
number of S3 downloads that were attempted and succeeded or failed.

I am looking for help understanding why our counter values are inaccurate.

[1] https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html
[2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html
[3] https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html
[4] https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html

________________________________
From: Zhanghao Chen <zhanghao.c...@outlook.com>
Sent: March 5, 2022 11:11 PM
To: Shane Bishop <shane.bis...@outlook.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Question about Flink counters

Hi Shane,

Could you share more information on what you would like to use the counter for?

The counter discussed here is primarily designed for exposing counts to
external metric systems. Each task usually counts on its own, and aggregation
is left to the external metric system (typically a time series database).
Also, you cannot reference a counter from a different machine. I'm not sure
if this is what you expected.
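To make the per-task model concrete, here is a plain-Java sketch with made-up numbers; it is neither Flink nor CloudWatch code, and `CounterAggregation`, `sumOfLatest`, and `sumOfAllSamples` are hypothetical names. A Flink `Counter` reports a running total through `getCount()`, so a job-wide count is the sum of the latest value from each subtask. The second method shows, as an assumption about how an external system might aggregate, that adding up every periodically reported sample of a running total counts the same events repeatedly. Whether that applies to a particular CloudWatch setup depends on how its datapoints are aggregated.

```java
import java.util.List;
import java.util.Map;

class CounterAggregation {
    // Job-wide total for cumulative counters: the most recent value reported
    // by each subtask, summed across subtasks.
    static long sumOfLatest(Map<Integer, List<Long>> samplesBySubtask) {
        long total = 0;
        for (List<Long> samples : samplesBySubtask.values()) {
            if (!samples.isEmpty()) {
                total += samples.get(samples.size() - 1);
            }
        }
        return total;
    }

    // Over-counts for cumulative counters: every reported running total is added,
    // so each event is counted once for every report made after it happened.
    static long sumOfAllSamples(Map<Integer, List<Long>> samplesBySubtask) {
        long total = 0;
        for (List<Long> samples : samplesBySubtask.values()) {
            for (long s : samples) {
                total += s;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up data: two parallel subtasks, each reporting its running total three times.
        Map<Integer, List<Long>> samples = Map.of(
            0, List.of(10L, 25L, 40L),
            1, List.of(5L, 20L, 30L));
        System.out.println("sum of latest values: " + sumOfLatest(samples));     // 70
        System.out.println("sum of all samples:   " + sumOfAllSamples(samples)); // 130
    }
}
```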

Best,
Zhanghao Chen
________________________________
From: Shane Bishop <shane.bis...@outlook.com>
Sent: Saturday, March 5, 2022 23:22
To: Zhanghao Chen <zhanghao.c...@outlook.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Question about Flink counters

If I used a thread-safe counter implementation, would that be enough to make 
the count correct for a Flink cluster with multiple machines?

Best,
Shane
________________________________
From: Zhanghao Chen <zhanghao.c...@outlook.com>
Sent: March 4, 2022 11:08 PM
To: Shane Bishop <shane.bis...@outlook.com>; user@flink.apache.org <user@flink.apache.org>
Subject: Re: Question about Flink counters

Hi Shane,

Flink provides a generic counter interface with a few implementations. The
default implementation, SimpleCounter, which is not thread-safe, is used when
you call counter(String name) on a MetricGroup. Therefore, you'll need to use
your own thread-safe implementation; see the second example in Metrics | Apache
Flink <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter>
for reference.
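Below is a minimal thread-safe counter sketch in the spirit of that documentation example, backed by `java.util.concurrent.atomic.AtomicLong`. To stay runnable without Flink on the classpath, it declares its own `CounterLike` interface (a hypothetical name) mirroring the methods of Flink's `org.apache.flink.metrics.Counter` (`inc()`, `inc(long)`, `dec()`, `dec(long)`, `getCount()`). In a real job the class would implement Flink's interface and be passed to `counter(String name, C counter)` on the `MetricGroup`. Such a counter is only shared by threads inside one JVM; it does not combine counts across subtasks or machines.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in mirroring the methods of Flink's org.apache.flink.metrics.Counter,
// declared here only so the example compiles without Flink.
interface CounterLike {
    void inc();
    void inc(long n);
    void dec();
    void dec(long n);
    long getCount();
}

// Thread-safe counter: every update is a single atomic operation on an AtomicLong.
class ThreadSafeCounter implements CounterLike {
    private final AtomicLong count = new AtomicLong();

    @Override public void inc() { count.incrementAndGet(); }
    @Override public void inc(long n) { count.addAndGet(n); }
    @Override public void dec() { count.decrementAndGet(); }
    @Override public void dec(long n) { count.addAndGet(-n); }
    @Override public long getCount() { return count.get(); }
}

class ThreadSafeCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadSafeCounter counter = new ThreadSafeCounter();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        // Four threads in the same JVM each increment the shared counter 10,000 times.
        for (int t = 0; t < 4; t++) {
            pool.submit(() -> {
                for (int i = 0; i < 10_000; i++) {
                    counter.inc();
                }
            });
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println(counter.getCount()); // 40000
    }
}
```

As a side note on the original question, races on a plain `count++` typically lose increments, so on their own they would tend to make a count lower than expected rather than higher.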

Best,
Zhanghao Chen
________________________________
From: Shane Bishop <shane.bis...@outlook.com>
Sent: Saturday, March 5, 2022 5:24
To: user@flink.apache.org <user@flink.apache.org>
Subject: Question about Flink counters

Hi all,

For Flink counters [1], are increment operations guaranteed to be atomic across 
all parallel tasks? I.e., is there a guarantee that the counter values will not 
be higher than expected?

Thanks,
Shane

---
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter
