Re: Kryo version 2.24.0 from 2015?
It sounds like the better option is to make a Flink bridge release that runs both Kryo v2 and Kryo v5 side-by-side. The code base for the Flink bridge release should only use Kryo v2 for deserializing legacy data, and use Kryo v5 for serializing and deserializing new data. User APIs registering custom serializers would change to take both v2 and v5 custom serializers, rather than v2 only. That is a breaking API change, but it seems unavoidable and is the least bad choice; staying on Kryo v2 is not a good option. This approach wouldn't require application downtime beyond what is normal for Flink upgrades. Then, in a future Flink release, you can drop Kryo v2 entirely and be Kryo v5 only.

On Thu, Feb 9, 2023 at 11:21 AM Chesnay Schepler wrote:

> > Can't programmers just code up migration tools to the current version of Kryo or whatever serialization platform you choose?
>
> Well yes, if someone writes a tool to implement a reasonable migration path then we may be able to upgrade Kryo. Until that happens we are blocked on upgrading Kryo.
>
> > Versions older than 3.x aren't supposed to compile nor run correctly under Java 11+.
>
> In fact our Java 17 support is currently blocked by an issue that we suspect is related to Kryo.
> https://issues.apache.org/jira/browse/FLINK-24998
>
> > I'd presume you would make a tool to upgrade files with Kryo persisted state in savepoints and checkpoints
>
> That doesn't cover everything and may not necessarily be a viable approach.
>
> Kryo is exposed a fair bit in our APIs (mistakes of the past...) so users that have custom Serializers might also have to change things. Upgrading Kryo is thus also an API breaking change.
>
> As for viability, such an approach implies taking down the application on one Flink version, converting the state, and restarting the job on a newer Flink version. This implies a certain amount of downtime for the application, which depending on the state size may just not be acceptable to a user. Having to migrate the savepoint and upgrade Flink at the same time is also not ideal since it makes the effort more complicated; being able to run the job on the same Flink version with a different Kryo version would make things easier, but that'd mean we have to be able to run 2 Kryo versions in parallel.
>
> Something else to consider is that if we already break everything to upgrade Kryo, then maybe things should be re-written such that upgrading Kryo isn't such a problem in the future; in essence, reworking how Kryo is integrated into Flink.
>
> That said, the v5 migration guide is quite interesting; specifically that Kryo offers a versioned jar.
>
> On 09/02/2023 17:32, Clayton Wohl wrote:
> > What do you mean you are blocked? Can't programmers just code up migration tools to the current version of Kryo or whatever serialization platform you choose?
> >
> > Can't you follow the Kryo migration guide that supports loading data serialized with Kryo v2 and reserializing with Kryo v5?
> > https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5
> >
> > I'd presume you would make a tool to upgrade files with Kryo persisted state in savepoints and checkpoints, and that would allow users to register custom serializers. I also presume that new versions of Flink would politely refuse to start with old-format state files and require the migration process to be completed.
> >
> > Kryo v2 also pulls in objenesis v2.1 from 2013, before Java 8. Versions older than 3.x aren't supposed to compile nor run correctly under Java 11+.
> >
> > On Thu, Feb 9, 2023 at 2:34 AM Chesnay Schepler wrote:
> > > > you can't reasonably stay on the 2015 version forever, refuse to adopt any of the updates or fixes in the 8 years since then, and reasonably expect things to continue to work well.
> > >
> > > We are well aware that Kryo is a ticking time bomb.
> > >
> > > > Is there any possibility a future release of Flink can upgrade to a recent version of Kryo serialization?
> > >
> > > Of course there is, but someone needs to figure out a way to do this without breaking everything or providing a reasonable upgrade path, which has been blocking us so far.
> > >
> > > On 09/02/2023 07:34, Clayton Wohl wrote:
> > > > I've noticed the latest Flink is using the Kryo serializer library version 2.24.0, which is back from 2015!
> > > >
> > > > The Kryo project is actively maintained; it's on version 5.4.0, so 2.24.0 is really quite ancient. I presume the concern is maintaining compatibility with persisted savepoints. That's a valid concern, but you can't reasonably stay on the 2015 version forever, refuse to adopt any of the updates or fixes in the 8 years since then, and reasonably expect things to continue to work well.
> > > >
> > > > Is there any possibility a future release of Flink can upgrade to a recent version of Kryo serialization?
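The dispatch mechanism such a bridge release needs can be illustrated without Kryo at all: tag every newly written payload with a format byte and route reads accordingly. The sketch below is purely illustrative (the class name, marker byte, and decoder wiring are assumptions, not Flink or Kryo APIs); in the real bridge, the two decoder slots would wrap Kryo v2 and Kryo v5, with Kryo's versioned jar (relocated packages) letting both versions share one classpath. Note also that a real scheme must guarantee the marker can never begin a legacy v2 stream, which this simplification ignores.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.function.Function;

// Sketch of the version-dispatch idea behind a v2/v5 bridge serializer.
// New payloads are written with a marker byte up front; anything without
// the marker is treated as legacy data and routed to the old decoder.
class VersionedCodec<T> {
    static final byte V5_MARKER = (byte) 0xF5; // illustrative marker value

    private final Function<byte[], T> legacyDecoder; // would wrap Kryo v2
    private final Function<byte[], T> newDecoder;    // would wrap Kryo v5

    VersionedCodec(Function<byte[], T> legacyDecoder, Function<byte[], T> newDecoder) {
        this.legacyDecoder = legacyDecoder;
        this.newDecoder = newDecoder;
    }

    // All new writes go through the new format, marker first.
    byte[] encode(byte[] newFormatPayload) {
        byte[] out = new byte[newFormatPayload.length + 1];
        out[0] = V5_MARKER;
        System.arraycopy(newFormatPayload, 0, out, 1, newFormatPayload.length);
        return out;
    }

    // Reads dispatch on the marker: tagged -> new decoder, untagged -> legacy.
    T decode(byte[] bytes) {
        if (bytes.length > 0 && bytes[0] == V5_MARKER) {
            return newDecoder.apply(Arrays.copyOfRange(bytes, 1, bytes.length));
        }
        return legacyDecoder.apply(bytes); // untagged legacy v2 stream
    }
}
```

This is also the shape a custom-serializer API could take after the breaking change: users supply both a legacy and a current decoder, and the framework owns the dispatch.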
Re: Kryo version 2.24.0 from 2015?
What do you mean you are blocked? Can't programmers just code up migration tools to the current version of Kryo or whatever serialization platform you choose?

Can't you follow the Kryo migration guide that supports loading data serialized with Kryo v2 and reserializing with Kryo v5?
https://github.com/EsotericSoftware/kryo/wiki/Migration-to-v5

I'd presume you would make a tool to upgrade files with Kryo persisted state in savepoints and checkpoints, and that would allow users to register custom serializers. I also presume that new versions of Flink would politely refuse to start with old-format state files and require the migration process to be completed.

Kryo v2 also pulls in objenesis v2.1 from 2013, before Java 8. Versions older than 3.x aren't supposed to compile nor run correctly under Java 11+.

On Thu, Feb 9, 2023 at 2:34 AM Chesnay Schepler wrote:

> > you can't reasonably stay on the 2015 version forever, refuse to adopt any of the updates or fixes in the 8 years since then, and reasonably expect things to continue to work well.
>
> We are well aware that Kryo is a ticking time bomb.
>
> > Is there any possibility a future release of Flink can upgrade to a recent version of Kryo serialization?
>
> Of course there is, but someone needs to figure out a way to do this without breaking everything or providing a reasonable upgrade path, which has been blocking us so far.
>
> On 09/02/2023 07:34, Clayton Wohl wrote:
> > I've noticed the latest Flink is using the Kryo serializer library version 2.24.0, which is back from 2015!
> >
> > The Kryo project is actively maintained; it's on version 5.4.0, so 2.24.0 is really quite ancient. I presume the concern is maintaining compatibility with persisted savepoints. That's a valid concern, but you can't reasonably stay on the 2015 version forever, refuse to adopt any of the updates or fixes in the 8 years since then, and reasonably expect things to continue to work well.
> >
> > Is there any possibility a future release of Flink can upgrade to a recent version of Kryo serialization?
Kryo version 2.24.0 from 2015?
I've noticed the latest Flink is using the Kryo serializer library version 2.24.0, which is back from 2015!

The Kryo project is actively maintained; it's on version 5.4.0, so 2.24.0 is really quite ancient. I presume the concern is maintaining compatibility with persisted savepoints. That's a valid concern, but you can't reasonably stay on the 2015 version forever, refuse to adopt any of the updates or fixes in the 8 years since then, and reasonably expect things to continue to work well.

Is there any possibility a future release of Flink can upgrade to a recent version of Kryo serialization?
Re: Flink 1.16.0: java.lang.NoSuchMethodException: org.apache.flink.metrics.prometheus.PrometheusReporter.<init>()
I had to change this configuration:

metrics.reporter.prom.class: "org.apache.flink.metrics.prometheus.PrometheusReporter"

to this for Flink 1.16:

metrics.reporter.prom.factory.class: "org.apache.flink.metrics.prometheus.PrometheusReporterFactory"

Someone emailed me this fix directly. It works! Thank you :)

On Wed, Nov 23, 2022 at 4:39 PM Clayton Wohl wrote:

> When upgrading an application from Flink 1.14.6 to Flink 1.16.0, I get the following exception:
>
> ERROR org.apache.flink.runtime.metrics.ReporterSetup - Could not instantiate metrics reporter prom. Metrics might not be exposed/reported.
>
> java.lang.InstantiationException: org.apache.flink.metrics.prometheus.PrometheusReporter
>     at java.lang.Class.newInstance(Unknown Source) ~[?:?]
>     at org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:467) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:409) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:328) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:209) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:223) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:288) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:481) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:525) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:525) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:505) ~[flink-runtime-1.16.0.jar:1.16.0]
>     at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:463) ~[flink-runtime-1.16.0.jar:1.16.0]
> Caused by: java.lang.NoSuchMethodException: org.apache.flink.metrics.prometheus.PrometheusReporter.<init>()
>     at java.lang.Class.getConstructor0(Unknown Source) ~[?:?]
>     ... 13 more
>
> Has the method mentioned been removed or changed in 1.16.0?
>
> If it matters, I'm running this on Kubernetes with the Spotify Flink Operator.
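For anyone hitting this, the factory-based block in flink-conf.yaml might look like the following sketch. The port line is optional and illustrative; 9249 is the default Flink documents for the Prometheus reporter, but check the docs for your Flink version:

```yaml
metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
# Optional: port (or port range) the reporter listens on; 9249 is the documented default.
metrics.reporter.prom.port: 9249
```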
Apache vs Spotify Flink Operator?
At my job, we are using the Spotify Flink Operator in production. Are there any pros/cons of this Spotify Flink Operator versus the Apache Flink Operator?

We are particularly interested in the forthcoming autoscaling functionality, but I understand that functionality isn't ready yet. Are there any other advantages in the current version?

Apache Flink Operator 1.2.0 says it adds support for standalone deployment mode, but still recommends native deployment mode. Which deployment mode does the Spotify Flink Operator use? Is there any way I can see the deployment mode from the GUI or the logs?
Flink 1.16.0: java.lang.NoSuchMethodException: org.apache.flink.metrics.prometheus.PrometheusReporter.<init>()
When upgrading an application from Flink 1.14.6 to Flink 1.16.0, I get the following exception:

ERROR org.apache.flink.runtime.metrics.ReporterSetup - Could not instantiate metrics reporter prom. Metrics might not be exposed/reported.

java.lang.InstantiationException: org.apache.flink.metrics.prometheus.PrometheusReporter
    at java.lang.Class.newInstance(Unknown Source) ~[?:?]
    at org.apache.flink.runtime.metrics.ReporterSetup.loadViaReflection(ReporterSetup.java:467) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.metrics.ReporterSetup.loadReporter(ReporterSetup.java:409) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.metrics.ReporterSetup.setupReporters(ReporterSetup.java:328) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.metrics.ReporterSetup.fromConfiguration(ReporterSetup.java:209) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManagerRunnerServices(TaskManagerRunner.java:223) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.start(TaskManagerRunner.java:288) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManager(TaskManagerRunner.java:481) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.lambda$runTaskManagerProcessSecurely$5(TaskManagerRunner.java:525) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:525) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.runTaskManagerProcessSecurely(TaskManagerRunner.java:505) ~[flink-runtime-1.16.0.jar:1.16.0]
    at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.main(TaskManagerRunner.java:463) ~[flink-runtime-1.16.0.jar:1.16.0]
Caused by: java.lang.NoSuchMethodException: org.apache.flink.metrics.prometheus.PrometheusReporter.<init>()
    at java.lang.Class.getConstructor0(Unknown Source) ~[?:?]
    ... 13 more

Has the method mentioned been removed or changed in 1.16.0?

If it matters, I'm running this on Kubernetes with the Spotify Flink Operator.
NativeIoException PartitionRequestQueue - Encountered error while consuming partitions
I have a streaming Flink job that runs 24/7 on a Kubernetes cluster hosted in AWS. Every few weeks, or sometimes months, the job fails with network errors like the following in the logs. This is with Flink 1.14.5. Is there anything I can do to help my application automatically retry and recover from this type of error? Do newer versions of Flink possibly make this issue any better?

org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
19:13:57.893 [Flink Netty Server (0) Thread 0] ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
19:13:57.894 [Flink Netty Server (0) Thread 0] ERROR org.apache.flink.runtime.io.network.netty.PartitionRequestQueue - Encountered error while consuming partitions
org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

I see several questions similar to this on Stack Overflow with no helpful answers. Thank you for any help.
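(For anyone searching the archives: one thing worth checking in this situation, as a sketch rather than a guaranteed fix for the underlying Netty error, is the configured restart strategy, which controls whether the job retries automatically after such a failure. In flink-conf.yaml for Flink 1.14 it could look like this; the attempt count and delay are illustrative values:)

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s
```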
Re: [DISCUSS] FLIP-265 Deprecate and remove Scala API support
+1

At my employer, we maintain several Flink jobs in Scala. We've been writing newer jobs in Java, and we'd be fine with porting our Scala jobs over to the Java API.

I'd like to request Java 17 support. Specifically, Java records are a feature our Flink code would use a lot of, and they would make the Java syntax much nicer.
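For context, a hand-written event POJO can collapse to a one-line record (the class name and fields below are made up for illustration; whether Flink's serializers treat records as POJOs depends on the Flink version):

```java
// A Java 16+ record: final fields, canonical constructor, accessors,
// equals/hashCode, and toString are all generated by the compiler.
record SensorReading(String sensorId, long timestampMillis, double value) {}
```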
Java 17 Support?
May I ask if there are any plans to support Java 17 in a future version of Flink? I see this ticket, but it is inactive: https://issues.apache.org/jira/browse/FLINK-15736 There are several Java 17 features that would help in writing+supporting Flink applications. Thank you :)
Flink 1.14.4 -> 1.15.0 Upgrade Problem
I have a Flink job that has been running with Flink 1.14.4 perfectly for a few months. I tried upgrading to Flink 1.15.0. There are no error messages or exceptions; it runs perfectly fine for several hours, but then the Flink app starts to lag in processing an input Kafka topic. I can see the lag grow linearly in my Grafana dashboards that track Kafka lag. The lag continues to grow indefinitely until I manually restart the Flink job; then the Flink job will catch up with old data, the lag will drop to zero, the application will run fine for several hours, and then the lag issue will happen again and lag will steadily grow until I manually restart the Flink job. When I revert the application back to Flink 1.14.4, this lag issue completely goes away. I see no runtime errors or exceptions.

A few quick environment details:
- The Kafka brokers are running Kafka 2.8.1
- The Flink app is running on Kubernetes with the Spotify Flink Operator
- The Flink code is Java using the newer KafkaSource/KafkaSink API, not the older KafkaConsumer/KafkaProducer API

The Flink app consumes from seven input Kafka topics and, for each distinct input topic, writes output values to a distinct output topic. Most of the processing happens within a RichAsyncFunction which does some processing against an external database.

The lag issue mentioned here happens on different topics, and if I let the app run long enough, it will happen on multiple topics. Also, when the lag issue is happening, the app is still processing records on the affected topics. For some reason it's processing records slower than the incoming message rate, which is the definition of lag. But clearly, the lag isn't caused by resources, but by a software bug within Flink.

I intend to keep this job running Flink 1.14.4 until a Flink 1.15.1 patch comes out that supposedly addresses this issue. This job is not using or requiring any new Flink 1.15.0 functionality. However, we prefer to use the newest versions when we can. Switching Flink versions is just changing Maven dependencies, changing the base Flink Docker image version, and the Flink version tag specified to the Kubernetes Spotify Operator.

I was hoping this report would give the Flink developers a heads-up that there is a new bug introduced in 1.15.0. If there is anything I should try, let me know. Thanks :)
Save app-global cache used by RichAsyncFunction to Flink State?
Is there any way to save a custom application-global cache into Flink state so that it is used with checkpoints + savepoints? This cache is used by a RichAsyncFunction that queries an external database, and RichAsyncFunction doesn't support the Flink state functionality directly. I asked this last week but got no answers. I wanted to ask a second time. Thank you :)
RichAsyncFunction + Cache or Map State?
I have a RichAsyncFunction that does async queries to an external database. I'm using a Guava cache within the Flink app. I'd like this Guava cache to be serialized with the rest of Flink state in checkpoints/savepoints. However, RichAsyncFunction doesn't support the state functionality at all. There is one Guava cache for the entire Flink app, which might make this scenario simpler. Is there a recommended way to handle this situation?

Also, the Flink MapState interface doesn't support check-and-set type functionality and doesn't support lock-free concurrent use like java.util.concurrent.ConcurrentMap and Guava's cache do. I need both of these features for proper concurrent operation. So even if I could use Flink MapState, that doesn't seem like a practical solution.
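One stdlib-only workaround sketch (not a Flink API; the wiring into a CheckpointedFunction or a separate stateful operator is deliberately left out): keep the cache in a ConcurrentHashMap, get the check-and-set path from computeIfAbsent, and copy the map into a plain serializable HashMap at snapshot time. The copy is weakly consistent, meaning entries added mid-copy may or may not appear, which is usually acceptable for a cache:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch: lock-free cache with per-key check-and-set, plus a snapshot/restore
// step that a checkpointing hook could call. Names here are illustrative.
class SnapshotableCache {
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    // Check-and-set: the loader runs at most once per absent key, atomically.
    String getOrLoad(String key, Function<String, String> loader) {
        return cache.computeIfAbsent(key, loader);
    }

    // Weakly consistent copy, suitable for writing into checkpoint state.
    Map<String, String> snapshot() {
        return new HashMap<>(cache);
    }

    // Repopulate the cache after recovery from a checkpoint/savepoint.
    void restore(Map<String, String> saved) {
        cache.putAll(saved);
    }
}
```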
Re: Log4j2 Issues
Sorry, never mind. I see the Flink operator overrides with JVM argument configuration:

JVM Options:
-Dlog.file=/opt/flink/log/flink--client-4b57ba2b2597.log
-Dlog4j.configuration=file:/opt/flink/conf/log4j-cli.properties
-Dlog4j.configurationFile=file:/opt/flink/conf/log4j-cli.properties
-Dlogback.configurationFile=file:/opt/flink/conf/logback.xml

That all makes sense. Thank you.

On Fri, Jan 28, 2022 at 4:02 PM Clayton Wohl wrote:

> When I run my Flink app via the Google Flink Operator, log4j2 logging doesn't show up in logs. System.out.println messages work.
>
> Everything should be a very plain-vanilla-standard setup. I have a log4j2.xml config file in my classpath included in my application .jar file. I'm building a custom Docker image based on `flink:1.14.3-scala_2.12-java11` that simply copies my application .jar and all transitive dependencies to `/opt/flink/lib`. My app is built with the same `org.apache.logging.log4j:log4j-core:2.17.1` dependency that Flink 1.14.3 uses.
>
> If I shell into my Docker image and run `java -cp "/opt/flink/lib/*" myapp.LoggingTest`, the logging works; I see it on the console. When I submit this via the Google Flink Kubernetes Operator, I don't see the logging output, but I do see println output.
>
> If I change my app to use logback, I see that output, which is weird. How do I troubleshoot the log4j2 option? Or should I just use logback if that works? For a long-running Flink job, I just want to see startup+config logs + exception logs.
>
> thank you :)
Log4j2 Issues
When I run my Flink app via the Google Flink Operator, log4j2 logging doesn't show up in logs. System.out.println messages work.

Everything should be a very plain-vanilla-standard setup. I have a log4j2.xml config file in my classpath included in my application .jar file. I'm building a custom Docker image based on `flink:1.14.3-scala_2.12-java11` that simply copies my application .jar and all transitive dependencies to `/opt/flink/lib`. My app is built with the same `org.apache.logging.log4j:log4j-core:2.17.1` dependency that Flink 1.14.3 uses.

If I shell into my Docker image and run `java -cp "/opt/flink/lib/*" myapp.LoggingTest`, the logging works; I see it on the console. When I submit this via the Google Flink Kubernetes Operator, I don't see the logging output, but I do see println output.

If I change my app to use logback, I see that output, which is weird. How do I troubleshoot the log4j2 option? Or should I just use logback if that works? For a long-running Flink job, I just want to see startup+config logs + exception logs.

thank you :)
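(For reference: a classpath log4j2.xml is silently ignored whenever a `-Dlog4j.configurationFile` JVM option points somewhere else, which is a common culprit in operator-managed deployments. A minimal console-only log4j2.xml of the kind described might look like this; the pattern and levels are illustrative:)

```xml
<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="warn">
  <Appenders>
    <Console name="Console" target="SYSTEM_OUT">
      <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %logger{36} - %msg%n"/>
    </Console>
  </Appenders>
  <Loggers>
    <Root level="info">
      <AppenderRef ref="Console"/>
    </Root>
  </Loggers>
</Configuration>
```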
Re: Custom Kafka Keystore on Amazon Kinesis Analytics
Custom code can create subclasses of FlinkKafkaConsumer, because the constructors are public. Custom code can't create subclasses of KafkaSource, because its constructors are package-private, so the same solution of creating subclasses won't work for KafkaSource.

Thank you for the response :)

On Mon, Jan 10, 2022 at 6:22 AM Piotr Nowojski wrote:

> Hi Clayton,
>
> I think in principle this example should still be valid; however, instead of providing a `CustomFlinkKafkaConsumer` and overriding its `open` method, you would probably need to override `org.apache.flink.connector.kafka.source.reader.KafkaSourceReader#start`. So you would most likely need, at the very least, both a custom `KafkaSourceReader` and a custom `KafkaSource` to instantiate your custom `KafkaSourceReader`. But I'm not sure if anyone has ever tried this so far.
>
> Best,
> Piotrek
>
> On Fri, Jan 7, 2022 at 21:18 Clayton Wohl wrote:
>
> > If I want to migrate from FlinkKafkaConsumer to KafkaSource, does the latter support this:
> >
> > https://docs.aws.amazon.com/kinesisanalytics/latest/java/example-keystore.html
> >
> > Basically, I'm running my Flink app in Amazon's Kinesis Analytics hosted Flink environment. I don't have reliable access to the local file system. At the documentation link above, Amazon recommends adding a hook to copy the keystore files from the classpath to a /tmp directory at runtime. Can KafkaSource do something similar?
Request: Java 17 Support?
Are there any plans for Flink to support Java 17 and provide Java 17-based Docker images? There are a variety of new language/VM features we'd like to use and we were hoping Flink would support Java 17. thanks Kurt
Feature Request: Upgrade Kafka Library
The latest version of flink-connector-kafka still uses kafka-clients 2.4.1. There have been a lot of upgrades to the Kafka consumer/producer library since then. May I request that the Flink project upgrade to a recent version of the Kafka library? thanks!
Custom Kafka Keystore on Amazon Kinesis Analytics
If I want to migrate from FlinkKafkaConsumer to KafkaSource, does the latter support this: https://docs.aws.amazon.com/kinesisanalytics/latest/java/example-keystore.html Basically, I'm running my Flink app in Amazon's Kinesis Analytics hosted Flink environment. I don't have reliable access to the local file system. At the documentation link above, Amazon recommends adding a hook to copy the keystore files from the classpath to a /tmp directory at runtime. Can KafkaSource do something similar?
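The hook Amazon recommends boils down to copying a bundled keystore from the classpath onto the local filesystem before the Kafka client initializes; a stdlib-only sketch follows (the class name, resource name, and destination are illustrative; where to invoke this from a KafkaSource-based job is exactly the open question in this thread):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch: drop a classpath resource (e.g. a keystore bundled in the job jar)
// to a local path such as /tmp, so the Kafka client can read it as a file.
final class KeystoreDropper {
    static Path dropToTmp(String resourceName, Path destination) throws IOException {
        try (InputStream in = KeystoreDropper.class.getClassLoader().getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IOException("resource not found on classpath: " + resourceName);
            }
            Files.copy(in, destination, StandardCopyOption.REPLACE_EXISTING);
        }
        return destination;
    }
}
```

The copied path would then be referenced from the Kafka client properties (e.g. `ssl.truststore.location`), as in the AWS example.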