Re: Question about Flink counters

2022-03-08 Thread Shane Bishop
Hi,

My issue has been resolved through discussion with AWS support.

It turns out that Kinesis Data Analytics reports to CloudWatch in a way I did 
not expect. The way to view accurate values for Flink counters is with the 
Average statistic in CloudWatch metrics.

Below is the response from AWS support, for anyone who might benefit from this 
in the future:

After discussing this issue with our Kinesis Data Analytics team, I was 
informed that metrics are currently reported every 15 seconds, so every minute 
there would be 4 records, which might explain the number of records you are 
seeing. However, please understand that this frequency may change in the 
future, so please do not rely on it as the source of truth. We would recommend 
basing alarms on the AVG, P99, or other similar metric statistics rather than 
examining individual values or verifying that the number of values is as 
expected.

In addition, depending on how these metrics are calculated, displaying them 
with different statistics will show different graphs and scenarios. Please 
refer to the "CloudWatch statistics definitions" [1] documentation for more 
details on how metric data are aggregated over specified periods of time with 
different statistic choices. Below are some examples of the most commonly used 
statistic options:
- "SampleCount" is the number of data points during the period.
- "Sum" is the sum of the values of all data points collected during the 
period.
- "Average" is Sum/SampleCount over the specified period.
- "Percentile (p)" indicates the relative standing of a value in a dataset. 
For example, p99 is the 99th percentile: 99 percent of the data within the 
period is lower than this value, and 1 percent is higher.
- "Minimum" is the lowest value observed during the specified period.
- "Maximum" is the highest value observed during the specified period.

In answer to your question about how custom metrics are pushed: the 
documentation you provided [2] says that "Custom metrics in Kinesis Data 
Analytics use the Apache Flink metric system." Kinesis Data Analytics only 
reports these metrics to CloudWatch as you define these custom metrics.

[1] 
https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/Statistics-definitions.html
[2] 
https://docs.aws.amazon.com/kinesisanalytics/latest/java/monitoring-metrics-custom.html
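
To make the Average = Sum/SampleCount relationship concrete (a sketch only; 
the 15-second reporting interval mentioned above is AWS's current behavior and 
may change), consider a counter value of 10 reported four times in a 
one-minute period:

```java
import java.util.List;

public class CloudWatchStatsExample {
    public static void main(String[] args) {
        // Four samples in a one-minute period, e.g. a counter value
        // reported roughly every 15 seconds (current behavior per AWS support).
        List<Double> samples = List.of(10.0, 10.0, 10.0, 10.0);

        double sum = samples.stream().mapToDouble(Double::doubleValue).sum();
        long sampleCount = samples.size();
        double average = sum / sampleCount;

        // Sum over-represents the per-report value (40 vs 10),
        // while Average recovers it.
        System.out.println("SampleCount=" + sampleCount
                + " Sum=" + sum + " Average=" + average);
    }
}
```

This is why Sum looks inflated relative to what the Flink dashboard shows, 
while Average matches the reported value.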

Regards,
Shane


Re: Question about Flink counters

2022-03-07 Thread Shane Bishop
Hi Dawid,

My team's Flink application's primary purpose is not to count the number of SQS 
messages received or the number of successful or failed S3 downloads. The 
application's primary purpose is to process events and the corresponding data 
and, for each event, create or update an entry in our database with the result 
of this processing. (I can't go into much detail on what is processed and what 
results are produced, because that is proprietary information.)

As you suggest, the counters for SQS messages and successful or unsuccessful 
downloads from S3 are just for the purposes of monitoring. We use these metrics 
to have an idea of how healthy our application is, and to help indicate which 
components may have faults. We are not using these counters to calculate 
results.

My team is trying to understand why we see inaccurate values for our metrics, 
with the intention of fixing the inaccuracies so we can better monitor our 
application.

I hope this helps to clarify the context of my inquiry.

Best,
Shane


Re: Question about Flink counters

2022-03-06 Thread Shane Bishop
Hi Zhanghao Chen,

Sure, I can give some context.

My team's Flink application runs as a Kinesis Data Analytics streaming 
application [1] in AWS.

Our application receives events from Amazon Simple Queue Service (SQS) [2] in 
our source, and then uses a property of the SQS event to download from Amazon 
S3 [3]. The external metrics system for our counters is Amazon CloudWatch 
metrics [4].

For both the SQS consumer source and our S3 downloader operator, we have a 
counter for number of received items, number of successfully processed items, 
and number of items that failed to process.

However, during testing we have found that the counts for SQS events received 
and S3 downloads are much too high. The counts for our counters in CloudWatch 
are much higher than the number of records reported in the Flink dashboard.

The goal is that our metrics in CloudWatch should accurately reflect the number 
of SQS events received and successfully or unsuccessfully processed, and the 
number of S3 downloads that were attempted and succeeded or failed.

I am looking for help understanding why our counter values are inaccurate.

[1] https://docs.aws.amazon.com/kinesisanalytics/latest/java/what-is.html
[2] 
https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/welcome.html
[3] https://docs.aws.amazon.com/AmazonS3/latest/userguide/Welcome.html
[4] 
https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html
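
The per-operator counters described above can be sketched as follows. This is 
a simplified, standalone model: MetricGroup here is a local stand-in for 
Flink's org.apache.flink.metrics.MetricGroup, and the counter names 
(numItemsReceived, etc.) are illustrative, not the actual metric names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Simplified stand-ins for Flink's metric types so this runs standalone.
public class OperatorCountersSketch {
    static class MetricGroup {
        final Map<String, AtomicLong> counters = new HashMap<>();
        AtomicLong counter(String name) {
            return counters.computeIfAbsent(name, k -> new AtomicLong());
        }
    }

    // Pattern for the S3 downloader operator: one counter per outcome.
    static class S3Downloader {
        AtomicLong received, succeeded, failed;

        void open(MetricGroup metrics) {
            received = metrics.counter("numItemsReceived");
            succeeded = metrics.counter("numItemsSucceeded");
            failed = metrics.counter("numItemsFailed");
        }

        void process(boolean downloadOk) {
            received.incrementAndGet();
            if (downloadOk) succeeded.incrementAndGet();
            else failed.incrementAndGet();
        }
    }

    public static void main(String[] args) {
        MetricGroup metrics = new MetricGroup();
        S3Downloader op = new S3Downloader();
        op.open(metrics);
        op.process(true);
        op.process(true);
        op.process(false);
        System.out.println(op.received.get() + " received, "
                + op.succeeded.get() + " succeeded, "
                + op.failed.get() + " failed");
    }
}
```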


From: Zhanghao Chen 
Sent: March 5, 2022 11:11 PM
To: Shane Bishop ; user@flink.apache.org 

Subject: Re: Question about Flink counters

Hi Shane,

Could you share more information on what you would like to use the counter for?

The counter discussed here is primarily designed for exposing counts to 
external metric systems. Usually, each task counts on its own, and it is left 
to the external metric system (usually a time-series database) to do 
aggregations. Also, you cannot reference a counter from a different machine. 
I'm not sure if this is what you expected.
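
A toy illustration of that division of labor (plain Java, not Flink API): each 
subtask increments only its own local counter, and summing across subtasks is 
left to the external system:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Toy model (not Flink code): each parallel subtask owns its own counter,
// and only the external metric system ever sees the aggregate.
public class PerTaskCounters {
    public static void main(String[] args) {
        // Three "subtasks", each counting records it processed locally.
        List<AtomicLong> subtaskCounters =
                List.of(new AtomicLong(), new AtomicLong(), new AtomicLong());

        subtaskCounters.get(0).addAndGet(5);
        subtaskCounters.get(1).addAndGet(7);
        subtaskCounters.get(2).addAndGet(3);

        // The aggregation (here a simple sum) happens in the external
        // system, e.g. a time-series database -- not inside the job.
        long total = subtaskCounters.stream()
                .mapToLong(AtomicLong::get).sum();
        System.out.println("aggregate=" + total); // aggregate=15
    }
}
```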

Best,
Zhanghao Chen

From: Shane Bishop 
Sent: Saturday, March 5, 2022 23:22
To: Zhanghao Chen ; user@flink.apache.org 

Subject: Re: Question about Flink counters

If I used a thread-safe counter implementation, would that be enough to make 
the count correct for a Flink cluster with multiple machines?

Best,
Shane

From: Zhanghao Chen 
Sent: March 4, 2022 11:08 PM
To: Shane Bishop ; user@flink.apache.org 

Subject: Re: Question about Flink counters

Hi Shane,

Flink provides a generic counter interface with a few implementations. The 
default implementation, SimpleCounter, is not thread-safe and is used when you 
call counter(String name) on a MetricGroup. Therefore, you'll need to use your 
own thread-safe implementation; see the second example in the Metrics 
documentation 
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter>
 for reference.
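
That second example can be sketched roughly as follows. The Counter interface 
below is a local stand-in mirroring org.apache.flink.metrics.Counter 
(inc/dec/getCount) so the snippet runs without Flink on the classpath. Note 
that thread safety makes increments exact within one JVM; it does not 
aggregate counts across machines:

```java
import java.util.concurrent.atomic.AtomicLong;

public class ThreadSafeCounterExample {
    // Local stand-in for org.apache.flink.metrics.Counter so the
    // sketch compiles without Flink on the classpath.
    interface Counter {
        void inc();
        void inc(long n);
        void dec();
        void dec(long n);
        long getCount();
    }

    // Thread-safe implementation backed by AtomicLong.
    static class AtomicCounter implements Counter {
        private final AtomicLong count = new AtomicLong();
        public void inc() { count.incrementAndGet(); }
        public void inc(long n) { count.addAndGet(n); }
        public void dec() { count.decrementAndGet(); }
        public void dec(long n) { count.addAndGet(-n); }
        public long getCount() { return count.get(); }
    }

    public static void main(String[] args) throws InterruptedException {
        Counter counter = new AtomicCounter();
        // Hammer the counter from several threads; the final count is exact.
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 10_000; j++) counter.inc();
            });
            threads[i].start();
        }
        for (Thread t : threads) t.join();
        System.out.println(counter.getCount()); // 40000
    }
}
```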

Best,
Zhanghao Chen

From: Shane Bishop 
Sent: Saturday, March 5, 2022 5:24
To: user@flink.apache.org 
Subject: Question about Flink counters

Hi all,

For Flink counters [1], are increment operations guaranteed to be atomic across 
all parallel tasks? I.e., is there a guarantee that the counter values will not 
be higher than expected?

Thanks,
Shane

---
[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter


Re: Question about Flink counters

2022-03-05 Thread Shane Bishop
If I used a thread-safe counter implementation, would that be enough to make 
the count correct for a Flink cluster with multiple machines?

Best,
Shane

From: Zhanghao Chen 
Sent: March 4, 2022 11:08 PM
To: Shane Bishop ; user@flink.apache.org 

Subject: Re: Question about Flink counters

Hi Shane,

Flink provides a generic counter interface with a few implementations. The 
default implementation, SimpleCounter, is not thread-safe and is used when you 
call counter(String name) on a MetricGroup. Therefore, you'll need to use your 
own thread-safe implementation; see the second example in the Metrics 
documentation 
<https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter>
 for reference.

Best,
Zhanghao Chen

From: Shane Bishop 
Sent: Saturday, March 5, 2022 5:24
To: user@flink.apache.org 
Subject: Question about Flink counters

Hi all,

For Flink counters [1], are increment operations guaranteed to be atomic across 
all parallel tasks? I.e., is there a guarantee that the counter values will not 
be higher than expected?

Thanks,
Shane

---
[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter


Question about Flink counters

2022-03-04 Thread Shane Bishop
Hi all,

For Flink counters [1], are increment operations guaranteed to be atomic across 
all parallel tasks? I.e., is there a guarantee that the counter values will not 
be higher than expected?

Thanks,
Shane

---
[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#counter


"No operators defined in streaming topology" error when Flink app still starts successfully

2022-02-14 Thread Shane Bishop
Hi all,

My team has started seeing the error "java.lang.IllegalStateException: No 
operators defined in streaming topology. Cannot execute." However, even with 
this error, the Flink application starts and runs fine, and the Flink job 
renders fine in the Flink Dashboard.

Attached is the full stacktrace.

This error comes from our call to 
StreamExecutionEnvironment#getExecutionPlan(), which appears on the last line 
of the code snippet below.

From poking around online, I found 
https://stackoverflow.com/questions/54977290/flink-no-operators-defined-in-streaming-topology-cannot-execute,
which suggests the problem could be that we don't set a sink. But in the code 
below, you will see we do set a sink (just maybe not in a way that 
getExecutionPlan() expects).

Can this be safely ignored? Is there something we can do so that 
getExecutionPlan() will work properly, or otherwise fix/suppress this error?

Below is the code (some portions have been redacted):

final StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<...> source =
    env.addSource(new RedactedType())
        .uid()
        .name()
        .shuffle();

DataStream<...> stream =
    AsyncDataStream.unorderedWait(source, new RedactedType(), 1,
            TimeUnit.MILLISECONDS)
        .uid()
        .name();

stream
    .flatMap(new RedactedType())
    .uid()
    .name()
    .flatMap(new RedactedType())
    .uid()
    .name()
    .shuffle()
    .addSink(new RedactedType()) // Set sink
    .uid()
    .name();

env.execute("");
logger.info("Started job; executionPlan={}", env.getExecutionPlan()); // line 66
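
One hypothesis, sketched below as a toy model (this is NOT Flink's actual 
implementation), is that execute() consumes the registered transformations, so 
a later getExecutionPlan() sees an empty topology; if so, capturing the plan 
before calling execute() would avoid the exception:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the suspected behavior (NOT Flink's actual code):
// execute() clears the registered transformations, so asking for the
// plan afterwards finds an empty topology.
public class ExecutionPlanOrdering {
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String name) { transformations.add(name); }

    String getExecutionPlan() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        return String.join(" -> ", transformations);
    }

    void execute() { transformations.clear(); } // plan is consumed here

    public static void main(String[] args) {
        ExecutionPlanOrdering env = new ExecutionPlanOrdering();
        env.addOperator("source");
        env.addOperator("sink");

        // Capture the plan BEFORE execute(), then run.
        String plan = env.getExecutionPlan();
        env.execute();
        System.out.println(plan); // source -> sink
    }
}
```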

Thanks,
Shane
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application.
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:247)
at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute 
application.
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3(JarRunOverrideHandler.java:238)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 6 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main 
method caused an error: No operators defined in streaming topology. Cannot 
execute.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at 
org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
... 9 more
Caused by: java.lang.IllegalStateException: No operators defined in streaming 
topology. Cannot execute.
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:2019)
at 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:2010)
at 

Re: Questions about Kryo setRegistrationRequired(false)

2022-02-08 Thread Shane Bishop
I tried ExecutionConfig#disableGenericTypes, and I get this error when I try 
to execute my Flink job with StreamExecutionEnvironment#execute (attached is 
the full stacktrace):

java.lang.UnsupportedOperationException: Generic types have been disabled in 
the ExecutionConfig and type java.util.List is treated as a generic type.

I suppose there is nothing I can do at this time other than wait for the 
ticket I filed [1] to be resolved. I am assuming that if generic types are 
disabled, then UnsupportedOperationException will be thrown for all generic 
types, regardless of whether the generic type is registered with Kryo or not.

[1] https://issues.apache.org/jira/browse/FLINK-25993
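
The exception names java.util.List as the offending generic type. One possible 
workaround, assuming the schema allows it, is to restructure the data type so 
that no field is a java.util.List (e.g., use an array) and the type satisfies 
Flink's POJO rules (public class, public no-arg constructor, public fields or 
getters/setters). The EventRecord type below is purely hypothetical:

```java
import java.util.Arrays;

// Hypothetical event type: replacing a java.util.List field with an
// array lets the field be handled as an array type rather than falling
// back to generic (Kryo) serialization.
public class EventRecord {
    public String id;
    public long[] values; // was: List<Long> values

    public EventRecord() {} // required no-arg constructor

    public static void main(String[] args) {
        EventRecord e = new EventRecord();
        e.id = "example";
        e.values = new long[] {1, 2, 3};
        System.out.println(e.id + " " + Arrays.toString(e.values));
    }
}
```

Whether this helps depends on whether the List actually comes from your own 
types or from a library type you cannot change.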

Best regards,
Shane


From: Chesnay Schepler 
Sent: February 7, 2022 3:08 AM
To: Shane Bishop ; user@flink.apache.org 

Subject: Re: Questions about Kryo setRegistrationRequired(false)

There isn't any setting to control setRegistrationRequired().

You can however turn Kryo off via ExecutionConfig#disableGenericTypes, although 
this may require changes to your data types.

I'd recommend filing a ticket.

On 04/02/2022 20:12, Shane Bishop wrote:
Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in Apache Flink 
might introduce serialization/deserialization vulnerabilities, and I want to 
better understand the security implications of its use in Flink.

There is an issue on the Kryo GitHub repo 
(link<https://github.com/EsotericSoftware/kryo/issues/398>) regarding type 
registration. The "fix" the Kryo developers made was to make 
setRegistrationRequired(true) the default (comment on GitHub 
issue<https://github.com/EsotericSoftware/kryo/issues/398#issuecomment-371153541>,
 commit with this 
fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00>
 and the line in the commit that is the 
fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130>).

This is not a true fix, as the default can still be overridden. This only sets 
a safe default.

In Flink, the default of true is overridden in the 1.14.3 Flink release (see 
KryoSerializer.java<https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492>
 and 
FlinkScalaKryoInstantiator.scala<https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46>).

I am no Flink contributor, so I might be missing safety mechanisms that are in 
place to prevent the Kryo serialization/deserialization vulnerability even when 
registration required is set to false. Are there any such safety mechanisms in 
place?

Is there anything I can do as a user of Flink to protect myself against this 
Kryo vulnerability?

Best regards,
Shane Bishop

org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute 
application.
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$4(JarRunOverrideHandler.java:247)
at 
java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)
at 
java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)
at 
java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at 
java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: 
org.apache.flink.util.FlinkRuntimeException: Could not execute application.
at 
java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at 
java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at 
java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 6 more
Caused by: org.apache.flink.util.FlinkRuntimeException: Could not execute 
application.
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:88)
at 
org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunOverrideHandler.lambda$handleRequest$3

Re: Questions about Kryo setRegistrationRequired(false)

2022-02-07 Thread Shane Bishop
Thanks Chesnay Schepler.

I filed a ticket: https://issues.apache.org/jira/browse/FLINK-25993

My team will try disabling Kryo with ExecutionConfig#disableGenericTypes and 
see whether we need to change our data types.

Best regards,
Shane


From: Chesnay Schepler 
Sent: February 7, 2022 3:08 AM
To: Shane Bishop ; user@flink.apache.org 

Subject: Re: Questions about Kryo setRegistrationRequired(false)

There isn't any setting to control setRegistrationRequired().

You can however turn Kryo off via ExecutionConfig#disableGenericTypes, although 
this may require changes to your data types.

I'd recommend filing a ticket.

On 04/02/2022 20:12, Shane Bishop wrote:
Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in Apache Flink 
might introduce serialization/deserialization vulnerabilities, and I want to 
better understand the security implications of its use in Flink.

There is an issue on the Kryo GitHub repo 
(link<https://github.com/EsotericSoftware/kryo/issues/398>) regarding type 
registration. The "fix" the Kryo developers made was to make 
setRegistrationRequired(true) the default (comment on GitHub 
issue<https://github.com/EsotericSoftware/kryo/issues/398#issuecomment-371153541>,
 commit with this 
fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00>
 and the line in the commit that is the 
fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130>).

This is not a true fix, as the default can still be overridden. This only sets 
a safe default.

In Flink, the default of true is overridden in the 1.14.3 Flink release (see 
KryoSerializer.java<https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492>
 and 
FlinkScalaKryoInstantiator.scala<https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46>).

I am no Flink contributor, so I might be missing safety mechanisms that are in 
place to prevent the Kryo serialization/deserialization vulnerability even when 
registration required is set to false. Are there any such safety mechanisms in 
place?

Is there anything I can do as a user of Flink to protect myself against this 
Kryo vulnerability?

Best regards,
Shane Bishop



Questions about Kryo setRegistrationRequired(false)

2022-02-04 Thread Shane Bishop
Hi all,

TL;DR: I am concerned that kryo.setRegistrationRequired(false) in Apache Flink 
might introduce serialization/deserialization vulnerabilities, and I want to 
better understand the security implications of its use in Flink.

There is an issue on the Kryo GitHub repo 
(link<https://github.com/EsotericSoftware/kryo/issues/398>) regarding type 
registration. The "fix" the Kryo developers made was to make 
setRegistrationRequired(true) the default (comment on GitHub 
issue<https://github.com/EsotericSoftware/kryo/issues/398#issuecomment-371153541>,
 commit with this 
fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00>
 and the line in the commit that is the 
fix<https://github.com/EsotericSoftware/kryo/commit/fc7f0cc7037ff1384b4cdac5b7ada287c64f0a00#diff-6d4638ca49aa0d0d9171ff04a0faa22e241f8320fda4a8a12c95853188d055a0R130>).

This is not a true fix, as the default can still be overridden. This only sets 
a safe default.

In Flink, the default of true is overridden in the 1.14.3 Flink release (see 
KryoSerializer.java<https://github.com/apache/flink/blob/release-1.14.3/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java#L492>
 and 
FlinkScalaKryoInstantiator.scala<https://github.com/apache/flink/blob/release-1.14.3/flink-scala/src/main/scala/org/apache/flink/runtime/types/FlinkScalaKryoInstantiator.scala#L46>).

I am no Flink contributor, so I might be missing safety mechanisms that are in 
place to prevent the Kryo serialization/deserialization vulnerability even when 
registration required is set to false. Are there any such safety mechanisms in 
place?

Is there anything I can do as a user of Flink to protect myself against this 
Kryo vulnerability?

Best regards,
Shane Bishop