[ANNOUNCE] Apache Flink Kafka Connectors 3.0.2 released
The Apache Flink community is very happy to announce the release of Apache Flink Kafka Connectors 3.0.2. This release is compatible with the Apache Flink 1.17.x and 1.18.x release series.

Apache Flink® is an open-source stream processing framework for distributed, high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353768

We would like to thank all contributors of the Apache Flink community who made this release possible!

Regards,
Gordon
Re: Getting java.lang.ClassNotFoundException in Tests (Flink 1.18.0)
Hi Barak,

The missing class is in "flink-core"; I think adding that dependency will provide it. The release notes for 1.14 note that connectors no longer bundle "flink-core". I imagine this is what has caused your issue.

https://nightlies.apache.org/flink/flink-docs-release-1.18/release-notes/flink-1.14/#connector-base-exposes-dependency-to-flink-core

Cheers,
Jim

On Fri, Dec 1, 2023 at 3:30 PM Barak Ben-Nathan wrote:
>
> hi,
>
> I am trying to upgrade my app to Flink 1.18.
>
> I have tests that run my stream from/to embedded (in-memory) Kafka,
> i.e. they create a Flink cluster thus:
>
> val flinkCluster = new MiniClusterWithClientResource(
>   new MiniClusterResourceConfiguration.Builder()
>     .setNumberSlotsPerTaskManager(2)
>     .setNumberTaskManagers(1)
>     .build)
>
> before {
>   flinkCluster.before()
> }
>
> after {
>   flinkCluster.after()
> }
>
> get an execution env thus:
>
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> build the pipeline and execute it.
>
> These tests fail due to:
>
> org.apache.flink.runtime.client.JobInitializationException: Could not
> start the JobMaster.
> ...
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.api.common.ExecutionConfig
> [full stack trace snipped; see the original message below]
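Following Jim's suggestion, a sketch of the sbt change would be to add flink-core explicitly to the dependency list. This is untested and hedged: the version, scope, and surrounding `Seq` are assumptions that should be aligned with the build shown in the original message below.

```scala
// Hedged sketch: explicitly add flink-core, which connectors stopped
// bundling as of Flink 1.14. Version and scope are assumptions; align
// them with the flinkVersion used elsewhere in your build.
val flinkDependencies = Seq(
  "org.apache.flink" % "flink-core" % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  "org.apache.flink" % "flink-connector-base" % "1.18.0",
  "org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18")
```

Since the test cluster deserializes `ExecutionConfig` on the JobMaster side, the dependency likely must be on the test classpath (not `Provided`) for the `MiniCluster` tests to see it.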
Fwd: Getting java.lang.ClassNotFoundException in Tests (Flink 1.18.0)
hi,

I am trying to upgrade my app to Flink 1.18.

I have tests that run my stream from/to embedded (in-memory) Kafka, i.e. they create a Flink cluster thus:

val flinkCluster = new MiniClusterWithClientResource(
  new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

before {
  flinkCluster.before()
}

after {
  flinkCluster.after()
}

get an execution env thus:

val env = StreamExecutionEnvironment.getExecutionEnvironment

build the pipeline and execute it.

These tests fail due to:

org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    ... 3 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
    at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:101)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    ... 4 more

My flink dependencies are:

val flinkDependencies = Seq(
  // "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
  // "org.apache.flink" % "flink-streaming-java" % flinkVersion % Provided,
  "org.apache.flink" % "flink-connector-base" % "1.18.0",
  "org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18")

val testFlinkDependencies = Seq(
  // flink-test-utils brings log4j-slf4j-impl, which causes java.lang.NoSuchMethodError when running tests
  // https://stackoverflow.com/questio
inputQueueSize metric
Regarding the inputQueueSize metric: what does it express, exactly?

Observing the inputQueueSize and inPoolUsage metrics in Prometheus, I found that when inPoolUsage is high, inputQueueSize is low. This observation does not match the description of inputQueueSize.

So does this metric express the size of the input buffers of each task, or does it express how full the input buffers of a task are?

Thank you in advance.
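For what it's worth, the two metrics measure different things and need not move together: if I recall the Flink docs correctly, inPoolUsage is an estimated usage ratio of the input buffer pool, while inputQueueSize/inputQueueLength describe the queued input buffers themselves (bytes and count, respectively) — but verify that against the metrics documentation for your Flink version. A toy sketch (not Flink internals; all names here are made up for illustration) of how a pool-usage ratio can be high while the queued count stays low:

```python
# Toy model: an input gate draws from a fixed pool of network buffers.
# "Pool usage" is the fraction of buffers checked out anywhere (queued
# OR currently being processed); "queue size" counts only those actually
# waiting in the input queue. The two can therefore diverge.

class ToyBufferPool:
    def __init__(self, total: int):
        self.total = total       # buffers available to this input gate
        self.checked_out = 0     # buffers in use anywhere
        self.queued = 0          # buffers sitting in the input queue

    def in_pool_usage(self) -> float:
        return self.checked_out / self.total

    def input_queue_length(self) -> int:
        return self.queued

# 10-buffer pool: 9 buffers held by the operator mid-processing,
# only 1 actually waiting in the queue -> usage high, queue short.
pool = ToyBufferPool(total=10)
pool.checked_out = 9
pool.queued = 1
print(pool.in_pool_usage())       # 0.9
print(pool.input_queue_length())  # 1
```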
Re: Flink Kubernetes Operator: Why one Helm repo for each version?
Ok, thanks for the clarification.

On Fri, Dec 1, 2023 at 2:05 PM Gyula Fóra wrote:
> Hi!
>
> I already answered your question on slack:
>
> “The main reason is that this allows us to completely separate release
> resources etc. much easier for the release process
>
> But this could be improved in the future if there is a good proposal for
> the process”
>
> Please do not cross post questions between slack and ml immediately; it’s
> more respectful of the contributors’ time if you only ask in a single place
> at a time.
>
> Cheers
> Gyula
>
> On Fri, 1 Dec 2023 at 13:31, Salva Alcántara wrote:
>
>> Hi! I'm curious why there is not a single repo URL for the Flink
>> Kubernetes Operator Helm chart, but multiple ones, one for each version.
>> This forces users to add one repo for each version, like this (directly
>> from the docs):
>>
>> ```
>> helm repo add flink-operator-repo
>> https://downloads.apache.org/flink/flink-kubernetes-operator-
>> /
>> helm install flink-kubernetes-operator
>> flink-operator-repo/flink-kubernetes-operator
>> ```
>>
>> This is weird and different from what I'm used to, where you add the
>> (version-less) repo once and then install whatever version you need from
>> it...
Re: Flink Kubernetes Operator: Why one Helm repo for each version?
Hi!

I already answered your question on slack:

“The main reason is that this allows us to completely separate release resources etc. much easier for the release process

But this could be improved in the future if there is a good proposal for the process”

Please do not cross post questions between slack and ml immediately; it’s more respectful of the contributors’ time if you only ask in a single place at a time.

Cheers
Gyula

On Fri, 1 Dec 2023 at 13:31, Salva Alcántara wrote:
> Hi! I'm curious why there is not a single repo URL for the Flink
> Kubernetes Operator Helm chart, but multiple ones, one for each version.
> This forces users to add one repo for each version, like this (directly
> from the docs):
>
> ```
> helm repo add flink-operator-repo
> https://downloads.apache.org/flink/flink-kubernetes-operator-
> /
> helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator
> ```
>
> This is weird and different from what I'm used to, where you add the
> (version-less) repo once and then install whatever version you need from
> it...
Flink Kubernetes Operator: Why one Helm repo for each version?
Hi! I'm curious why there is not a single repo URL for the Flink Kubernetes Operator Helm chart, but multiple ones, one for each version. This forces users to add one repo for each version, like this (directly from the docs):

```
helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
```

This is weird and different from what I'm used to, where you add the (version-less) repo once and then install whatever version you need from it...
Re: Profiling on flink jobs
I missed the Reply All button in my previous message. Here's my previous email once more, sent to the user ML for the sake of transparency:

Hi Oscar,

sorry for the late reply. I didn't see that you posted the question at the beginning of the month already.

I used jmap [1] in the past to get some statistics out and generate *.hprof files. I haven't looked into creating dump files as documented in [2]. env.java.opts.all will be passed to each Java process that's triggered within Apache Flink. "dumponexit" (which is used in the documented parameter list) suggests that the dump file would be created when the JVM process exits. Without any more detailed investigation into how the Java Flight Recorder works, I'd assume that a *.hprof file should be created when killing the JobManager/TaskManager process rather than cancelling an individual job. Cancelling the job should only trigger this file creation if you're using Flink in Application Mode, because terminating the job would trigger the shutdown of the entire Flink cluster in that case.

Best,
Matthias

[1] https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr014.html
[2] https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/application_profiling/

On Thu, Nov 9, 2023 at 9:39 AM Oscar Perez via user wrote:
> hi! I am trying to do profiling on one of our flink jobs according to these docs:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/application_profiling/
> We are using OpenJDK 8.0. I am adding this line to the flink properties file
> in docker-compose:
>
> env.java.opts.all: "-XX:+UnlockCommercialFeatures
> -XX:+UnlockDiagnosticVMOptions -XX:+FlightRecorder -XX:+DebugNonSafepoints
> -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/dump.jfr"
>
> I would expect to see the dump.jfr file created once I cancel the job but
> unfortunately I don't see anything created. How can I manage to get a valid
> profile file? Thanks!
>
> Regards,
> Oscar
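One possibly relevant detail: the options quoted above target Oracle JDK 8's commercial Flight Recorder. On OpenJDK builds, -XX:+UnlockCommercialFeatures is, to my knowledge, an unrecognized VM option (which would prevent the JVM from starting at all), and on JVMs that ship an open JFR (the 8u262+ backport, or JDK 11+) a recording is typically started with -XX:StartFlightRecording instead. A hedged, untested sketch of what the config line might look like in that case; verify the exact option syntax against your JVM version:

```yaml
# Hypothetical flink-conf.yaml / docker-compose sketch (untested assumption,
# for OpenJDK builds with open-source JFR, i.e. 8u262+ backport or JDK 11+):
env.java.opts.all: "-XX:StartFlightRecording=dumponexit=true,filename=/tmp/dump.jfr"
```

As noted above, the dump would still only appear when the JVM process itself exits, so in Session Mode cancelling a job alone would likely not produce it.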