Re: Question about Flink counters
Hi,

My issue has been resolved through discussion with AWS Support. It turns out that Kinesis Data Analytics reports to CloudWatch in a way I did not expect. The way to view accurate values for Flink counters is with the Average statistic in CloudWatch metrics.

Below is the response from AWS Support, for anyone who might benefit from this in the future:

After discussing this issue with our Kinesis Data Analytics team, I was informed that currently metrics are reported every 15 seconds, hence every minute there would be 4 records, which might explain why you are seeing this number of records. However, please understand that this frequency may change in the future, so please do not rely on it as the source of truth. We would recommend basing alarms on the AVG, P99 or other similar metric statistics rather than examining individual values or verifying that the number of values is as expected. In addition, depending on how these metrics are calculated, displaying them with different statistics would show different graphs and scenarios. Please refer to the "CloudWatch statistics definitions" documentation [1] for more details on how metric data are aggregated over specified periods of time with different statistic choices. Below are some examples of the most commonly used statistic options:

- "SampleCount" is the number of data points during the period.
- "Sum" is the sum of the values of all data points collected during the period.
- "Average" is the value of Sum/SampleCount during the specified period.
- "Percentile (p)" indicates the relative standing of a value in a dataset. For example, p99 is the 99th percentile and means that 99 percent of the data within the period is lower than this value and 1 percent of the data is higher than this value.
- "Minimum" is the lowest value observed during the specified period.
- "Maximum" is the highest value observed during the specified period.
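To make the support team's definitions concrete, here is a small self-contained sketch. The four data points are made up, standing in for a counter value reported four times in one minute (once every ~15 seconds); it simply computes the same statistics CloudWatch would derive for that period:

```java
import java.util.Arrays;

public class StatisticsDemo {
    public static void main(String[] args) {
        // Hypothetical data points: one counter report every ~15 seconds,
        // so 4 samples in a 1-minute CloudWatch period.
        double[] dataPoints = {3.0, 5.0, 4.0, 8.0};

        int sampleCount = dataPoints.length;                         // SampleCount
        double sum = Arrays.stream(dataPoints).sum();                // Sum
        double average = sum / sampleCount;                          // Average = Sum / SampleCount
        double min = Arrays.stream(dataPoints).min().getAsDouble();  // Minimum
        double max = Arrays.stream(dataPoints).max().getAsDouble();  // Maximum

        System.out.println(sampleCount + " " + sum + " " + average + " " + min + " " + max);
        // prints: 4 20.0 5.0 3.0 8.0
    }
}
```

This also illustrates why Sum can over-count for a frequently reported counter while Average stays meaningful: which statistic is "correct" depends on whether the reporter emits cumulative counts or per-interval deltas, so it is worth checking how your metrics are emitted before choosing a statistic.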
In answer to your question regarding how custom metrics are pushed: in the documentation you provided [2], it says that "Custom metrics in Kinesis Data Analytics use the Apache Flink metric system," and Kinesis Data Analytics only reports these metrics to CloudWatch as you define them.

[1] https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/Statistics-definitions.html
[2] https://docs.aws.amazon.com/kinesisanalytics/latest/java/monitoring-metrics-custom.html

Regards,
Shane
Re: Question about Flink counters
Hi Dawid,

My team's Flink application's primary purpose is not to count the number of SQS messages received or the number of successful or failed S3 downloads. The application's primary purpose is to process events and the corresponding data and, for each event, create or update an entry in our database with the result of this processing. (I can't really go into too much detail on what is processed and what results are produced, because that is proprietary information.)

As you suggest, the counters for SQS messages and for successful or unsuccessful downloads from S3 are just for monitoring. We use these metrics to gauge how healthy our application is and to help indicate which components may have faults. We are not using these counters to calculate results.

My team is trying to understand why we see inaccurate values for our metrics, with the intention of fixing the inaccuracies so we can better monitor our application. I hope this helps clarify the context of my inquiry.

Best,
Shane
Re: Question about Flink counters
Hi Zhanghao Chen,

Sure, I can give some context. My team's Flink application runs as a Kinesis Data Analytics streaming application [1] in AWS. Our application receives events from Amazon Simple Queue Service (SQS) [2] in our source, and then uses a property of each SQS event to download from Amazon S3 [3]. The external metrics system for our counters is Amazon CloudWatch metrics [4].

For both the SQS consumer source and our S3 downloader operator, we have counters for the number of received items, the number of successfully processed items, and the number of items that failed to process. However, during testing we have found that the counts for SQS events received and S3 downloads are much too high. The counts for our counters in CloudWatch are much higher than the number of records reported in the Flink dashboard.

The goal is that our metrics in CloudWatch should accurately reflect the number of SQS events received and successfully or unsuccessfully processed, and the number of S3 downloads that were attempted and succeeded or failed. I am looking for help understanding why our counter values are inaccurate.

[1] https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html
[2] https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html
[3] https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html
[4] https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html

From: Zhanghao Chen
Sent: March 5, 2022 11:11 PM
To: Shane Bishop ; user@flink.apache.org
Subject: Re: Question about Flink counters

Hi Shane,

Could you share more information on what you would like to use the counter for? The counter discussed here is primarily designed for exposing counts to external metric systems. Usually, each task counts on its own, and it is left to the external metric system (usually a time-series database) to do aggregations. Also, you cannot reference a counter from a different machine.
I'm not sure if this is what you expected.

Best,
Zhanghao Chen

From: Shane Bishop
Sent: Saturday, March 5, 2022 23:22
To: Zhanghao Chen ; user@flink.apache.org
Subject: Re: Question about Flink counters

If I used a thread-safe counter implementation, would that be enough to make the count correct for a Flink cluster with multiple machines?

Best,
Shane

From: Zhanghao Chen
Sent: March 4, 2022 11:08 PM
To: Shane Bishop ; user@flink.apache.org
Subject: Re: Question about Flink counters

Hi Shane,

Flink provides a generic counter interface with a few implementations. The default implementation, SimpleCounter, which is not thread-safe, is used when you call counter(String name) on a MetricGroup. Therefore, you'll need to use your own thread-safe implementation; check out the second example of Metrics | Apache Flink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter> for reference.

Best,
Zhanghao Chen

From: Shane Bishop
Sent: Saturday, March 5, 2022 5:24
To: user@flink.apache.org
Subject: Question about Flink counters

Hi all,

For Flink counters [1], are increment operations guaranteed to be atomic across all parallel tasks? I.e., is there a guarantee that the counter values will not be higher than expected?

Thanks,
Shane

---
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter
Re: Question about Flink counters
If I used a thread-safe counter implementation, would that be enough to make the count correct for a Flink cluster with multiple machines?

Best,
Shane

From: Zhanghao Chen
Sent: March 4, 2022 11:08 PM
To: Shane Bishop ; user@flink.apache.org
Subject: Re: Question about Flink counters

Hi Shane,

Flink provides a generic counter interface with a few implementations. The default implementation, SimpleCounter, which is not thread-safe, is used when you call counter(String name) on a MetricGroup. Therefore, you'll need to use your own thread-safe implementation; check out the second example of Metrics | Apache Flink<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter> for reference.

Best,
Zhanghao Chen

From: Shane Bishop
Sent: Saturday, March 5, 2022 5:24
To: user@flink.apache.org
Subject: Question about Flink counters

Hi all,

For Flink counters [1], are increment operations guaranteed to be atomic across all parallel tasks? I.e., is there a guarantee that the counter values will not be higher than expected?

Thanks,
Shane

---
[1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter
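As a concrete sketch of the thread-safe implementation suggested above: the interface below is hypothetical and only mirrors the shape of Flink's org.apache.flink.metrics.Counter; in a real job you would implement that interface instead and register the instance via counter(String, Counter) on the MetricGroup. All class names here are my own, not Flink's.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical mirror of the relevant part of Flink's Counter interface.
interface CounterLike {
    void inc();
    void inc(long n);
    long getCount();
}

// Thread-safe counter backed by AtomicLong. (SimpleCounter uses a plain
// long field, so concurrent increments from multiple threads can be lost.)
class ThreadSafeCounter implements CounterLike {
    private final AtomicLong count = new AtomicLong();
    @Override public void inc() { count.incrementAndGet(); }
    @Override public void inc(long n) { count.addAndGet(n); }
    @Override public long getCount() { return count.get(); }
}

public class CounterDemo {
    public static void main(String[] args) throws InterruptedException {
        ThreadSafeCounter counter = new ThreadSafeCounter();
        // Two threads incrementing the same counter concurrently;
        // the atomic backing keeps the total exact.
        Runnable work = () -> { for (int i = 0; i < 100_000; i++) counter.inc(); };
        Thread t1 = new Thread(work), t2 = new Thread(work);
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println(counter.getCount()); // prints: 200000
    }
}
```

Note that this only protects against lost updates within a single operator instance. As Zhanghao points out, each parallel subtask still keeps its own counter, and aggregation across tasks and machines is left to the external metric system.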
Question about Flink counters
Hi all, For Flink counters [1], are increment operations guaranteed to be atomic across all parallel tasks? I.e., is there a guarantee that the counter values will not be higher than expected? Thanks, Shane --- [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter
"No operators defined in streaming topology" error when Flink app still starts successfully
Hi all,

My team has started seeing the error "java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute." However, even with this error, the Flink application starts and runs fine, and the Flink job renders fine in the Flink Dashboard. Attached is the full stacktrace.

This error comes from when we call StreamExecutionEnvironment#getExecutionPlan(). In the code snippet below, we call this method on the last line of the snippet. From poking around online, I found https://stackoverflow.com/questions/54977290/flink-no-operators-defined-in-streaming-topology-cannot-execute, which suggests the problem could be that we don't set a sink, but in the code below you will see we do set a sink (just maybe not in a way that getExecutionPlan() expects).

Can this be safely ignored? Is there something we can do so that getExecutionPlan() will work properly, or otherwise fix/suppress this error?

Below is the code (some portions have been redacted):

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream source = env.addSource(new RedactedType())
    .uid()
    .name()
    .shuffle();

DataStream> stream = AsyncDataStream.unorderedWait(source, new RedactedType(), 1, TimeUnit.MILLISECONDS)
    .uid()
    .name();

stream
    .flatMap(new RedactedType())
    .uid()
    .name()
    .flatMap(new RedactedType())
    .uid()
    .name()
    .shuffle()
    .addSink(new RedactedType()) // Set sink
    .uid()
    .name();

env.execute("");
logger.info("Started job; executionPlan={}", env.getExecutionPlan()); // line 66

Thanks,
Shane

org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:247)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:238)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 6 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
... 9 more
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:2019)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2010)
at
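The stacktrace itself hints at an explanation, though this is a reading of mine rather than something confirmed in the thread: env.execute() builds the stream graph and clears the recorded transformations, so a getExecutionPlan() call placed after execute() finds an empty topology and throws, even though the job was already submitted. The toy classes below (all names hypothetical, no Flink dependency) only illustrate that ordering effect; the corresponding workaround in the real job would be to capture the plan before calling execute():

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for a streaming environment whose execute() consumes the
// recorded transformations, as the Flink stacktrace above suggests.
class ToyEnv {
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String name) { transformations.add(name); }

    String getExecutionPlan() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        return String.join(" -> ", transformations);
    }

    void execute() {
        // Building the job graph consumes the recorded transformations.
        transformations.clear();
    }
}

public class PlanOrderingDemo {
    public static void main(String[] args) {
        ToyEnv env = new ToyEnv();
        env.addOperator("source");
        env.addOperator("sink");
        String plan = env.getExecutionPlan(); // capture the plan BEFORE execute()
        env.execute();
        System.out.println(plan); // prints: source -> sink
        // Calling env.getExecutionPlan() here would throw IllegalStateException.
    }
}
```

If that reading is right, the error can indeed be safely ignored in the sense that the job is already running, but logging the plan before execute() would avoid the exception entirely.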
Re: Questions about Kryo setRegistrationRequired(false)
I tried ExecutionConfig#disableGenericTypes and I get this error when I try to execute my Flink job with StreamExecutionEnvironment#execute (attached is the full stacktrace):

java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type java.util.List is treated as a generic type.

I suppose then there is nothing I can do at this time other than wait for the ticket I filed [1] to be resolved. I am assuming that if generic types are disabled, then UnsupportedOperationException will be thrown for all generic types, regardless of whether the generic type is registered with Kryo or not.

[1] https://issues.apache.org/jira/browse/FLINK-25993

Best regards,
Shane

From: Chesnay Schepler
Sent: February 7, 2022 3:08 AM
To: Shane Bishop ; user@flink.apache.org
Subject: Re: Questions about Kryo setRegistrationRequired(false)

There isn't any setting to control setRegistrationRequired(). You can, however, turn Kryo off via ExecutionConfig#disableGenericTypes, although this may require changes to your data types. I'd recommend filing a ticket.

On 04/02/2022 20:12, Shane Bishop wrote:

Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in Apache Flink might introduce serialization/deserialization vulnerabilities, and I want to better understand the security implications of its use in Flink.

There is an issue on the Kryo GitHub repo (link<https://github.com/EsotericSoftware/kryo/issues/398>) regarding type registration. The "fix" the Kryo developers made was to make setRegistrationRequired(true) the default (comment on GitHub issue<https://github.com/EsotericSoftware/kryo/issues/398#issuecomment-371153541>, commit with this fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00> and the line in the commit that is the fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130>).
This is not a true fix, as the default can still be overridden. This only sets a safe default.

In Flink, the default of true is overridden in the 1.14.3 Flink release (see KryoSerializer.java<https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492> and FlinkScalaKryoInstantiator.scala<https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46>).

I am no Flink contributor, so I might be missing safety mechanisms that are in place to prevent the Kryo serialization/deserialization vulnerability even when registration required is set to false. Are there any such safety mechanisms in place? Is there anything I can do as a user of Flink to protect myself against this Kryo vulnerability?

Best regards,
Shane Bishop

org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:247)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3
Re: Questions about Kryo setRegistrationRequired(false)
Thanks Chesnay Schepler. I filed a ticket: https://issues.apache.org/jira/browse/FLINK-25993

My team will try disabling Kryo with ExecutionConfig#disableGenericTypes and see whether we need to change our data types.

Best regards,
Shane

From: Chesnay Schepler
Sent: February 7, 2022 3:08 AM
To: Shane Bishop ; user@flink.apache.org
Subject: Re: Questions about Kryo setRegistrationRequired(false)

There isn't any setting to control setRegistrationRequired(). You can, however, turn Kryo off via ExecutionConfig#disableGenericTypes, although this may require changes to your data types. I'd recommend filing a ticket.

On 04/02/2022 20:12, Shane Bishop wrote:

Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in Apache Flink might introduce serialization/deserialization vulnerabilities, and I want to better understand the security implications of its use in Flink.

There is an issue on the Kryo GitHub repo (link<https://github.com/EsotericSoftware/kryo/issues/398>) regarding type registration. The "fix" the Kryo developers made was to make setRegistrationRequired(true) the default (comment on GitHub issue<https://github.com/EsotericSoftware/kryo/issues/398#issuecomment-371153541>, commit with this fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00> and the line in the commit that is the fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130>). This is not a true fix, as the default can still be overridden. This only sets a safe default.
In Flink, the default of true is overridden in the 1.14.3 Flink release (see KryoSerializer.java<https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492> and FlinkScalaKryoInstantiator.scala<https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46>).

I am no Flink contributor, so I might be missing safety mechanisms that are in place to prevent the Kryo serialization/deserialization vulnerability even when registration required is set to false. Are there any such safety mechanisms in place? Is there anything I can do as a user of Flink to protect myself against this Kryo vulnerability?

Best regards,
Shane Bishop
Questions about Kryo setRegistrationRequired(false)
Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in Apache Flink might introduce serialization/deserialization vulnerabilities, and I want to better understand the security implications of its use in Flink.

There is an issue on the Kryo GitHub repo (link<https://github.com/EsotericSoftware/kryo/issues/398>) regarding type registration. The "fix" the Kryo developers made was to make setRegistrationRequired(true) the default (comment on GitHub issue<https://github.com/EsotericSoftware/kryo/issues/398#issuecomment-371153541>, commit with this fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00> and the line in the commit that is the fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130>). This is not a true fix, as the default can still be overridden. This only sets a safe default.

In Flink, the default of true is overridden in the 1.14.3 Flink release (see KryoSerializer.java<https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492> and FlinkScalaKryoInstantiator.scala<https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46>).

I am no Flink contributor, so I might be missing safety mechanisms that are in place to prevent the Kryo serialization/deserialization vulnerability even when registration required is set to false. Are there any such safety mechanisms in place? Is there anything I can do as a user of Flink to protect myself against this Kryo vulnerability?

Best regards,
Shane Bishop