[ANNOUNCE] Apache Flink Kafka Connectors 3.0.2 released

2023-12-01 Thread Tzu-Li (Gordon) Tai
The Apache Flink community is very happy to announce the release of Apache
Flink Kafka Connectors 3.0.2. This release is compatible with the Apache
Flink 1.17.x and 1.18.x release series.

Apache Flink® is an open-source stream processing framework for
distributed, high-performing, always-available, and accurate data streaming
applications.

The release is available for download at:
https://flink.apache.org/downloads.html
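For build tools, the connector should be consumable via Maven Central
coordinates along these lines (shown in sbt form; this assumes the
connector's usual <connector-version>-<flink-version> scheme, so the
"-1.18" suffix denotes the targeted Flink minor release and would be
"-1.17" for Flink 1.17.x):

```
// Assumed coordinates; verify against Maven Central for your Flink version.
"org.apache.flink" % "flink-connector-kafka" % "3.0.2-1.18"
```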

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12353768

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gordon


Re: Getting java.lang.ClassNotFoundException in Tests (Flink 1.18.0)

2023-12-01 Thread Jim Hughes via user
Hi Barak,

The missing class is in "flink-core"; I think adding that dependency will
provide it.

The release notes for 1.14 mention that connectors no longer bundle
"flink-core". I imagine this is what caused your issue.
https://nightlies.apache.org/flink/flink-docs-release-1.18/release-notes/flink-1.14/#connector-base-exposes-dependency-to-flink-core
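For illustration, a minimal (untested) sketch of what that change could look
like in sbt, reusing the flinkVersion variable from Barak's build definition
(see the original message below):

```
// Hypothetical fix: declare flink-core explicitly, since connector-base no
// longer pulls it in transitively. ExecutionConfig lives in flink-core.
libraryDependencies += "org.apache.flink" % "flink-core" % flinkVersion
```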

Cheers,

Jim

On Fri, Dec 1, 2023 at 3:30 PM Barak Ben-Nathan wrote:

> hi,
>
> I am trying to upgrade my app to Flink 1.18.
>
> I have tests that run my stream from/to embedded (in-memory) Kafka.
> These tests fail due to:
>
> org.apache.flink.runtime.client.JobInitializationException: Could not
> start the JobMaster.
> ...
> Caused by: java.lang.ClassNotFoundException:
> org.apache.flink.api.common.ExecutionConfig
> [remainder of quoted message and stack trace trimmed; see the original
> message below]

Fwd: Getting java.lang.ClassNotFoundException in Tests (Flink 1.18.0)

2023-12-01 Thread Barak Ben-Nathan
hi,

I am trying to upgrade my app to Flink 1.18.

I have tests that run my stream from/to embedded (in-memory) Kafka,
i.e. they create a Flink cluster like this:

val flinkCluster = new MiniClusterWithClientResource(
  new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

before {
  flinkCluster.before()
}

after {
  flinkCluster.after()
}

get an execution environment like this:

val env = StreamExecutionEnvironment.getExecutionEnvironment

build the pipeline and execute it.

These tests fail due to:

org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
    at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
    ... 3 more
Caused by: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
    at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
    at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
    at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
    at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
    at java.base/java.lang.Class.forName0(Native Method)
    at java.base/java.lang.Class.forName(Class.java:398)
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
    at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
    at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
    at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
    at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
    at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
    at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
    at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:101)
    at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:122)
    at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:379)
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:356)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:128)
    at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:100)
    at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
    ... 4 more

My Flink dependencies are:

val flinkDependencies = Seq(
//  "org.apache.flink" %% "flink-scala" % flinkVersion % Provided,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided,
//  "org.apache.flink" % "flink-streaming-java" % flinkVersion % Provided,
  "org.apache.flink" % "flink-connector-base" % "1.18.0",
  "org.apache.flink" % "flink-connector-kafka" % "3.0.1-1.18")

val testFlinkDependencies = Seq(
  // flink-test-utils brings log4j-slf4j-impl, which causes
  // java.lang.NoSuchMethodError when running tests
  // https://stackoverflow.com/questio
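Purely as a hypothetical sketch (the snippet above is cut off in the
archive), the exclusion described in the comment might look like this in
sbt:

```
// Hypothetical: bring in flink-test-utils for MiniClusterWithClientResource
// while excluding the log4j-slf4j-impl binding mentioned above.
val testFlinkDependencies = Seq(
  ("org.apache.flink" % "flink-test-utils" % "1.18.0" % Test)
    .exclude("org.apache.logging.log4j", "log4j-slf4j-impl")
)
```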

inputQueueSize metric

2023-12-01 Thread rania duni
Regarding the inputQueueSize metric: what does it actually express?
Observing the inputQueueSize and inPoolUsage metrics in Prometheus,
I found that when inPoolUsage is high, inputQueueSize is low.
This observation does not match the description of inputQueueSize.
So: does this metric express the size of the input buffers of each task,
or does it express how full the input buffers of a task are?
Thank you in advance.
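For reference, one way to put the two metrics side by side is the
Prometheus HTTP API; the metric names below are an assumption about how
Flink's PrometheusReporter exposes them (they depend on the configured
metric scope), so verify them against your setup:

```
# Assumed metric names; actual names depend on the configured metric scope.
curl -s 'http://localhost:9090/api/v1/query' \
  --data-urlencode 'query=flink_taskmanager_job_task_buffers_inputQueueSize'
curl -s 'http://localhost:9090/api/v1/query' \
  --data-urlencode 'query=flink_taskmanager_job_task_buffers_inPoolUsage'
```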


Re: Flink Kubernetes Operator: Why one Helm repo for each version?

2023-12-01 Thread Salva Alcántara
Ok, thanks for the clarification.

On Fri, Dec 1, 2023 at 2:05 PM Gyula Fóra wrote:

> Hi!
>
> I already answered your question on Slack:
>
>
> “The main reason is that this allows us to completely separate release
> resources etc. much easier for the release process
>
> But this could be improved in the future if there is a good proposal for
> the process”
>
> Please do not cross-post questions between Slack and the mailing list
> immediately; it's more respectful of the contributors' time if you only
> ask in a single place at a time.
>
> Cheers
> Gyula
>
> On Fri, 1 Dec 2023 at 13:31, Salva Alcántara wrote:
>
>> Hi! I'm curious why there is not a single repo URL for the Flink
>> Kubernetes Operator Helm Chart, but multiple ones, one for each version.
>> This forces users to add one repo for each version, like this (directly
>> from the docs):
>>
>> ```
>> helm repo add flink-operator-repo
>> https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
>> helm install flink-kubernetes-operator
>> flink-operator-repo/flink-kubernetes-operator
>> ```
>>
>> This is weird and different from what I'm used to, where you add the
>> (version-less) repo once and then install whatever version you need from
>> it...
>>
>


Re: Flink Kubernetes Operator: Why one Helm repo for each version?

2023-12-01 Thread Gyula Fóra
Hi!

I already answered your question on Slack:


“The main reason is that this allows us to completely separate release
resources etc. much easier for the release process

But this could be improved in the future if there is a good proposal for
the process”

Please do not cross-post questions between Slack and the mailing list
immediately; it's more respectful of the contributors' time if you only ask
in a single place at a time.

Cheers
Gyula

On Fri, 1 Dec 2023 at 13:31, Salva Alcántara wrote:

> Hi! I'm curious why there is not a single repo URL for the Flink
> Kubernetes Operator Helm Chart, but multiple ones, one for each version.
> This forces users to add one repo for each version, like this (directly
> from the docs):
>
> ```
> helm repo add flink-operator-repo
> https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
> helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator
> ```
>
> This is weird and different from what I'm used to, where you add the
> (version-less) repo once and then install whatever version you need from
> it...
>


Flink Kubernetes Operator: Why one Helm repo for each version?

2023-12-01 Thread Salva Alcántara
Hi! I'm curious why there is not a single repo URL for the Flink Kubernetes
Operator Helm Chart, but multiple ones, one for each version. This forces
users to add one repo for each version, like this (directly from the docs):

```
helm repo add flink-operator-repo
https://downloads.apache.org/flink/flink-kubernetes-operator-<OPERATOR-VERSION>/
helm install flink-kubernetes-operator
flink-operator-repo/flink-kubernetes-operator
```

This is weird and different from what I'm used to, where you add the
(version-less) repo once and then install whatever version you need from
it...
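For illustration, the per-version workflow with a concrete (assumed) version
substituted in:

```
# 1.6.1 is only an example; substitute whichever operator release you need.
helm repo add flink-operator-repo-1.6.1 \
  https://downloads.apache.org/flink/flink-kubernetes-operator-1.6.1/
helm install flink-kubernetes-operator \
  flink-operator-repo-1.6.1/flink-kubernetes-operator
```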


Re: Profiling on flink jobs

2023-12-01 Thread Matthias Pohl via user
I missed the Reply All button in my previous message. For the sake of
transparency, here's my previous email to the user ML once more:

Hi Oscar,
sorry for the late reply. I didn't see that you had already posted the
question at the beginning of the month.

I used jmap [1] in the past to get some statistics out and generate *.hprof
files. I haven't looked into creating dump files as documented in [2].
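As a sketch of that jmap usage (standard JDK tooling, aimed at the
JobManager or TaskManager process; pid and file path are placeholders):

```
# Class histogram of live objects (top 20 entries):
jmap -histo:live <pid> | head -n 20
# Full heap dump to an .hprof file:
jmap -dump:live,format=b,file=/tmp/taskmanager-heap.hprof <pid>
```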

env.java.opts.all is passed to each Java process that is started within
Apache Flink. "dumponexit" (which is used in the documented parameter list)
suggests that the dump file is only created when the JVM process exits.
Without investigating the Java Flight Recorder in more detail, I'd assume
that a *.jfr file is created when killing the JobManager/TaskManager
process rather than when cancelling an individual job. Cancelling the job
should only trigger the file creation if you're running Flink in
Application Mode, because terminating the job then shuts down the Flink
cluster entirely.
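If you want a recording without killing the process, jcmd can in principle
dump an in-flight JFR recording. Whether this works on a given OpenJDK 8
build depends on its JFR support, so treat the following as an untested
sketch:

```
# Find the TaskManager's pid, then dump the running JFR recording on demand.
# Exact JFR.dump options differ between JDK 8 and newer releases; check with
#   jcmd <pid> help JFR.dump
jps | grep TaskManagerRunner
jcmd <pid> JFR.dump filename=/tmp/dump.jfr
```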

Best,
Matthias

[1]
https://docs.oracle.com/javase/8/docs/technotes/guides/troubleshoot/tooldescr014.html
[2]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/application_profiling/

On Thu, Nov 9, 2023 at 9:39 AM Oscar Perez via user wrote:

> Hi, I am trying to do profiling on one of our Flink jobs
> according to these docs:
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/debugging/application_profiling/
> We are using OpenJDK 8.0. I am adding this line to the flink properties
> file in docker-compose:
>
> env.java.opts.all: "-XX:+UnlockCommercialFeatures 
> -XX:+UnlockDiagnosticVMOptions -XX:+FlightRecorder -XX:+DebugNonSafepoints 
> -XX:FlightRecorderOptions=defaultrecording=true,dumponexit=true,dumponexitpath=/tmp/dump.jfr"
>
> I would expect to see the dump.jfr file created once I cancel the job, but
> unfortunately I don't see anything created. How can I get a valid
> profile file? Thanks!
> Regards,
> Oscar
>