Re: Kubernetes JobManager and TaskManager minimum/maximum resources

2023-01-21 Thread Gyula Fóra
Hi!

I think the examples allocate too many resources by default and we should
reduce them in the yamls.

1 GB of memory and 0.5 CPU should be more than enough; we could probably
get away with even less for example purposes.

Would you have time to try this out and maybe contribute this
improvement? :)

Thanks
Gyula


On Fri, 20 Jan 2023 at 05:32, Lee Parayno  wrote:

> For application mode FlinkDeployments (maybe even session mode) in
> Kubernetes from the Flink Kubernetes Operator what is the absolute minimum
> amount of CPU and RAM that is required to run the JobManager and
> TaskManager processes?
>
> Some of the example deployment yaml examples have CPU set at 1 full vCPU
> and memory at 2GB (2048 MB).  If you factor in JobManager HA, and 1 or more
> TaskManagers (not sure what the bounding limit is for these processes), you
> can be at 3 vCPU and 6 GB memory used just by the “Flink Infrastructure”
> not counting the Job pods.
>
> Has anyone seen a need to have more resources dedicated to these processes
> for some reason?  Has anyone run it leaner than this (like with 0.5 vCPU
> and less than 1GB memory) in production?
>
> Comparing this to Google Cloud Platform and the Dataflow Runner, AFAIK the
> only resources utilized (that customers pay for) are the Job instances.
>
> Lee Parayno
> Sent from my iPhone


Re: DuplicateJobSubmissionException on restart after taskmanagers crash

2023-01-21 Thread Gyula Fóra
Hi Javier,

I will try to look into this, as I have not personally seen this problem
while using the operator.

It would be great if you could reach out to me on slack or email directly
so we can discuss the issue and get to the bottom of it.

Cheers
Gyula

On Fri, 20 Jan 2023 at 23:53, Javier Vegas  wrote:

> My issue is described in https://issues.apache.org/jira/browse/FLINK-21928
> where it says it was fixed in 1.14, but I am still seeing the problem.
> Although there it says:
>
> "Additionally, it is still required that the user cleans up the
> corresponding HA entries for the running jobs registry because these
> entries won't be reliably cleaned up when encountering the situation
> described by FLINK-21928."
>
> so I guess I need to do some manual cleanup of my S3 HA data before
> restarting
>
> El vie, 20 ene 2023 a las 4:58, Javier Vegas ()
> escribió:
>
>>
>> I have a Flink app (Flink 1.16.0, deployed to Kubernetes via operator
>> 1.3.1 and using Kubernetes HighAvailaibilty with storage in S3) that
>> depends on multiple Thrift services for data queries. When one of those
>> services is down (or throws exceptions) the Flink job managers end up
>> crashing and only the task managers remain up. Once the dependencies are
>> fixed, when I try to restart the Flink app I end up with a
>> "DuplicateJobSubmissionException: Job has already been submitted" (see
>> below for detailed log) and the task managers never start. The only
>> solution I have found is to delete the deployment from Kubernetes and then
>> deploy again as a new job.
>>
>> 1) Is there a better way to handle failures on dependencies than letting
>> task managers crash and keep job managers up, and restart after
>> dependencies are fixed?
> >> 2) If not, is there a way to handle the DuplicateJobSubmissionException
>> so the Flink app can be restarted without having to uninstall it first?
>>
>> Thanks,
>>
>> Javier Vegas
>>
>>
>> org.apache.flink.util.FlinkException: Failed to execute job
>> at
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2203)
>> Caused by:
>> org.apache.flink.runtime.client.DuplicateJobSubmissionException: Job has
>> already been submitted.
>> at
>> org.apache.flink.runtime.client.DuplicateJobSubmissionException.ofGloballyTerminated(DuplicateJobSubmissionException.java:35)
>> at
>> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:449)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
>> Method)
>> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown
>> Source)
>> at
>> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown
>> Source)
>> at java.base/java.lang.reflect.Method.invoke(Unknown Source)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
>> at
>> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
>> at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
>> at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>> ... 5 more
>> Exception thrown in main on startup
>>
>>
>>
>


Re: Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-19 Thread Gyula Fóra
To clarify, this logic is inherited from the Flink Native Kubernetes
integration itself. The operator-specific labels we use are already fully
qualified.
I agree that this could be improved in Flink by a better label.
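
For example, a pod template could carry a fully qualified label of its own
instead of relying on the bare 'app' key (a sketch; the label key below is
just an illustration):

  podTemplate:
    apiVersion: v1
    kind: Pod
    metadata:
      labels:
        app.kubernetes.io/name: flink-app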

Cheers,
Gyula

On Thu, Jan 19, 2023 at 11:00 PM Mason Chen  wrote:

> @Andrew I was also confused by this earlier. FYI, this is the line where
> it is referenced:
> https://github.com/apache/flink-kubernetes-operator/blame/7d5bf9536bdfbf86de5803766b28e503cd32ee04/flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/utils/StandaloneKubernetesUtils.java#L43
>
> On Thu, Jan 19, 2023 at 1:59 PM Őrhidi Mátyás 
> wrote:
>
>> On a side note, we should probably use a qualified label name instead of
>> the pretty common 'app' here. WDYT Gyula?
>>
>> On Thu, Jan 19, 2023 at 1:48 PM Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> The app label itself is used by Flink internally for a different purpose
>>> so it’s overridden. This is completely expected.
>>>
>>> I think it would be better to use some other label :)
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Thu, 19 Jan 2023 at 19:02, Andrew Otto  wrote:
>>>
>>>> Hello!
>>>>
>>>> I'm seeing an unexpected label value assignment happening, and I'm not
>>>> sure how it's happening.  It is possible it is in my own helm charts and
>>>> templates somewhere, but I'm not seeing it, so I'm beginning to think this
>>>> is happening in the FlinkDeployment CRD in the operator code somewhere.
>>>>
>>>> I'm using FlinkDeployment podTemplate
>>>> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/>
>>>> to add an 'app' label:
>>>>
>>>>  podTemplate:
>>>>    apiVersion: v1
>>>>    kind: Pod
>>>>    metadata:
>>>>      labels:
>>>>        app: flink-app
>>>>        release: flink-example
>>>> ...
>>>>
>>>> I also have this app label set in the FlinkDeployment labels:
>>>>
>>>> kind: FlinkDeployment
>>>> metadata:
>>>>   name: flink-app-flink-example
>>>>   labels:
>>>>     app: flink-app
>>>>     chart: flink-app-0.1.1
>>>>     release: flink-example
>>>>
>>>> Since I've set app: flink-app in the podTemplate, I would expect all
>>>> pods to get this label.  The FlinkDeployment resource has this label
>>>> value as expected.  However, I see that in the pods, as well as the
>>>> Deployment that are created by FlinkDeployment:
>>>>
>>>> *$ kubectl -n flink-app0 describe deployments flink-app-flink-example*
>>>> ...
>>>> Name:   flink-app-flink-example
>>>> Namespace:  flink-app0
>>>> CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
>>>> Labels: app=flink-app-flink-example
>>>> component=jobmanager
>>>> ...
>>>>
>>>> Pod Template:
>>>>   Labels:   app=flink-app-flink-example
>>>> component=jobmanager
>>>> release=flink-example
>>>> ...
>>>>
>>>>
>>>> *$ kubectl -n flink-app0 describe pod
>>>> flink-app-flink-example-d974cb595-788ch*
>>>> ...
>>>> Labels:   app=flink-app-flink-example
>>>>   component=jobmanager
>>>>   pod-template-hash=d974cb595
>>>>   release=flink-example
>>>> ...
>>>>
>>>>
>>>> I'd expect the app label to be 'flink-app' for at least the Deployment
>>>> PodTemplate and the Pod, if not the Deployment itself too.
>>>>
>>>> Something is overriding the app label in podTemplate, and I don't think
>>>> it's my chart or installation.  I looked in flink-kubernetes-operator code
>>>> and I didn't find where this was happening either.  I am not setting e.g.
>>>> kubernetes.jobmanager.labels
>>>> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#kubernetes-jobmanager-labels>
>>>> .
>>>>
>>>> Is this expected?
>>>>
>>>> Thank you!
>>>>
>>>> -Andrew Otto
>>>>  Wikimedia Foundation
>>>>
>>>>


Re: Flink Kubernetes Operator podTemplate and 'app' pod label bug?

2023-01-19 Thread Gyula Fóra
Hi!

The app label itself is used by Flink internally for a different purpose so
it’s overridden. This is completely expected.

I think it would be better to use some other label :)

Cheers,
Gyula

On Thu, 19 Jan 2023 at 19:02, Andrew Otto  wrote:

> Hello!
>
> I'm seeing an unexpected label value assignment happening, and I'm not
> sure how it's happening.  It is possible it is in my own helm charts and
> templates somewhere, but I'm not seeing it, so I'm beginning to think this
> is happening in the FlinkDeployment CRD in the operator code somewhere.
>
> I'm using FlinkDeployment podTemplate
> 
> to add an 'app' label:
>
>  podTemplate:
>    apiVersion: v1
>    kind: Pod
>    metadata:
>      labels:
>        app: flink-app
>        release: flink-example
> ...
>
> I also have this app label set in the FlinkDeployment labels:
>
> kind: FlinkDeployment
> metadata:
>   name: flink-app-flink-example
>   labels:
>     app: flink-app
>     chart: flink-app-0.1.1
>     release: flink-example
>
> Since I've set app: flink-app in the podTemplate, I would expect all pods
> to get this label.  The FlinkDeployment resource has this label value as
> expected.  However, I see that in the pods, as well as the Deployment
> that are created by FlinkDeployment:
>
> *$ kubectl -n flink-app0 describe deployments flink-app-flink-example*
> ...
> Name:   flink-app-flink-example
> Namespace:  flink-app0
> CreationTimestamp:  Thu, 19 Jan 2023 12:42:05 -0500
> Labels: app=flink-app-flink-example
> component=jobmanager
> ...
>
> Pod Template:
>   Labels:   app=flink-app-flink-example
> component=jobmanager
> release=flink-example
> ...
>
>
> *$ kubectl -n flink-app0 describe pod
> flink-app-flink-example-d974cb595-788ch*
> ...
> Labels:   app=flink-app-flink-example
>   component=jobmanager
>   pod-template-hash=d974cb595
>   release=flink-example
> ...
>
>
> I'd expect the app label to be 'flink-app' for at least the Deployment
> PodTemplate and the Pod, if not the Deployment itself too.
>
> Something is overriding the app label in podTemplate, and I don't think
> it's my chart or installation.  I looked in flink-kubernetes-operator code
> and I didn't find where this was happening either.  I am not setting e.g.
> kubernetes.jobmanager.labels
> 
> .
>
> Is this expected?
>
> Thank you!
>
> -Andrew Otto
>  Wikimedia Foundation
>


Re: Regarding Changes in Flink Operator 1.3.1

2023-01-18 Thread Gyula Fóra
Please see the release announcements:
https://flink.apache.org/news/2022/10/07/release-kubernetes-operator-1.2.0.html
https://flink.apache.org/news/2022/12/14/release-kubernetes-operator-1.3.0.html
https://flink.apache.org/news/2023/01/10/release-kubernetes-operator-1.3.1.html

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/upgrade/

Gyula

On Thu, Jan 19, 2023 at 8:17 AM Sumit Aich  wrote:

> also, are the changes in operator version 1.3.1 backward compatible?
>
> On Thu, Jan 19, 2023 at 12:38 PM Sumit Aich  wrote:
>
>> Hi Team,
>>
>> Can you please share what has changed in Flink Kubernetes Operator
>> version 1.3.1 from the 1.1.0 version.
>>
>> Thanks,
>> Sumit
>>
>


Re: Flink reactive mode for application clusters on AWS EMR - Auto Scaling

2023-01-15 Thread Gyula Fóra
I am not aware of any autoscaler/operator-like functionality that works on
EMR/YARN at the moment.

It is probably easier to migrate to Kubernetes, or improve the Flink
Kubernetes Operator itself to be able to manage YARN clusters, than to
improve all these things for YARN directly :)

Gyula

On Sun, Jan 15, 2023 at 5:27 AM Madan D  wrote:

> Hi Gyula/Team,
>
> Thanks for your response. I see that the above one works on Kubernetes,
> but all our applications are currently running on EMR YARN, and we are
> looking to integrate with EMR-managed scaling with the adaptive scheduler.
> Do we have any features or operators which we can use to run on YARN,
> similar to the Flink Kubernetes Operator, which can perform auto-scaling?
>
>
> Regards,
> Madan
>
> On Friday, 13 January 2023 at 09:37:11 pm GMT-8, Gyula Fóra <
> gyula.f...@gmail.com> wrote:
>
>
> Hi Madan,
>
> With reactive mode you need to build a completely custom auto scaling
> logic, it can work but it takes considerable effort.
>
> Instead I recommend using the Flink Kubernetes Operator which now contains
> the first version of an actual autoscaler module that collects metrics and
> scales Flink jobs.
>
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/
>
> The operator autoscaler uses Kubernetes native integration and works more
> or less out of the box.
>
> Cheers
> Gyula
>
> On Sat, 14 Jan 2023 at 01:16, Madan D via user 
> wrote:
>
> Hello Team,
> I would like to understand auto scaling on EMR using either reactive mode
> or adaptive scheduler with custom or managed scaling.
> Can someone help me on this?
>
>
> Regards,
> Madan
>


Re: Flink reactive mode for application clusters on AWS EMR - Auto Scaling

2023-01-13 Thread Gyula Fóra
Hi Madan,

With reactive mode you need to build a completely custom auto scaling
logic, it can work but it takes considerable effort.

Instead I recommend using the Flink Kubernetes Operator which now contains
the first version of an actual autoscaler module that collects metrics and
scales Flink jobs.

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/

The operator autoscaler uses Kubernetes native integration and works more
or less out of the box.
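
As a rough sketch, enabling it comes down to adding the autoscaler options
to the job's flinkConfiguration (key names per the linked page; the values
are illustrative, so check them against your operator version):

  spec:
    flinkConfiguration:
      kubernetes.operator.job.autoscaler.enabled: "true"
      kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
      kubernetes.operator.job.autoscaler.metrics.window: "5m"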

Cheers
Gyula

On Sat, 14 Jan 2023 at 01:16, Madan D via user 
wrote:

> Hello Team,
> I would like to understand auto scaling on EMR using either reactive mode
> or adaptive scheduler with custom or managed scaling.
> Can someone help me on this?
>
>
> Regards,
> Madan
>


Re: [EXTERNAL] Re: Flink reactive mode for application clusters on AWS EKS

2023-01-13 Thread Gyula Fóra
I would be happy to answer more questions later but it would be best if you
could first try the operator or at least read the documentation:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/

You will find answers to most of your questions, and running it locally on
minikube to try some test scenarios will be even more beneficial.

Thanks
Gyula

On Fri, Jan 13, 2023 at 5:50 PM Tamir Sagi 
wrote:

> Hey Gyula,
> Thank you for fast response.
>
> I understand it completely. I believe the operator has similar
> functionality to the custom service we have developed for deploying,
> updating and deleting clusters.
> The difference from our perspective is that we have added several more
> capabilities and configurations for the deployment phase.
>
> Assuming there is an application cluster(Native k8s) with 3 Job managers
> and 2 Task managers. The cluster is running for several hours. let's say In
> a given point in time, the operator decides to scale the cluster up (based
> on pre-defined configurations).
>
> you wrote
>
> *The operator also now contains an autoscaler module that runs within the
> operator and monitors Flink clusters and determines whether a jobvertex
> should be scaled up or down. It will then scale the job accordingly.*
>
> It sounds great. Does that mean that it simply creates a new TM pod
> which then becomes part of the cluster? (I'm asking because the graph is
> created while deploying the cluster in the first place.)
> If yes, can this module be used outside the operator?
>
> If not, would you please elaborate whether this scale(up/down) operation
> leads to downtime?
>
> Best,
> Tamir.
>
>
>
>
>
>
>
>
> --
> *From:* Gyula Fóra 
> *Sent:* Friday, January 13, 2023 4:42 PM
> *To:* Tamir Sagi 
> *Cc:* Chesnay Schepler ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: [EXTERNAL] Re: Flink reactive mode for application
> clusters on AWS EKS
>
>
> *EXTERNAL EMAIL*
>
>
> Hi Tamir!
>
> Let me try to clarify a few points here.
>
> The operator works based on FlinkDeployment Custom Resources (Yaml
> definition) and the operator creates the required clusters / taskmanagers
> based on that. If you change the parallelism of your FlinkDeployment Yaml,
> the operator will adjust the cluster size (scale up or down).
>
> The operator also now contains an autoscaler module that runs within the
> operator and monitors Flink clusters and determines whether a jobvertex
> should be scaled up or down. It will then scale the job accordingly.
> The autoscaler currently only works with the default Native Deployment
> mode.
>
> The operator does not use Flink reactive mode to perform autoscaling.
>
> I highly recommend trying to migrate to the operator (or at least testing
> it locally so you fully understand the functionality), you will save
> yourself a tremendous amount of work especially if you are looking to build
> an autoscaler.
>
> Cheers,
> Gyula
>
> On Fri, Jan 13, 2023 at 3:37 PM Tamir Sagi 
> wrote:
>
> Hey Gyula,
>
> Thanks for clarifying that.
>
> We created a custom service before an official Flink k8s operator was
> released. That service deploys/upgrades/deletes clusters (no Yamls are
> needed). It handles failures including retries and cleanups based on our
> needs. Hence, moving to the official Flink operator might take a while.
>
> Does the operator also perform scale down?
>
> Regarding HPA, Task managers are created by Flink based on parallelism &
> number of slots. Then the cluster has fixed size of X JMs and Y TMs.
> I was thinking about adding HPA but wondered whether or not Flink will
> handle the new TMs properly (I have not tested it).
>
> We are probably left with the option to implement the auto scaling
> mechanism ourselves on top of Flink clusters.
>
> Best,
> Tamir.
> --
> *From:* Gyula Fóra 
> *Sent:* Friday, January 13, 2023 8:39 AM
> *To:* Swathi Chandrashekar 
> *Cc:* Chesnay Schepler ; Tamir Sagi <
> tamir.s...@niceactimize.com>; user@flink.apache.org  >
> *Subject:* Re: [EXTERNAL] Re: Flink reactive mode for application
> clusters on AWS EKS
>
>
> *EXTERNAL EMAIL*
>
>
> What I am trying to say is use the Kubernetes operator with Native
> (default) mode and forget about reactive.
>
> The operator does everything you want, plus has an actual autoscaler.
>
> Gyula
>
> On Fri, 13 Jan 2023 at 07:24, Swathi Chandrashekar 
> wrote:
>
> Got it, so this means, we should have standalone app mode cluster which is
> managed by a flink Kubernetes operator and the operator would update the
> replicas

Re: [EXTERNAL] Re: Flink reactive mode for application clusters on AWS EKS

2023-01-13 Thread Gyula Fóra
Hi Tamir!

Let me try to clarify a few points here.

The operator works based on FlinkDeployment Custom Resources (Yaml
definition) and the operator creates the required clusters / taskmanagers
based on that. If you change the parallelism of your FlinkDeployment Yaml,
the operator will adjust the cluster size (scale up or down).
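
As a concrete sketch, bumping spec.job.parallelism and re-applying the
resource is enough to trigger a rescale (name and jar path below are
illustrative):

  apiVersion: flink.apache.org/v1beta1
  kind: FlinkDeployment
  metadata:
    name: my-flink-app
  spec:
    job:
      jarURI: local:///opt/flink/usrlib/my-app.jar
      parallelism: 4   # change this and re-apply; the operator rescales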

The operator also now contains an autoscaler module that runs within the
operator and monitors Flink clusters and determines whether a jobvertex
should be scaled up or down. It will then scale the job accordingly.
The autoscaler currently only works with the default Native Deployment mode.

The operator does not use Flink reactive mode to perform autoscaling.

I highly recommend trying to migrate to the operator (or at least testing
it locally so you fully understand the functionality), you will save
yourself a tremendous amount of work especially if you are looking to build
an autoscaler.

Cheers,
Gyula

On Fri, Jan 13, 2023 at 3:37 PM Tamir Sagi 
wrote:

> Hey Gyula,
>
> Thanks for clarifying that.
>
> We created a custom service before an official Flink k8s operator was
> released. That service deploys/upgrades/deletes clusters (no Yamls are
> needed). It handles failures including retries and cleanups based on our
> needs. Hence, moving to the official Flink operator might take a while.
>
> Does the operator also perform scale down?
>
> Regarding HPA, Task managers are created by Flink based on parallelism &
> number of slots. Then the cluster has fixed size of X JMs and Y TMs.
> I was thinking about adding HPA but wondered whether or not Flink will
> handle the new TMs properly (I have not tested it).
>
> We are probably left with the option to implement the auto scaling
> mechanism ourselves on top of Flink clusters.
>
> Best,
> Tamir.
> --
> *From:* Gyula Fóra 
> *Sent:* Friday, January 13, 2023 8:39 AM
> *To:* Swathi Chandrashekar 
> *Cc:* Chesnay Schepler ; Tamir Sagi <
> tamir.s...@niceactimize.com>; user@flink.apache.org  >
> *Subject:* Re: [EXTERNAL] Re: Flink reactive mode for application
> clusters on AWS EKS
>
>
> *EXTERNAL EMAIL*
>
>
> What I am trying to say is use the Kubernetes operator with Native
> (default) mode and forget about reactive.
>
> The operator does everything you want, plus has an actual autoscaler.
>
> Gyula
>
> On Fri, 13 Jan 2023 at 07:24, Swathi Chandrashekar 
> wrote:
>
> Got it, so this means, we should have standalone app mode cluster which is
> managed by a flink Kubernetes operator and the operator would update the
> replicas based on the metrics (autoscale), which in turn changes the
> parallelism as reactivemode is enabled.
>
>
>
> Regards,
>
> Swathi C
>
>
>
> *From:* Gyula Fóra 
> *Sent:* Friday, January 13, 2023 11:31 AM
> *To:* Swathi Chandrashekar 
> *Cc:* Chesnay Schepler ; Tamir Sagi <
> tamir.s...@niceactimize.com>; user@flink.apache.org
> *Subject:* Re: [EXTERNAL] Re: Flink reactive mode for application
> clusters on AWS EKS
>
>
>
> No but the Kubernetes operator itself already provides similar feature set.
>
>
>
> Not sure why you want the reactive mode in the first place. If it's
> because you want to implement auto scaling on top of it, then I think the
> operator is a better alternative.
>
>
>
> I think you should try to understand what exactly the reactive mode
> provides vs what the operator does. Reactive mode alone doesn’t do too much.
>
>
>
> Gyula
>
>
>
> On Fri, 13 Jan 2023 at 06:33, Swathi Chandrashekar 
> wrote:
>
> Hi @Gyula Fóra ,
>
>
>
> Does this mean, with Kubernetes operator, we can have reactive mode in
> native flink which is in app mode ? [ Not just standalone app mode ]
>
>
>
> Regards,
>
> Swathi C
>
>
>
> *From:* Gyula Fóra 
> *Sent:* Thursday, January 12, 2023 11:14 PM
> *To:* Tamir Sagi 
> *Cc:* Chesnay Schepler ; user@flink.apache.org
> *Subject:* [EXTERNAL] Re: Flink reactive mode for application clusters on
> AWS EKS
>
>
>
> Hey!
>
> I think the reactive scaling is a somewhat misunderstood feature. It only
> works in standalone deployments (not in Kubernetes native, for instance) and
> it doesn't actually provide any autoscaling functionality on its own.
> You would have to implement your scaling logic yourself somehow
> (Kubernetes HPA or something similar)
>
> I suggest looking at the Flink Kubernetes Operator (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/)

Re: [EXTERNAL] Re: Flink reactive mode for application clusters on AWS EKS

2023-01-12 Thread Gyula Fóra
What I am trying to say is use the Kubernetes operator with Native
(default) mode and forget about reactive.

The operator does everything you want, plus has an actual autoscaler.

Gyula

On Fri, 13 Jan 2023 at 07:24, Swathi Chandrashekar 
wrote:

> Got it, so this means, we should have standalone app mode cluster which is
> managed by a flink Kubernetes operator and the operator would update the
> replicas based on the metrics (autoscale), which in turn changes the
> parallelism as reactivemode is enabled.
>
>
>
> Regards,
>
> Swathi C
>
>
>
> *From:* Gyula Fóra 
> *Sent:* Friday, January 13, 2023 11:31 AM
> *To:* Swathi Chandrashekar 
> *Cc:* Chesnay Schepler ; Tamir Sagi <
> tamir.s...@niceactimize.com>; user@flink.apache.org
> *Subject:* Re: [EXTERNAL] Re: Flink reactive mode for application
> clusters on AWS EKS
>
>
>
> No but the Kubernetes operator itself already provides similar feature set.
>
>
>
> Not sure why you want the reactive mode in the first place. If it's
> because you want to implement auto scaling on top of it, then I think the
> operator is a better alternative.
>
>
>
> I think you should try to understand what exactly the reactive mode
> provides vs what the operator does. Reactive mode alone doesn’t do too much.
>
>
>
> Gyula
>
>
>
> On Fri, 13 Jan 2023 at 06:33, Swathi Chandrashekar 
> wrote:
>
> Hi @Gyula Fóra ,
>
>
>
> Does this mean, with Kubernetes operator, we can have reactive mode in
> native flink which is in app mode ? [ Not just standalone app mode ]
>
>
>
> Regards,
>
> Swathi C
>
>
>
> *From:* Gyula Fóra 
> *Sent:* Thursday, January 12, 2023 11:14 PM
> *To:* Tamir Sagi 
> *Cc:* Chesnay Schepler ; user@flink.apache.org
> *Subject:* [EXTERNAL] Re: Flink reactive mode for application clusters on
> AWS EKS
>
>
>
> Hey!
>
> I think the reactive scaling is a somewhat misunderstood feature. It only
> works in standalone deployments (not in Kubernetes native, for instance) and
> it doesn't actually provide any autoscaling functionality on its own.
> You would have to implement your scaling logic yourself somehow
> (Kubernetes HPA or something similar)
>
> I suggest looking at the Flink Kubernetes Operator (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/)
> that will provide actual autoscaler capability for native Kubernetes
> deployments.
>
> Cheers,
> Gyula
>
>
>
> On Thu, Jan 12, 2023 at 6:23 PM Tamir Sagi 
> wrote:
>
> Hey Chesnay,
>
>
>
> Just to be more clear,
>
> I'm talking about plans to support reactive mode for application clusters
> in Native Kubernetes.
>
>
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#application-mode
>
>
>
> Thanks,
>
> Tamir.
>
>
> --
>
> *From:* Tamir Sagi 
> *Sent:* Thursday, January 12, 2023 6:17 PM
> *To:* Chesnay Schepler ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Flink reactive mode for application clusters on AWS EKS
>
>
>
> Hey Chesnay,
>
>
>
> Thank you for your response.
>
>
>
> Since we are running our Flink jobs on EKS (Elastic Kubernetes Service), I
> was asking about application clusters on Kubernetes.
>
>
>
> The documentation I referred to clearly states that it is not
> supported, the same as shown on the Flink website.
>
>
>
> Is there any plan to support that anytime soon?
>
>
>
> Thanks
>
>
>
> Tamir.
> --
>
> *From:* Chesnay Schepler 
> *Sent:* Thursday, January 12, 2023 4:30 PM
> *To:* Tamir Sagi ; user@flink.apache.org <
> user@flink.apache.org>
> 

Re: [EXTERNAL] Re: Flink reactive mode for application clusters on AWS EKS

2023-01-12 Thread Gyula Fóra
No but the Kubernetes operator itself already provides similar feature set.

Not sure why you want the reactive mode in the first place. If it's
because you want to implement auto scaling on top of it, then I think the
operator is a better alternative.

I think you should try to understand what exactly the reactive mode
provides vs what the operator does. Reactive mode alone doesn’t do too much.

Gyula

On Fri, 13 Jan 2023 at 06:33, Swathi Chandrashekar 
wrote:

> Hi @Gyula Fóra ,
>
>
>
> Does this mean, with Kubernetes operator, we can have reactive mode in
> native flink which is in app mode ? [ Not just standalone app mode ]
>
>
>
> Regards,
>
> Swathi C
>
>
>
> *From:* Gyula Fóra 
> *Sent:* Thursday, January 12, 2023 11:14 PM
> *To:* Tamir Sagi 
> *Cc:* Chesnay Schepler ; user@flink.apache.org
> *Subject:* [EXTERNAL] Re: Flink reactive mode for application clusters on
> AWS EKS
>
>
>
> Hey!
>
> I think the reactive scaling is a somewhat misunderstood feature. It only
> works in standalone deployments (not in Kubernetes native, for instance) and
> it doesn't actually provide any autoscaling functionality on its own.
> You would have to implement your scaling logic yourself somehow
> (Kubernetes HPA or something similar)
>
> I suggest looking at the Flink Kubernetes Operator (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/)
> that will provide actual autoscaler capability for native Kubernetes
> deployments.
>
> Cheers,
> Gyula
>
>
>
> On Thu, Jan 12, 2023 at 6:23 PM Tamir Sagi 
> wrote:
>
> Hey Chesnay,
>
>
>
> Just to be more clear,
>
> I'm talking about plans to support reactive mode for application clusters
> in Native Kubernetes.
>
>
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#application-mode
>
>
>
> Thanks,
>
> Tamir.
>
>
> --
>
> *From:* Tamir Sagi 
> *Sent:* Thursday, January 12, 2023 6:17 PM
> *To:* Chesnay Schepler ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Flink reactive mode for application clusters on AWS EKS
>
>
>
> Hey Chesnay,
>
>
>
> Thank you for your response.
>
>
>
> Since we are running our Flink jobs on EKS (Elastic Kubernetes Service), I
> was asking about application clusters on Kubernetes.
>
>
>
> The documentation I referred to clearly states that it is not
> supported, the same as shown on the Flink website.
>
>
>
> Is there any plan to support that anytime soon?
>
>
>
> Thanks
>
>
>
> Tamir.
> --
>
> *From:* Chesnay Schepler 
> *Sent:* Thursday, January 12, 2023 4:30 PM
> *To:* Tamir Sagi ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Flink reactive mode for application clusters on AWS EKS
>
>
>
> *EXTERNAL EMAIL*
>
>
>
> The adaptive scheduler and reactive mode both already support application
> clusters since 1.13.
>
>
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/
>
>
>
> On 19/12/2022 10:17, Tamir Sagi wrote:
>
> Hey,
>
>
>
> We are running stream jobs on application clusters (v1.15.2) on AWS EKS.

Re: Flink reactive mode for application clusters on AWS EKS

2023-01-12 Thread Gyula Fóra
Hey!

I think the reactive scaling is a somewhat misunderstood feature. It only
works in standalone deployments (not in Kubernetes native, for instance) and
it doesn't actually provide any autoscaling functionality on its own.
You would have to implement your scaling logic yourself somehow (Kubernetes
HPA or something similar), as sketched below.
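
A minimal sketch of what such custom scaling logic could look like with a
plain HPA (the Deployment name is illustrative; in reactive mode the job
parallelism simply follows the TM replica count):

  apiVersion: autoscaling/v2
  kind: HorizontalPodAutoscaler
  metadata:
    name: flink-taskmanager
  spec:
    scaleTargetRef:
      apiVersion: apps/v1
      kind: Deployment
      name: flink-taskmanager   # your standalone TM deployment
    minReplicas: 1
    maxReplicas: 10
    metrics:
      - type: Resource
        resource:
          name: cpu
          target:
            type: Utilization
            averageUtilization: 70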

I suggest looking at the Flink Kubernetes Operator (
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/)
that will provide actual autoscaler capability for native Kubernetes
deployments.

Cheers,
Gyula

On Thu, Jan 12, 2023 at 6:23 PM Tamir Sagi 
wrote:

> Hey Chesnay,
>
> Just to be more clear,
> I'm talking about plans to support reactive mode for application clusters
> in Native Kubernetes.
>
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#application-mode
>
> Thanks,
> Tamir.
>
> --
> *From:* Tamir Sagi 
> *Sent:* Thursday, January 12, 2023 6:17 PM
> *To:* Chesnay Schepler ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Flink reactive mode for application clusters on AWS EKS
>
> Hey Chesnay,
>
> Thank you for your response.
>
> Since we are running our Flink jobs on EKS (Elastic Kubernetes Service), I
> was asking about application clusters on Kubernetes.
>
> The documentation I referred to clearly states that it is not supported,
> the same as shown on the Flink website.
>
>
> Is there any plan to support that anytime soon?
>
> Thanks
>
> Tamir.
> --
> *From:* Chesnay Schepler 
> *Sent:* Thursday, January 12, 2023 4:30 PM
> *To:* Tamir Sagi ; user@flink.apache.org <
> user@flink.apache.org>
> *Subject:* Re: Flink reactive mode for application clusters on AWS EKS
>
>
> *EXTERNAL EMAIL*
>
>
> The adaptive scheduler and reactive mode both already support application
> clusters since 1.13.
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/deployment/elastic_scaling/
>
> On 19/12/2022 10:17, Tamir Sagi wrote:
>
> Hey,
>
> We are running stream jobs on application clusters (v1.15.2) on AWS EKS.
>
> I was reviewing the following pages on Flink confluence
>
>- Reactive mode [1]
>- Adaptive Scheduler [2]
>
> I also encountered the following POC conducted by Robert Metzger
> (@rmetzger_) on 06 May 2021. [3]
>
> my question is whether that feature will be supported in the future for
> application clusters or not.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-159%3A+Reactive+Mode
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-160%3A+Adaptive+Scheduler
> [3] https://flink.apache.org/2021/05/06/reactive-mode.html
>
>
> Thanks,
> Tamir.
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually 

Re: Missing Flink Operator version

2023-01-10 Thread Gyula Fóra
What you linked is not the list of stable releases but the list of all past
releases.
Only the last 2 minor versions are supported, which are currently 1.3.1 and
1.2.0.

I highly recommend using the latest 1.3.1 release.
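
Installing or upgrading to it is roughly the following, per the quickstart
(double-check the repo URL against the docs):

  helm repo add flink-operator-repo https://downloads.apache.org/flink/flink-kubernetes-operator-1.3.1/
  helm repo update
  helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator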

Cheers,
Gyula

On Wed, Jan 11, 2023 at 7:31 AM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello team,
>
> It appears that Flink Operator 1.1.0 is removed from the repo index
> https://downloads.apache.org/flink/
>
> However, it is still mentioned as a stable release in the list of
> downloads here
> 
> as well as in the quickstart documentation
> 
>
> Is it on purpose ? If yes, is 1.1.0 deprecated ? If not, how to use it
> then ?
> --
> *Regards,*
> *Meghajit*
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.3.1 released

2023-01-10 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.3.1.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2023/01/10/release-kubernetes-operator-1.3.1.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/projects/FLINK/versions/12352697

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


Re: What is the flink-kubernetes-operator webhook for?

2022-12-09 Thread Gyula Fóra
To add to what Matyas said:

Validation in itself is a mandatory step for every spec change that is
submitted to guard against broken configs (things like negative parallelism
etc).

But validation can happen in 2 places. It can be done through the webhook,
which would result in upfront rejection of the spec on validation error.

Or it can happen during the regular processing/reconciliation flow, in which
case errors are recorded in the status.

The webhook is a nice way to get validation errors immediately, but as you
see it's not strictly necessary, as validation would happen anyway.
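
For reference, disabling it at install time should be a single Helm flag
(flag name per the operator Helm chart; worth verifying on your version):

  helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator \
    --set webhook.create=false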

Gyula

On Fri, 9 Dec 2022 at 09:21, Őrhidi Mátyás  wrote:

> Hi Otto,
>
> webhooks in general are optional components of the k8s operator pattern.
> Mostly used for validation, sometimes for changing custom resources and
> handling multiple versions, etc. It's an optional component in the Flink
> Kubernetes Operator too.
>
> Regards,
> Matyas
>
> On Fri, Dec 9, 2022 at 5:59 AM Andrew Otto  wrote:
>
>> Hello!
>>
>> What is the Flink Kubernetes Webhook
>> 
>> for?  I probably don't know just because I don't know k8s that well, but
>> reading code and other docs didn't particular enlighten me :)
>>
>> It looks like maybe its for doing some extra validation of k8s API
>> requests, and allows you to customize how those requests are validated and
>> processed if you have special requirements to do so.
>>
>> Since it can be so easily disabled
>> ,
>> do we need to install it for production use?  FWIW, we will not be using
>> FlinkSessionJob, so perhaps we don't need it if we don't use that?
>>
>> Thanks!
>> -Andrew Otto
>>  Wikimedia Foundation
>>
>


Re: ArgoCD unable to process health from k8s FlinkDeployment - stuck in Processing

2022-12-09 Thread Gyula Fóra
Hi!

The resource lifecycle state is currently not shown explicitly in the
status.

You are confusing it with reconciliation status. At the moment you can only
get this through the Java client:

https://github.com/apache/flink-kubernetes-operator/blob/main/flink-kubernetes-operator-api/src/main/java/org/apache/flink/kubernetes/operator/api/status/CommonStatus.java
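
A rough Java sketch of what that looks like (class and method names as I
recall them from the operator API module; treat the exact signatures as
assumptions and verify them against the linked class):

  // Sketch only: assumes the flink-kubernetes-operator-api jar and the
  // fabric8 Kubernetes client on the classpath.
  import io.fabric8.kubernetes.client.KubernetesClient;
  import io.fabric8.kubernetes.client.KubernetesClientBuilder;
  import org.apache.flink.kubernetes.operator.api.FlinkDeployment;

  public class LifecycleStateCheck {
      public static void main(String[] args) {
          try (KubernetesClient client = new KubernetesClientBuilder().build()) {
              FlinkDeployment dep = client.resources(FlinkDeployment.class)
                      .inNamespace("flink")        // illustrative namespace
                      .withName("my-flink-app")    // illustrative name
                      .get();
              // CommonStatus derives the lifecycle state from the other
              // status fields and exposes it via a helper getter
              System.out.println(dep.getStatus().getLifecycleState());
          }
      }
  }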

This seems to be a very common request so we should probably expose this
field directly in the status, even though it could technically be derived
from other fields.

Could you please open a jira ticket for this improvement?

Cheers
Gyula

On Fri, 9 Dec 2022 at 05:46, Edgar H  wrote:

> Morning all,
>
> Recently been testing the Flink k8s operator (flink.apache.org/v1beta1)
> and although the jobs do start up and run perfectly fine, their status in
> ArgoCD is not yet as it should be, some details:
>
> When describing the flinkdeployment I'm currently trying to test, the
> follows appears in events:
>
>   Type    Reason         Age   From                  Message
>   ----    ------         ---   ----                  -------
>   Normal  Submit         22m   JobManagerDeployment  Starting deployment
>   Normal  StatusChanged  21m   Job                   Job status changed from RECONCILING to CREATED
>   Normal  StatusChanged  20m   Job                   Job status changed from CREATED to RUNNING
>
> On top of it, the reconciliation timestamp and the state are as follows:
>
> Reconciliation Timestamp:  1670581014190
> State: DEPLOYED
>
> From what I've read in the docs, the flinkdeployment is not considered
> healthy until it reaches the STABLE state, right?
>
>
>- DEPLOYED : The resource is deployed/submitted to Kubernetes, but
>it’s not yet considered to be stable and might be rolled back in the future
>- STABLE : The resource deployment is considered to be stable and
>won’t be rolled back
>
>
> The jobs have been running for some hours already, one of them would throw
> some exceptions but won't cause downtime. What does it take for the job to
> be in STABLE state rather than just DEPLOYED? Would that be the cause of
> the Processing... health status in ArgoCD or just that internally in k8s
> the flinkoperator can't really notice the pods running well?
>


Re: [Flink K8s Operator] flinkdep stays in DEPLOYED and never turns STABLE

2022-12-06 Thread Gyula Fóra
Hi!

The stable state is not marked in the reconciliation state field but is
instead tracked via the lastStableSpec field. DEPLOYED simply means that
something is running :)

The structure of the status is a bit complex, to avoid too much redundancy
and limit its size, and it is mostly considered internal to the operator.

For a user facing view of the resource state you can check:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/architecture/#flink-resource-lifecycle

The status class contains a nice helper method to get the
ResourceLifecycleState enum if you want a single condensed status view.
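
As a quick sketch, the underlying field can also be inspected directly
(field path per the v1beta1 CRD; verify it on your operator version):

  kubectl get flinkdeployment my-flink-app \
    -o jsonpath='{.status.reconciliationStatus.lastStableSpec}'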

Cheers
Gyula

On Tue, 6 Dec 2022 at 04:12, Paul Lam  wrote:

> Hi all,
>
> I’m trying out Flink K8s operator 1.2 with K8s 1.25 and Flink 1.15.
>
> I found that kubectl shows the flinkdeployments staying in DEPLOYED
> seemingly forever (the Flink job status is RUNNING), but the operator logs
> show that the flinkdeployments already turned STABLE.
>
> Is that a known bug or I missed something? Thanks a lot!
>
> Best,
> Paul Lam
>
>
>


Re: Cleanup for high-availability.storageDir

2022-12-05 Thread Gyula Fóra
Hi!

There are some files in the HA dir that are not cleaned up automatically
over time and need to be removed by the user:

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/concepts/overview/#jobresultstore-resource-leak
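
Illustratively (bucket, path layout and ids below are placeholders, not
necessarily the layout on your cluster):

  # remove stale JobResultStore entries left behind for a terminated job
  aws s3 rm --recursive s3://my-bucket/flink-ha/job-result-store/my-cluster-id/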


Hope this helps
Gyula

On Mon, 5 Dec 2022 at 11:56, Alexis Sarda-Espinosa 
wrote:

> Hello,
>
> I see the number of entries in the directory configured for HA increases
> over time, particularly in the context of job upgrades in a Kubernetes
> environment managed by the operator. Would it be safe to assume that any
> files that haven't been updated in a while can be deleted? Assuming the
> checkpointing interval is much smaller than the period used to determine if
> files are too old.
>
> Regards,
> Alexis.
>
>


Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Gyula Fóra
As I also mentioned in the email, this is on our roadmap for the operator
but we have not implemented it yet because this feature only became
available as of Flink 1.16.

Ideally, once this is added, the user would be able to use env vars in the
operator's FlinkDeployment spec.flinkConfiguration section.

Gyula

On Thu, Dec 1, 2022 at 5:18 PM Andrew Otto  wrote:

> > Andrew please see my previous response, that covers the secrets case.
> > kubernetes.jobmanager.entrypoint.args: -D datadog.secret.conf=$MY_SECRET_ENV
>
> This way^?  Ya that makes sense.  It'd be nice if there was a way to get
> Secrets into the values used for rendering flink-conf.yaml too, so the
> confs will be all in the same place.
>
>
>
>
>
> On Thu, Dec 1, 2022 at 9:30 AM Gyula Fóra  wrote:
>
>> Andrew please see my previous response, that covers the secrets case.
>>
>> Gyula
>>
>> On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:
>>
>>> > several failures to write into $FLINK_HOME/conf/.
>>> I'm working on
>>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/>
>>> building Flink and flink-kubernetes-operator images for the Wikimedia
>>> Foundation, and I found this strange as well.  It makes sense in a docker /
>>> docker-compose only environment, but in k8s where you have ConfigMap
>>> responsible for flink-conf.yaml, and (also logs all going to the console,
>>> not FLINK_HOME/log), I'd prefer if the image was not modified by the
>>> ENTRYPOINT.
>>>
>>> I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
>>> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/docker-entrypoint.sh>
>>> provided by flink-docker is not really needed.  It seems to be written more
>>> for deployments outside of kubernetes.
>>>  flink-kubernetes-operator never calls the built in subcommands (e.g.
>>> standalone-job), and always runs in 'pass-through' mode, just execing the
>>> args passed to it.  At WMF we build
>>> <https://doc.wikimedia.org/docker-pkg/> our own images, so I'm planning
>>> on removing all of the stuff in ENTRYPOINTs that mangles the image.
>>> Anything that I might want to keep from docker-entrypoint.sh (like enabling
>>> jemalloc
>>> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/6/images/flink/Dockerfile.template#73>)
>>> I should be able to do in the Dockerfile at image creation time.
>>>
>>> >  want to set an API key as part of the flink-conf.yaml file, but we
>>> don't want it to be persisted in Kubernetes or in our version control
>>> I personally am still pretty green at k8s, but would using kubernetes
>>> Secrets
>>> <https://kubernetes.io/docs/concepts/configuration/secret/#use-case-secret-visible-to-one-container-in-a-pod>
>>> work for your use case? I know we use them at WMF, but from a quick glance
>>> I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
>>> that renders flink-conf.yaml, but I feel like there should be a way.
>>>
>>>
>>>
>>>
>>> On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:
>>>
>>>> Hi Lucas!
>>>>
>>>> The Flink kubernetes integration itself is responsible for mounting the
>>>> configmap and overwriting the entrypoint not the operator. Therefore this
>>>> is not something we can easily change from the operator side. However I
>>>> think we are looking at the problem from the wrong side and there may be a
>>>> solution already :)
>>>>
>>>> Ideally what you want is ENV replacement in Flink configuration. This
>>>> is not something that the Flink community has added yet unfortunately but
>>>> we have it on our radar for the operator at least (
>>>> https://issues.apache.org/jira/browse/FLINK-27491). It will probably
>>>> be added in the next 1.4.0 version.
>>>>
>>>> This will be possible from Flink 1.16 which introduced a small feature
>>>> that allows us to inject parameters to the kubernetes entrypoints:
>>>> https://issues.apache.org/jira/browse/FLINK-29123
>>>>
>>>> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>>>>
>>>> While it's not implemented in the operator yet, you could try setting
>>>> the following config in Flink 1.16.0:
>>>> kubernetes.jobmanager.entrypoint.args: -D
>>>>

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-12-01 Thread Gyula Fóra
Andrew please see my previous response, that covers the secrets case.

Gyula

On Thu, Dec 1, 2022 at 2:54 PM Andrew Otto  wrote:

> > several failures to write into $FLINK_HOME/conf/.
> I'm working on
> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/>
> building Flink and flink-kubernetes-operator images for the Wikimedia
> Foundation, and I found this strange as well.  It makes sense in a docker /
> docker-compose only environment, but in k8s where you have ConfigMap
> responsible for flink-conf.yaml, and (also logs all going to the console,
> not FLINK_HOME/log), I'd prefer if the image was not modified by the
> ENTRYPOINT.
>
> I believe that for flink-kubernetes-operator, the docker-entrypoint.sh
> <https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/docker-entrypoint.sh>
> provided by flink-docker is not really needed.  It seems to be written more
> for deployments outside of kubernetes.
>  flink-kubernetes-operator never calls the built in subcommands (e.g.
> standalone-job), and always runs in 'pass-through' mode, just execing the
> args passed to it.  At WMF we build
> <https://doc.wikimedia.org/docker-pkg/> our own images, so I'm planning
> on removing all of the stuff in ENTRYPOINTs that mangles the image.
> Anything that I might want to keep from docker-entrypoint.sh (like enabling
> jemalloc
> <https://gerrit.wikimedia.org/r/c/operations/docker-images/production-images/+/858356/6/images/flink/Dockerfile.template#73>)
> I should be able to do in the Dockerfile at image creation time.
>
> >  want to set an API key as part of the flink-conf.yaml file, but we
> don't want it to be persisted in Kubernetes or in our version control
> I personally am still pretty green at k8s, but would using kubernetes
> Secrets
> <https://kubernetes.io/docs/concepts/configuration/secret/#use-case-secret-visible-to-one-container-in-a-pod>
> work for your use case? I know we use them at WMF, but from a quick glance
> I'm not sure how to combine them in flink-kubernetes-operator's ConfigMap
> that renders flink-conf.yaml, but I feel like there should be a way.
>
>
>
>
> On Wed, Nov 30, 2022 at 4:59 PM Gyula Fóra  wrote:
>
>> Hi Lucas!
>>
>> The Flink kubernetes integration itself is responsible for mounting the
>> configmap and overwriting the entrypoint not the operator. Therefore this
>> is not something we can easily change from the operator side. However I
>> think we are looking at the problem from the wrong side and there may be a
>> solution already :)
>>
>> Ideally what you want is ENV replacement in Flink configuration. This is
>> not something that the Flink community has added yet unfortunately but we
>> have it on our radar for the operator at least (
>> https://issues.apache.org/jira/browse/FLINK-27491). It will probably be
>> added in the next 1.4.0 version.
>>
>> This will be possible from Flink 1.16 which introduced a small feature
>> that allows us to inject parameters to the kubernetes entrypoints:
>> https://issues.apache.org/jira/browse/FLINK-29123
>>
>> https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d
>>
>> While it's not implemented in the operator yet, you could try setting the
>> following config in Flink 1.16.0:
>> kubernetes.jobmanager.entrypoint.args: -D datadog.secret.conf=$MY_SECRET_ENV
>> kubernetes.taskmanager.entrypoint.args: -D datadog.secret.conf=$MY_SECRET_ENV
>>
>> If you use this configuration together with the default native mode in
>> the operator, it should work I believe.
>>
>> Please try and let me know!
>> Gyula
>>
>> On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
>> lucas.capare...@gympass.com> wrote:
>>
>>> Hello folks,
>>>
>>> Not sure if this is the best list for this, sorry if it isn't. I'd
>>> appreciate some pointers :-)
>>>
>>> When using flink-kubernetes-operator [1], docker-entrypoint.sh [2] goes
>>> through several failures to write into $FLINK_HOME/conf/. We believe this
>>> is due to this volume being mounted from a ConfigMap, which means it's
>>> read-only.
>>>
>>> This has been reported in the past in GCP's operator, but I was unable
>>> to find any kind of resolution for it:
>>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/213
>>>
>>> In our use case, we want to set an API key as part of the
>>> flink-conf.yaml file, but we don't want it to be persisted in Kubernetes or
>>> in our version control, since it's 

Re: flink-kubernetes-operator: image entrypoint misbehaves due to inability to write

2022-11-30 Thread Gyula Fóra
Hi Lucas!

The Flink kubernetes integration itself is responsible for mounting the
configmap and overwriting the entrypoint not the operator. Therefore this
is not something we can easily change from the operator side. However I
think we are looking at the problem from the wrong side and there may be a
solution already :)

Ideally what you want is ENV replacement in Flink configuration. This is
not something that the Flink community has added yet unfortunately but we
have it on our radar for the operator at least (
https://issues.apache.org/jira/browse/FLINK-27491). It will probably be
added in the next 1.4.0 version.

This will be possible from Flink 1.16 which introduced a small feature that
allows us to inject parameters to the kubernetes entrypoints:
https://issues.apache.org/jira/browse/FLINK-29123
https://github.com/apache/flink/commit/c37643031dca2e6d4c299c0d704081a8bffece1d

While it's not implemented in the operator yet, you could try setting the
following config in Flink 1.16.0:
kubernetes.jobmanager.entrypoint.args: -D datadog.secret.conf=$MY_SECRET_ENV
kubernetes.taskmanager.entrypoint.args: -D
datadog.secret.conf=$MY_SECRET_ENV

If you use this configuration together with the default native mode in the
operator, it should work I believe.
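
For example, the env variable itself could come from a Kubernetes Secret via
the pod template. A rough sketch (untested; the Secret name and key below are
placeholders):

spec:
  podTemplate:
    spec:
      containers:
        # flink-main-container is the conventional name of the main container
        - name: flink-main-container
          env:
            - name: MY_SECRET_ENV
              valueFrom:
                secretKeyRef:
                  name: datadog-api-key   # placeholder Secret name
                  key: api-key            # placeholder key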

Please try and let me know!
Gyula

On Wed, Nov 30, 2022 at 10:36 PM Lucas Caparelli <
lucas.capare...@gympass.com> wrote:

> Hello folks,
>
> Not sure if this is the best list for this, sorry if it isn't. I'd
> appreciate some pointers :-)
>
> When using flink-kubernetes-operator [1], docker-entrypoint.sh [2] goes
> through several failures to write into $FLINK_HOME/conf/. We believe this
> is due to this volume being mounted from a ConfigMap, which means it's
> read-only.
>
> This has been reported in the past in GCP's operator, but I was unable to
> find any kind of resolution for it:
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/issues/213
>
> In our use case, we want to set an API key as part of the flink-conf.yaml
> file, but we don't want it to be persisted in Kubernetes or in our version
> control, since it's sensitive data. This API Key is used by Flink to report
> metrics to Datadog [3].
>
> We have automation in place which allows us to accomplish this by setting
> environment variables pointing to a path in our secret manager, which only
> gets injected during runtime. That part is working fine.
>
> However, we're trying to inject this secret using the FLINK_PROPERTIES
> variable, which is appended [4] to the flink-conf.yaml file in the
> docker-entrypoint script; this fails because the filesystem where the file
> lives is read-only.
>
> We attempted working around this in 2 different ways:
>
>   - providing our own .spec.containers[0].command, where we copied over
> /opt/flink to /tmp/flink and set FLINK_HOME=/tmp/flink. This did not work
> because the operator overwrote it and replaced it with its original
> command/args;
>   - providing an initContainer sharing the volumes so it could make the
> copy without being overridden by the operator's command/args. This did not
> work because the initContainer present in the spec never makes it to the
> resulting Deployment, it seems the operator ignores it.
>
> We have some questions:
>
> 1. Is this overriding of the pod template present in FlinkDeployment
> intentional? That is, should our custom command/args and initContainers
> have been overwritten? If so, I find it a bit confusing that these fields
> are present and available for use at all.
> 2. Since the ConfigMap volume will always be mounted as read-only, it
> seems to me there's some adjustments to be made in order for this script to
> work correctly. Do you think it would make sense for the script to copy
> over contents from the ConfigMap volume to a writable directory during
> initialization, and then use this copy for any subsequent operation?
> Perhaps copying over to $FLINK_HOME, which the user could set themselves,
> maybe even with a sane default which wouldn't fail on writes (eg
> /tmp/flink).
>
> Thanks in advance for your attention and hard work on the project!
>
> [1]: https://github.com/apache/flink-kubernetes-operator
> [2]:
> https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/docker-entrypoint.sh
> [3]: https://docs.datadoghq.com/integrations/flink/
> [4]:
> https://github.com/apache/flink-docker/blob/master/1.16/scala_2.12-java11-ubuntu/docker-entrypoint.sh#L86-L88
>


Re: Savepoint restore mode for the Kubernetes operator

2022-11-29 Thread Gyula Fóra
The operator might call dispose on an old savepoint, that's true, but I am
not sure if the dispose API call would actually corrupt it.

Gyula

On Tue, 29 Nov 2022 at 09:28, Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hi Hangxiang,
>
> but, if I understand correctly, setting restore mode to CLAIM means that
> the job might create a new incremental checkpoint based on the savepoint,
> right? And if the operator then decides to clean up the savepoint, the
> checkpoint would be corrupted, no?
>
> Regards,
> Alexis.
>
> Am Mo., 28. Nov. 2022 um 05:17 Uhr schrieb Hangxiang Yu <
> master...@gmail.com>:
>
>> Hi, Alexis.
>> IIUC, there is no conflict between savepoint history and restore mode.
>> Restore mode cares about whether/how we manage the savepoint of the old job.
>> Savepoint management in the operator only cares about the savepoint history of
>> the new job.
>> In other words, savepoint cleanup should not clean the savepoint from the
>> old job which should only be controlled by restore mode.
>> So I think you could also set restore mode according to your needs.
>>
>>
>> On Wed, Nov 16, 2022 at 10:41 PM Alexis Sarda-Espinosa <
>> sarda.espin...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Is there a recommended configuration for the restore mode of jobs
>>> managed by the operator?
>>>
>>> Since the documentation states that the operator keeps a savepoint
>>> history to perform cleanup, I imagine restore mode should always be
>>> NO_CLAIM, but I just want to confirm.
>>>
>>> Regards,
>>> Alexis.
>>>
>>
>>
>> --
>> Best,
>> Hangxiang.
>>
>


Re: What is the recommended cpu.memory configuration of the Flink kubernetes operator in the production environment ?

2022-11-27 Thread Gyula Fóra
We don't have a recommendation, this will depend on the number of jobs you
manage.

If you are managing a few jobs let's say < 50, the default resource
configuration probably works well.
The operator reports JVM metrics similar to Flink jobs so if you see longer
GC pauses you could simply add more memory/cpu as you scale the number of
managed jobs.

The operator is not very CPU intensive, but it uses some heap space for
caching information about the jobs.

Gyula

On Mon, Nov 28, 2022 at 8:01 AM hjw <18814122...@163.com> wrote:

> Flink Kubernetes Operator is a server is responsible to reconcile the
> FlinkDeployment.
> The operator will continuously monitor the status of Flink jobs.The
> stability of the operator's service is also important.
> What is the recommended cpu.memory configuration of the Flink kubernetes
> operator in the production environment ?
>
>
> --
> Best,
> Hjw
>


Re: [Flink K8s operator] HA metadata not available to restore from last state

2022-11-22 Thread Gyula Fóra
Hi Dongwon!

This error mostly occurs when using Flink 1.14 and the Flink cluster goes
into a terminal state. If a Flink job is FAILED/FINISHED (e.g. because it
exhausted its restart strategy), in Flink 1.14 the cluster shuts itself down
and removes the HA metadata.

In these cases the operator will only see that the cluster completely
disappeared and there is no HA metadata and it will throw the error you
mentioned. It does not know what happened and doesn't have any way to
recover checkpoint information.

This is fixed in Flink 1.15 where even after terminal FAILED/FINISHED
states, the jobmanager would not shut down. This allows the operator to
observe this terminal state and actually recover the job even if the HA
metadata was removed.

To summarize, this is mostly caused by Flink 1.14 behaviour that the
operator cannot control. Upgrading to 1.15 allows much more robustness and
should eliminate most of these cases.

Cheers,
Gyula

On Tue, Nov 22, 2022 at 9:43 AM Dongwon Kim  wrote:

> Hi,
>
> While using a last-state upgrade mode on flink-k8s-operator-1.2.0 and
> flink-1.14.3, we're occasionally facing the following error:
>
> Status:
>>   Cluster Info:
>> Flink - Revision: 98997ea @ 2022-01-08T23:23:54+01:00
>> Flink - Version:  1.14.3
>>   Error:  HA metadata not available to restore
>> from last state. It is possible that the job has finished or terminally
>> failed, or the configmaps have been deleted. Manual restore required.
>>   Job Manager Deployment Status:  ERROR
>>   Job Status:
>> Job Id:e8dd04ea4b03f1817a4a4b9e5282f433
>> Job Name:  flinktest
>> Savepoint Info:
>>   Last Periodic Savepoint Timestamp:  0
>>   Savepoint History:
>>   Trigger Id:
>>   Trigger Timestamp:  0
>>   Trigger Type:   UNKNOWN
>> Start Time:   1668660381400
>> State:RECONCILING
>> Update Time:  1668994910151
>>   Reconciliation Status:
>> Last Reconciled Spec:  ...
>> Reconciliation Timestamp:  1668660371853
>> State: DEPLOYED
>>   Task Manager:
>> Label Selector:  component=taskmanager,app=flinktest
>> Replicas:1
>> Events:
>>   Type ReasonAge From
>> Message
>>    --
>> ---
>>   Normal   JobStatusChanged  30m Job
>> Job status changed from RUNNING to RESTARTING
>>   Normal   JobStatusChanged  29m Job
>> Job status changed from RESTARTING to CREATED
>>   Normal   JobStatusChanged  28m Job
>> Job status changed from CREATED to RESTARTING
>>   Warning  Missing   26m JobManagerDeployment
>> Missing JobManager deployment
>>   Warning  RestoreFailed 9s (x106 over 26m)  JobManagerDeployment
>> HA metadata not available to restore from last state. It is possible that
>> the job has finished or terminally failed, or the configmaps have been
>> deleted. Manual restore required.
>>   Normal   Submit9s (x106 over 26m)  JobManagerDeployment
>> Starting deployment
>
>
> We're happy with the last state mode most of the time, but we face it
> occasionally.
>
> We found that it's not easy to reproduce the problem; we tried to kill JMs
> and TMs, and even shut down the nodes on which JMs and TMs are running.
>
> We also checked that the file size is not zero.
>
> Thanks,
>
> Dongwon
>
>
>


Re: Deploy Flink Operator in a k8s environment without helm?

2022-11-17 Thread Gyula Fóra
Yes, but as we said you need to install all required resources manually in
that case.

Gyula

On Fri, Nov 18, 2022 at 6:59 AM Mark Lee  wrote:

> Thanks Gyula,
>
>  My basic OS does not have a helm client and doesn't allow installing it.
> Could I deploy the Flink operator in such a situation?
>
>
>
> *From:* user-return-51648-lifuqiong00=126@flink.apache.org
>  *On behalf of *Gyula Fóra
> *Sent:* November 18, 2022 13:26
> *To:* Biao Geng 
> *Cc:* Mark Lee ; user@flink.apache.org
> *Subject:* Re: Deploy Flink Operator in a k8s environment without helm?
>
>
>
> Adding to what Biao Geng said, yes it is completely possible and other
> installation methods are used by many users already.
>
> You can check the Helm templates in the repo to get an idea what resources
> you need to create.
> Actually if you run `helm template flink-kubernetes-operator
> helm/flink-kubernetes-operator` from the git repo your local helm client
> will render the templates and print the resources that it would generate.
> That is a great way to get started with manual installation.
>
> There are also OLM bundles available for the current operator releases,
> these are not yet officially supported by the Flink community but we are
> working towards that:
> https://operatorhub.io/operator/flink-kubernetes-operator
> maybe this is more relevant in your environment.
>
> To summarize, there are many ways to install the operator, Helm is just
> one of the more convenient ones, that's why we use it as the example in the
> repo.
> Production setups usually need to customize at least parts of the
> deployment logic in any case.
>
> Gyula
>
>
>
>
>
> On Fri, Nov 18, 2022 at 6:12 AM Biao Geng  wrote:
>
> Hi Mark,
>
> I believe you can do that without helm, just like you can install
> software on CentOS without yum.
>
> But you may have to handle some basic setup by yourself. For the operator,
> you at least have to prepare the RBAC creation, serviceAccount creation,
> Deployment creation, and Webhook creation if you want to use it. Also, if
> you want to uninstall the operator, you should clear those resources by
> hand. It is not very hard but does require some hand work.
>
>
>
> Best,
>
> Biao Geng
>
>
>
> Get Outlook for iOS <https://aka.ms/o0ukef>
> --
>
> *From:* Mark Lee 
> *Sent:* Friday, November 18, 2022 12:57:26 PM
> *To:* user@flink.apache.org 
> *Subject:* Deploy Flink Operator in a k8s environment without helm?
>
>
>
> Hi all,
>
>    I am trying to deploy the Flink operator following the Quick Start
> <https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/>.
> But it seems a helm client needs to be installed in the Kubernetes environment.
> Could we deploy the Flink operator without a helm client installed?
>
> Now you can deploy the selected stable Flink Kubernetes Operator version
> using the included Helm chart:
>
> helm repo add flink-operator-repo
> https://downloads.apache.org/flink/flink-kubernetes-operator-
> /
>
> helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator
>
>
>
> Thank you.
>
>


Re: Deploy Flink Operator in a k8s environment without helm?

2022-11-17 Thread Gyula Fóra
Adding to what Biao Geng said, yes it is completely possible and other
installation methods are used by many users already.

You can check the Helm templates in the repo to get an idea what resources
you need to create.
Actually if you run `helm template flink-kubernetes-operator
helm/flink-kubernetes-operator` from the git repo your local helm client
will render the templates and print the resources that it would generate.
That is a great way to get started with manual installation.
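
For example, you could render the manifests once on any machine that has a
helm client (only the rendering needs helm, not the target cluster) and then
apply the result with plain kubectl; the namespace below is a placeholder:

helm template flink-kubernetes-operator helm/flink-kubernetes-operator \
  --namespace flink > operator-resources.yaml
kubectl apply -n flink -f operator-resources.yaml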

There are also OLM bundles available for the current operator releases,
these are not yet officially supported by the Flink community but we are
working towards that:
https://operatorhub.io/operator/flink-kubernetes-operator
maybe this is more relevant in your environment.

To summarize, there are many ways to install the operator, Helm is just one
of the more convenient ones, that's why we use it as the example in the
repo.
Production setups usually need to customize at least parts of the
deployment logic in any case.

Gyula


On Fri, Nov 18, 2022 at 6:12 AM Biao Geng  wrote:

> Hi Mark,
> I believe you can do that without helm, just like you can install
> software on CentOS without yum.
> But you may have to handle some basic setup by yourself. For the operator,
> you at least have to prepare the RBAC creation, serviceAccount creation,
> Deployment creation, and Webhook creation if you want to use it. Also, if
> you want to uninstall the operator, you should clear those resources by
> hand. It is not very hard but does require some hand work.
>
> Best,
> Biao Geng
>
> Get Outlook for iOS 
> --
> *From:* Mark Lee 
> *Sent:* Friday, November 18, 2022 12:57:26 PM
> *To:* user@flink.apache.org 
> *Subject:* Deploy Flink Operator in a k8s environment without helm?
>
>
> Hi all,
>
>    I am trying to deploy the Flink operator following the Quick Start
> .
> But it seems a helm client needs to be installed in the Kubernetes environment.
> Could we deploy the Flink operator without a helm client installed?
>
> Now you can deploy the selected stable Flink Kubernetes Operator version
> using the included Helm chart:
>
> helm repo add flink-operator-repo
> https://downloads.apache.org/flink/flink-kubernetes-operator-
> /
>
> helm install flink-kubernetes-operator
> flink-operator-repo/flink-kubernetes-operator
>
>
>
> Thank you.
>


Re: Owner reference with the Kubernetes operator

2022-11-16 Thread Gyula Fóra
This has been changed in the current snapshot release:
https://issues.apache.org/jira/browse/FLINK-28979

It will be part of the 1.3.0 version.

On Wed, Nov 16, 2022 at 3:32 PM Alexis Sarda-Espinosa <
sarda.espin...@gmail.com> wrote:

> Hello,
>
> Is there a particular reason the operator doesn't set owner references for
> the Deployments it creates as a result of a FlinkDeployment CR? This makes
> tracking in the Argo CD UI impossible. (To be clear, I mean a reference
> from the Deployment to the FlinkDeployment).
>
> Regards,
> Alexis.
>
>


Re: support escaping `#` in flink job spec in Flink-operator

2022-11-07 Thread Gyula Fóra
It is also possible that this is a problem of the Flink native Kubernetes
integration; we have to check where exactly it goes wrong before we try to
fix it.

We simply set the args into a Flink config and pass it to the native
deployment logic in the operator.

Gyula

On Tue, 8 Nov 2022 at 07:37, Gyula Fóra  wrote:

> Hi!
>
> How do you submit your yaml?
>
> It’s possible that this is not an operator problem. Did you try submitting
> the deployment in JSON format instead?
>
> If it still doesn't work please open a JIRA ticket with the details to
> reproduce and what you have tried :)
>
> Cheers
> Gyula
>
> On Tue, 8 Nov 2022 at 04:56, liuxiangcao  wrote:
>
>> Hi,
>>
>> We have a job that contains `#` as part of mainArgs and it used to work
>> on Ververica. Now we are switching to our own control plane to deploy to
>> flink-operator and the job started to fail due to the main args string
>> getting truncated at the `#` character when passed to the Flink application. I
>> believe this is due to characters after `#` being interpreted as comments
>> in the yaml file. To support having `#` in the mainArgs, the flink operator
>> needs to escape `#` when generating the k8s yaml file.
>>
>> Assuming the mainArgs contain '\"xyz#abc\".
>>
>> Here is the stack-trace:
>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
>> not parse value '\"xyz' *(Note: truncated by #)*
>>
>> for key  '$internal.application.program-args'.\n\tat
>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
>> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
>>  
>> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
>>  
>> org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
>>  
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
>>  
>> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
>>  by: *java.lang.IllegalArgumentException: Could not split string. Quoting 
>> was not closed properly*.\n\tat 
>> org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
>>  
>> org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
>>  
>> org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
>>  
>> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
>>  
>> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
>>  java.base/java.util.Optional.map(Optional.java:265)\n\tat 
>> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
>>  5 more\n"},"@version":1,"source_host":"xx","message":"Could not create 
>> application 
>> program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>
>>
>>  Can someone take a look and help fixing this issue? or I can help fixing
>> this if someone can point me in the right direction.
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>


Re: support escaping `#` in flink job spec in Flink-operator

2022-11-07 Thread Gyula Fóra
Hi!

How do you submit your yaml?

It’s possible that this is not an operator problem. Did you try submitting the
deployment in JSON format instead?

If it still doesn't work please open a JIRA ticket with the details to
reproduce and what you have tried :)

Cheers
Gyula

On Tue, 8 Nov 2022 at 04:56, liuxiangcao  wrote:

> Hi,
>
> We have a job that contains `#` as part of mainArgs and it used to work on
> Ververica. Now we are switching to our own control plane to deploy to
> flink-operator and the job started to fail due to the main args string
> getting truncated at the `#` character when passed to the Flink application. I
> believe this is due to characters after `#` being interpreted as comments
> in the yaml file. To support having `#` in the mainArgs, the flink operator
> needs to escape `#` when generating the k8s yaml file.
>
> Assuming the mainArgs contain '\"xyz#abc\".
>
> Here is the stack-trace:
> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could
> not parse value '\"xyz' *(Note: truncated by #)*
>
> for key  '$internal.application.program-args'.\n\tat
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)\n\tat
> org.apache.flink.configuration.Configuration.get(Configuration.java:704)\n\tat
>  
> org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)\n\tat
>  
> org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)\n\tat
>  
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)\n\tat
>  
> org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)\nCaused
>  by: *java.lang.IllegalArgumentException: Could not split string. Quoting was 
> not closed properly*.\n\tat 
> org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)\n\tat
>  
> org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)\n\tat
>  
> org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)\n\tat
>  
> org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)\n\tat
>  
> org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)\n\tat
>  java.base/java.util.Optional.map(Optional.java:265)\n\tat 
> org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)\n\t...
>  5 more\n"},"@version":1,"source_host":"xx","message":"Could not create 
> application 
> program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>
>
>  Can someone take a look and help fixing this issue? or I can help fixing
> this if someone can point me in the right direction.
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>


Re: configMap value error when using flink-operator?

2022-10-26 Thread Gyula Fóra
Makes sense, I will fix this!

Gyula

On Thu, Oct 27, 2022 at 4:09 AM Biao Geng  wrote:

> +1 to Yang's suggestion.
>
> On Wed, Oct 26, 2022 at 20:00, Yang Wang  wrote:
>
>> Maybe we could change the values of *taskmanager.numberOfTaskSlots* and
>> *parallelism.default* in the flink-conf.yaml of the Kubernetes operator to 1,
>> which are aligned with the default values in the Flink codebase.
>> the default values in Flink codebase.
>>
>>
>> Best,
>> Yang
>>
>> On Wed, Oct 26, 2022 at 15:17, Gyula Fóra  wrote:
>>
>>> Hi!
>>>
>>> I agree that this might be confusing but let me explain what happened.
>>>
>>> In the operator you can define default flink configuration. Currently it
>>> is
>>> https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/conf/flink-conf.yaml
>>> It contains numberOfTaskSlots=2.
>>>
>>> This is more just an example of how to control default cluster
>>> configuration from the operator. Users generally define numberOfTaskSlots
>>> for every Flink resource in the flinkConfiguration setting, that would
>>> override this default.
>>>
>>> You are also free to change the operator-side default flink conf to not
>>> set this; then you will get 1. In any case, nobody is running real
>>> applications with 1 task slot / parallelism 1, so this hasn't caused any
>>> problems so far :)
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Wed, Oct 26, 2022 at 7:20 AM Liting Liu (litiliu) 
>>> wrote:
>>>
>>>> hi:
>>>> I'm trying to deploy a flink job with flink-operator. The
>>>> flink-operator's version is 1.2.0. And the yaml I use is here:
>>>> 
>>>> apiVersion: flink.apache.org/v1beta1
>>>> kind: FlinkDeployment
>>>> metadata:
>>>>   name: basic-example
>>>> spec:
>>>>   image: flink:1.15
>>>>   flinkVersion: v1_15
>>>>   flinkConfiguration:
>>>>   serviceAccount: flink
>>>>   jobManager:
>>>> resource:
>>>>   memory: "2048m"
>>>>   cpu: 1
>>>>   taskManager:
>>>> resource:
>>>>   memory: "2048m"
>>>>   cpu: 1
>>>>   job:
>>>> jarURI:
>>>> local:///opt/flink/examples/streaming/StateMachineExample.jar
>>>> parallelism: 2
>>>> upgradeMode: stateless
>>>> 
>>>>    But I found that in the generated ConfigMap there was a field named
>>>> "taskmanager.numberOfTaskSlots" set to 2, which is very weird, since
>>>> that field was not defined by the user. And according to the Flink docs the
>>>> default value of "taskmanager.numberOfTaskSlots" should be 1.
>>>>
>>>


Re: configMap value error when using flink-operator?

2022-10-26 Thread Gyula Fóra
Hi!

I agree that this might be confusing but let me explain what happened.

In the operator you can define default flink configuration. Currently it is
https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/conf/flink-conf.yaml
It contains numberOfTaskSlots=2.

This is more just an example of how to control default cluster
configuration from the operator. Users generally define numberOfTaskSlots
for every Flink resource in the flinkConfiguration setting, that would
override this default.

You are also free to change the operator-side default flink conf to not set
this; then you will get 1. In any case, nobody is running real applications
with 1 task slot / parallelism 1, so this hasn't caused any problems so far :)
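
For example, a minimal per-deployment override (a sketch based on the yaml
from your mail):

spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "1"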

Cheers,
Gyula

On Wed, Oct 26, 2022 at 7:20 AM Liting Liu (litiliu) 
wrote:

> hi:
> I'm trying to deploy a flink job with flink-operator. The
> flink-operator's version is 1.2.0. And the yaml I use is here:
> 
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: basic-example
> spec:
>   image: flink:1.15
>   flinkVersion: v1_15
>   flinkConfiguration:
>   serviceAccount: flink
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   job:
> jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
> parallelism: 2
> upgradeMode: stateless
> 
>    But I found that in the generated ConfigMap there was a field named
> "taskmanager.numberOfTaskSlots" set to 2, which is very weird, since
> that field was not defined by the user. And according to the Flink docs the
> default value of "taskmanager.numberOfTaskSlots" should be 1.
>


Re: status not clear when deploying batch job with flink-k8s-operator

2022-10-25 Thread Gyula Fóra
When a batch job finishes and the cluster is shut down, the operator cannot
observe the status. It is impossible to tell whether it finished or not.

Try upgrading to Flink 1.15, where this is solved.

Cheers,
Gyula

On Tue, Oct 25, 2022 at 9:23 AM Liting Liu (litiliu) 
wrote:

> Hi, I'm deploying a Flink batch job with flink-k8s-operator.
> My flink-k8s-operator version is 1.2.0 and Flink version is 1.14.6. I
> found that after the batch job finishes executing, the jobManagerDeploymentStatus
> field becomes "MISSING" in the FlinkDeployment CRD, and the error field becomes
> "Missing JobManager deployment".
> Would it be better to add more information (or a field) to indicate that this
> batch job has finished normally?
>
> `
> status:
>   clusterInfo:
> flink-revision: a921a4d @ 2022-09-09T10:18:38+02:00
> flink-version: 1.14.6
>   error: Missing JobManager deployment
>   jobManagerDeploymentStatus: MISSING
>   jobStatus:
> jobId: 3c5807b038300f46154d72c58f074715
> jobName: batch-job-lab-o8yln9
> savepointInfo:
>   lastPeriodicSavepointTimestamp: 0
>   savepointHistory: []
>   triggerId: ''
>   triggerTimestamp: 0
>   triggerType: UNKNOWN
> startTime: '181370751'
> state: RECONCILING
> updateTime: '181379021'
> 
>


Re: Flink Native K8S RBAC

2022-10-19 Thread Gyula Fóra
Hi!

As a reference you can look at how the Flink Kubernetes Operator manages
RBAC settings:

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/rbac/
https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/templates/rbac.yaml
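
As a rough sketch of how the pieces fit together (all names and the namespace
below are placeholders), a ClusterRole with rules like yours would be bound to
the job's service account like this:

apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: flink-role-binding   # placeholder
  namespace: flink           # placeholder
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: flink-role           # placeholder, your ClusterRole
subjects:
  - kind: ServiceAccount
    name: flink              # placeholder service account
    namespace: flink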

Cheers,
Gyula

On Wed, Oct 19, 2022 at 9:46 PM Calvin D Souza via user <
user@flink.apache.org> wrote:

> Hi,
>
> I am using a custom service account for Flink native k8s. These are the
> rules for the clusterrole I’m using:
>
> rules:
> - apiGroups: [""]
> resources: ["pods"]
> verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
> - apiGroups: [""]
> resources: ["configmaps"]
> verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
> - apiGroups: [""]
> resources: ["services"]
> verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
> - apiGroups: ["apps"]
> resources: ["deployments"]
> verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
> - apiGroups: [""]
> resources: ["pods/log"]
> verbs: ["get", "list", "watch"]
> - apiGroups: ["extensions"]
> resources: ["deployments"]
> verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
>
>
> Are there any that I am missing or that are not needed?
>
> Thanks,
> Calvin
>


Re: Does kubernetes operator support manually triggering savepoint with canceling the job?

2022-10-19 Thread Gyula Fóra
I think you are confusing manual savepoints with savepoint upgrades.

Manual savepoints will trigger a savepoint but not shut down the job. If
you want to stop the job with savepoint you set the upgradeMode to
savepoint and set the state to SUSPENDED.
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/
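
For example, a sketch of the relevant part of the FlinkDeployment spec:

spec:
  job:
    upgradeMode: savepoint
    state: suspended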

The operator is not really a substitute for the CLI for these low-level
operations; they are possible, but the main goal is to provide comprehensive
lifecycle management easily.
You don't usually need to stop with savepoint... Instead you usually do a
stateful upgrade by changing the CR.

Hope this helps
Gyula

On Wed, Oct 19, 2022 at 9:34 AM Liting Liu (litiliu) 
wrote:

> hi, Geng:
>    I successfully triggered a savepoint manually, but the job was still
> running after the savepoint finished. I expected this job to be deleted,
> because the savepoint had been taken.
>   jobStatus:
> jobId: 9de925e9d4a67e04ef6279925450907c
> jobName: sql-te-lab-s334c9
> savepointInfo:
>   lastPeriodicSavepointTimestamp: 0
>   lastSavepoint:
> location: >-
>   hdfs://flink/sql-te/savepoint-9de925-b9ead1c58e7b
> timeStamp: 1666163606426
> triggerType: MANUAL
>   savepointHistory:
> - location: >-
> hdfs://flink/sql-te/savepoint-9de925-b9ead1c58e7b
>   timeStamp: 1666163606426
>   triggerType: MANUAL
>   triggerId: ''
>   triggerTimestamp: 0
>   triggerType: MANUAL
> startTime: '1666161791058'
> state: RUNNING
>
> --
> *From:* Geng Biao 
> *Sent:* October 4, 2022 13:57
> *To:* Liting Liu (litiliu) ; user <
> user@flink.apache.org>
> *Subject:* Re: Does kubernetes operator support manually triggering savepoint
> with canceling the job?
>
> Hi liting,
>
> Maybe you can check the code of deleteClusterDeployment. When the savepoint is
> finished, the operator will delete the job. Is the job not deleted as
> expected?
>
> Best,
> Biao Geng
>
> Get Outlook for iOS 
> --
> *From:* Liting Liu (litiliu) 
> *Sent:* Tuesday, October 4, 2022 12:53:45 PM
> *To:* user 
> *Subject:* Does kubernetes operator support manually triggering savepoint with
> canceling the job?
>
> Hello Flink community:
> I want to manually trigger a savepoint with the help of the Kubernetes
> operator. But it seems the Kubernetes operator doesn't provide an option for
> whether to cancel the job when triggering a savepoint, because the
> `cancelJob` parameter is hard-coded to false in the latest code
> AbstractFlinkService.java#L299
> 
> .
>   Do I have to watch for the savepoint to finish myself, and then cancel this
> job ASAP? And do we have a plan to support this option?
>


Re: Activate Flink HA without checkpoints on k8S

2022-10-13 Thread Gyula Fóra
Without HA, if the jobmanager goes down, job information is lost so the job
won’t be restarted after the JM comes back up.

Gyula

On Thu, 13 Oct 2022 at 19:07, marco andreas 
wrote:

>
>
> Hello,
>
> Can someone explain to me what the point is of using HA when deploying an
> application cluster with a single JM and checkpoints are not enabled?
>
> AFAIK, when the pod of the JM goes down Kubernetes will restart it anyway, so
> we don't need to enable HA in this case.
>
> Maybe there's something else that I am missing here, so if someone could
> give me an explanation it would be great .
>
> Sincerely,
>


Re: HA not working in standalone mode for operator 1.2

2022-10-13 Thread Gyula Fóra
Before we dive further into this, can you please explain the jarURI problem
you are trying to solve by switching to standalone?

The native mode should work well in almost any setup.

Gyula

On Thu, 13 Oct 2022 at 21:41, Javier Vegas  wrote:

> Hi, I have an S3 HA Flink app that works as expected when deployed via
> operator 1.2 in native mode, but I am seeing errors when switching to
> standalone mode (which I want to do mostly to save me having to set jarURI
> explicitly).
> I can see the jobmanager writes the JobGraph to S3, and in the web UI I
> can see it creates the jobs, but the taskmanager sits there doing nothing
> as if it could not communicate with the jobmanager. I can also see that the
> operator has created two services, while native mode creates only the rest
> service. After a while, the taskmanager closes with the following exception:
>
> org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException:
> Could not register at the ResourceManager within the specified maximum
> registration duration 30 ms. This indicates a problem with this
> instance. Terminating now.
>
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.registrationTimeout(TaskExecutor.java:1474)
>
> at
> org.apache.flink.runtime.taskexecutor.TaskExecutor.lambda$startRegistrationTimeout$16(TaskExecutor.java:1459)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:443)
>
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:443)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
>
> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
>
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>
> at
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
>
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>
> at
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>
> at akka.actor.Actor.aroundReceive(Actor.scala:537)
>
> at akka.actor.Actor.aroundReceive$(Actor.scala:535)
>
> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:548)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:231)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
>
> at java.base/java.util.concurrent.ForkJoinTask.doExec(Unknown
> Source)
>
> at
> java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(Unknown
> Source)
>
> at java.base/java.util.concurrent.ForkJoinPool.scan(Unknown
> Source)
>
> at java.base/java.util.concurrent.ForkJoinPool.runWorker(Unknown
> Source)
> at
> java.base/java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
>
>


Re: Kubernetes operator assign Job ID

2022-10-13 Thread Gyula Fóra
Hi!

This change aligns with how newer Flink 1.16+ versions handle application
job ids. There are some good reasons for doing this please see:
https://issues.apache.org/jira/browse/FLINK-19358
https://issues.apache.org/jira/browse/FLINK-29109

If you want to go back to the old behaviour, you need to manually set the
following flinkConfiguration:
$internal.pipeline.job-id: 
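
For example, in the FlinkDeployment spec (the value below is only a
placeholder; it must be a valid 32-character hex job id):

spec:
  flinkConfiguration:
    $internal.pipeline.job-id: 00000000000000000000000000000001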

Cheers,
Gyula

On Thu, Oct 13, 2022 at 8:55 AM Evgeniy Lyutikov 
wrote:

> Hi everyone
>
> After updating the Kubernetes operator to version 1.2.0, I noticed that it
> started generating a job id for all deployments.
>
> 2022-10-13 06:18:30,724 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][infojob/infojob] Starting reconciliation
> 2022-10-13 06:18:30,725 o.a.f.k.o.l.AuditUtils [INFO
> ][infojob/infojob] >>> Status | Info| CREATED | The resource
> was created in Kubernetes but not yet handled by the operator
> 2022-10-13 06:18:30,726 o.a.f.k.o.c.FlinkConfigManager [INFO
> ][infojob/infojob] Generating new config
> 2022-10-13 06:18:30,729 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
> [INFO ][infojob/infojob] Deploying for the first time
> 2022-10-13 06:18:30,755 o.a.f.k.o.l.AuditUtils [INFO
> ][infojob/infojob] >>> Status | Info| UPGRADING   | The resource is
> being upgraded
> 2022-10-13 06:18:30,755 o.a.f.k.o.r.d.ApplicationReconciler [INFO
> ][infojob/infojob] Assigning JobId override to
> ebae6c825b86db4790e34d443655a410
> 2022-10-13 06:18:30,779 o.a.f.k.o.l.AuditUtils [INFO
> ][infojob/infojob] >>> Status | Info| UPGRADING   | The resource is
> being upgraded
> 2022-10-13 06:18:30,795 o.a.f.k.o.l.AuditUtils [INFO
> ][infojob/infojob] >>> Event  | Info| SUBMIT  | Starting
> deployment
> 2022-10-13 06:18:30,795 o.a.f.k.o.s.AbstractFlinkService [INFO
> ][infojob/infojob] Deploying application cluster
> 2022-10-13 06:18:30,801 o.a.f.k.o.s.NativeFlinkService [INFO
> ][infojob/infojob] Deploying application cluster
> 2022-10-13 06:18:30,801 o.a.f.c.d.a.c.ApplicationClusterDeployer [INFO
> ][infojob/infojob] Submitting application in 'Application Mode'.
>
> How to disable this behavior?
> We are using Flink 1.14.4 and upgrade mode = 'last-state'
>
>
>
>
>
>


Re: Job uptime metric in Flink Operator managed cluster

2022-10-13 Thread Gyula Fóra
Sorry, what I said applies to Flink 1.15+ and the savepoint upgrade mode
(not stateless).

In any case, if there is no jobmanager there are no metrics... So I'm not sure
how to answer your question.

Gyula

On Thu, Oct 13, 2022 at 8:24 AM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hi Gyula,
>
> Thanks for the prompt response.
>
> > The Flink operator currently does not delete the jobmanager pod when a
> deployment is suspended.
> Are you sure this is true? I have re-tried this many times, but each time
> the pods get deleted, along with the deployment resources.
>
> Additionally, the flink-operator logs also denote that the resources are
> being deleted ( highlighted in red) after I change the state in the
> FlinkDeployment yaml from running --> suspended
> ( note: my FlinkDeployment name is *my-sample-dagger-v7 *)
>
> 2022-10-13 06:11:47,392 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][flink-operator/my-sample-dagger-v7] End of reconciliation
> 2022-10-13 06:11:49,879 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][flink-operator/parquet-savepoint-test] Starting reconciliation
> 2022-10-13 06:11:49,880 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][flink-operator/parquet-savepoint-test] Observing job status
> 2022-10-13 06:11:52,710 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][flink-operator/my-sample-dagger-v7] Starting reconciliation
> 2022-10-13 06:11:52,712 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][flink-operator/my-sample-dagger-v7] Observing job status
> 2022-10-13 06:11:52,721 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][flink-operator/my-sample-dagger-v7] Job status (RUNNING) unchanged
> 2022-10-13 06:11:52,723 o.a.f.k.o.c.FlinkConfigManager [INFO
> ][flink-operator/my-sample-dagger-v7] Generating new config
> 2022-10-13 06:11:52,725 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
> [INFO ][flink-operator/my-sample-dagger-v7] Detected spec change, starting
> reconciliation.
>
>
> 2022-10-13 06:11:52,788 o.a.f.k.o.r.d.AbstractJobReconciler [INFO
> ][flink-operator/my-sample-dagger-v7] Stateless job, ready for upgrade
> 2022-10-13 06:11:52,798 o.a.f.k.o.s.FlinkService   [INFO
> ][flink-operator/my-sample-dagger-v7] Job is running, cancelling job.
> 2022-10-13 06:11:52,815 o.a.f.k.o.s.FlinkService   [INFO
> ][flink-operator/my-sample-dagger-v7] Job successfully cancelled.
> 2022-10-13 06:11:52,815 o.a.f.k.o.u.FlinkUtils [INFO
> ][flink-operator/my-sample-dagger-v7] Deleting JobManager deployment and HA
> metadata.
> 2022-10-13 06:11:56,863 o.a.f.k.o.u.FlinkUtils [INFO
> ][flink-operator/my-sample-dagger-v7] Cluster shutdown completed.
> 2022-10-13 06:11:56,903 o.a.f.k.o.u.FlinkUtils [INFO
> ][flink-operator/my-sample-dagger-v7] Cluster shutdown completed.
> 2022-10-13 06:11:56,904 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][flink-operator/my-sample-dagger-v7] End of reconciliation
> 2022-10-13 06:11:56,928 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][flink-operator/my-sample-dagger-v7] Starting reconciliation
> 2022-10-13 06:11:56,930 o.a.f.k.o.r.d.AbstractFlinkResourceReconciler
> [INFO ][flink-operator/my-sample-dagger-v7] Resource fully reconciled,
> nothing to do...
>
> Also, my original doubt was around the uptime metric itself. What is the
> correct metric to use for monitoring the status (running or suspended) of
> a job which is being managed by the Flink Operator?
> The *jobmanager_job_uptime_value* metric seems to be giving the wrong status
> as mentioned in the earlier mail.
>
> Regards,
> Meghajit
>
>
> On Wed, Oct 12, 2022 at 7:32 PM Gyula Fóra  wrote:
>
>> Hello!
>> The Flink operator currently does not delete the jobmanager pod when a
>> deployment is suspended.
>> This way the REST API stays available, but no other resources are consumed
>> (taskmanagers are deleted).
>>
>> When you delete the FlinkDeployment resource completely, then the
>> jobmanager deployment is also deleted.
>>
>> In theory we could improve the logic to eventually delete the jobmanager
>> for suspended resources, but we currently use this as a way to guarantee
>> more resiliency for the operator flow.
>>
>> Cheers,
>> Gyula
>>
>> On Wed, Oct 12, 2022 at 3:56 PM Meghajit Mazumdar <
>> meghajit.mazum...@gojek.com> wrote:
>>
>>> Hello,
>>>
>>> I recently deployed a Flink Operator in Kubernetes and wrote a simple
>>> FlinkDeployment CRD  to run it in application mode following this
>>> <https://github.com/apache/flink-kubernetes-operator/blob/main/examples/pod-template.yaml>
>>> .
>>>
>>> I noticed that, even after I edited the CRD and marked the
>>> spec.job.state fie

Re: allowNonRestoredState doesn't seem to be working

2022-10-13 Thread Gyula Fóra
Hi!

If you have last-state upgrade mode configured it may happen that the
allowNonRestoredState config is ignored by Flink (as the last-state upgrade
mechanism somewhat bypasses the regular submission).

Worst case scenario, you can suspend the deployment, manually record the
last checkpoint/savepoint path. Then delete the FlinkDeployment and
recreate it with the initialSavepointPath set to your checkpoint.
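
A sketch of the recreated resource (the savepoint path is a placeholder):

spec:
  job:
    initialSavepointPath: s3p://<bucket>/savepoints/savepoint-xxxx  # placeholder
    allowNonRestoredState: true
    upgradeMode: savepoint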

Cheers,
Gyula

On Thu, Oct 13, 2022 at 7:36 AM Yaroslav Tkachenko 
wrote:

> Hey everyone,
>
> I'm trying to redeploy an application using a savepoint. The new version
> of the application has a few operators with new uids and a few operators
> with the old uids. I'd like to keep the state for the old ones.
>
> I passed the allowNonRestoredState flag (using Apache Kubernetes Operator
> actually) and I can confirm that
> "execution.savepoint.ignore-unclaimed-state" is "true" after that.
>
> However, the application still fails with the following exception:
>
> "java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint s3p://. Cannot map checkpoint/savepoint
> state for operator d9ea0f9654a3395802138c72c1bfd35b to the new program,
> because the operator is not available in the new program. If you want to
> allow to skip this, you can set the --allowNonRestoredState option on the
> CLI."
>
> Is there a situation where allowNonRestoredState may not work? Thanks.
>


Re: Job uptime metric in Flink Operator managed cluster

2022-10-12 Thread Gyula Fóra
Hello!
The Flink operator currently does not delete the jobmanager pod when a
deployment is suspended.
This way the REST API stays available, but no other resources are consumed
(taskmanagers are deleted).

When you delete the FlinkDeployment resource completely, then the
jobmanager deployment is also deleted.

In theory we could improve the logic to eventually delete the jobmanager
for suspended resources, but we currently use this as a way to guarantee
more resiliency for the operator flow.

Cheers,
Gyula

On Wed, Oct 12, 2022 at 3:56 PM Meghajit Mazumdar <
meghajit.mazum...@gojek.com> wrote:

> Hello,
>
> I recently deployed a Flink Operator in Kubernetes and wrote a simple
> FlinkDeployment CRD  to run it in application mode following this
> 
> .
>
> I noticed that, even after I edited the CRD and marked the spec.job.state
> field as *suspended*, the metric *jobmanager_job_uptime_value* continued
> to show the job status as *running*. I did verify that after re-applying
> these changes, the JM and TM pods were deleted and the cluster was not
> running anymore.
>
> Am I doing something incorrect or is there some other metric to monitor
> the job status when using Flink Operator ?
>
>
>
> --
> *Regards,*
> *Meghajit*
>


Re: videos Flink Forward San Francisco 2022

2022-10-10 Thread Gyula Fóra
I think everyone would be happier with the videos published on YouTube, but
it's unfortunately at the discretion of the organizer.

At this time they decided against it for some reason.

Gyula

On Mon, Oct 10, 2022 at 11:57 AM Martin  wrote:

> Hey,
>
> that's sad. Is it possible for future Flink Forwards to record again and
> publish all sessions?
>
> Best regards
> Martin
>
>
> Am 10. Oktober 2022 11:26:26 MESZ schrieb Martijn Visser <
> martijnvis...@apache.org>:
>>
>> Hi Günter,
>>
>> I've understood that only the keynotes were recorded and not the other
>> sessions.
>>
>> Best regards,
>>
>> Martijn
>>
>> On Sun, Oct 9, 2022 at 4:10 PM guenterh.lists 
>> wrote:
>>
>>> Sorry if this question was already posted
>>>
>>> By now only a few videos of the conference were published (mainly the
>>> keynotes)
>>> https://www.youtube.com/playlist?list=PLDX4T_cnKjD10qp1y2B4sLNW5KL_P6RuB
>>>
>>> Are the other presentations not going to be published?
>>>
>>> Günter
>>>
>>>


Re: Validation error trying to use standalone mode with operator 1.2.0

2022-10-07 Thread Gyula Fóra
Hi!

Seems like you still have an older version of the FlinkDeployment CRD
installed, which doesn't contain the newly introduced mode setting.

You can check
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/operations/upgrade/
for the upgrade process.
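
For example, something like this should replace the CRD in place (the file
path follows the operator repo layout; please double-check it against your
checkout):

kubectl replace -f helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml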

Cheers
Gyula

On Sat, 8 Oct 2022 at 00:00, Javier Vegas  wrote:

>
> I am following the operator quickstart
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.2/docs/try-flink-kubernetes-operator/quick-start/
>
> kubectl create -f
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.2/examples/basic.yaml
>
>
> works fine, but
>
>
> kubectl create -f
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.2/examples/basic-reactive.yaml
>
>
> which has the mode: standalone setting
>
>
> gives me this error:
>
>
> error: error validating "
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.2/examples/basic-reactive.yaml":
> error validating data: ValidationError(FlinkDeployment.spec): unknown field
> "mode" in org.apache.flink.v1beta1.FlinkDeployment.spec; if you choose to
> ignore these errors, turn validation off with --validate=false
>
>
>
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.2.0 released

2022-10-07 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.2.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/10/07/release-kubernetes-operator-1.2.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for Flink Kubernetes Operator applications can be
found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12352091

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora




Re: JobManager restarts on job failure

2022-09-26 Thread Gyula Fóra
Maybe it is a stupid question, but in Flink 1.15, with the following configs
enabled:

SHUTDOWN_ON_APPLICATION_FINISH = false
SUBMIT_FAILED_JOB_ON_APPLICATION_ERROR = true

I think the jobmanager pod would not restart but simply go to a terminal failed
state, right?
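
For reference, a sketch of how these would look in the flinkConfiguration
(assuming the constants above map to these keys):

execution.shutdown-on-application-finish: false
execution.submit-failed-job-on-application-error: true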

Gyula

On Mon, Sep 26, 2022 at 12:31 PM Matthias Pohl 
wrote:

> Thanks Evgeniy for reaching out to the community and Gyula for picking it
> up. I haven't looked into the k8s operator in much detail, yet. So, help me
> out if I miss something here. But I'm afraid that this is not something
> that would be fixed by upgrading to 1.15.
> The issue here is that we're recovering from an external checkpoint using
> the same job ID (the default one used for any Flink cluster in Application
> Mode) and the same cluster ID, if I understand correctly. Now, the job is
> failing during initialization. Currently, this causes a global cleanup [1].
> All HA data including the checkpoints are going to be deleted. I created
> FLINK-29415 [2] to cover this.
>
> I'm wondering whether we could work around this problem by specifying a
> random job ID through PipelineOptionsInternal [3] in the Kubernetes
> Operator. But I haven't looked into all the consequences around that. And
> it feels wrong to make this configuration parameter publicly usable.
>
> Another option might be to use ExecutionMode.RECOVERY in case of an
> initialization failure when recovering from an external Checkpoint in
> Application Mode (like we do it for internal recovery already).
>
> I'm looking forward to your opinion.
> Matthias
>
> [1]
> https://github.com/apache/flink/blob/41ac1ba13679121f1ddf14b26a36f4f4a3cc73e4/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java#L663
> [2] https://issues.apache.org/jira/browse/FLINK-29415
> [3]
> https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-core/src/main/java/org/apache/flink/configuration/PipelineOptionsInternal.java#L29
>
> On Tue, Sep 20, 2022 at 3:45 PM Gyula Fóra  wrote:
>
>> I see. I think we have seen this issue with others before; in Flink 1.15
>> it is solved by the newly introduced JobResultStore. The operator also
>> configures that automatically for 1.15 to avoid this.
>>
>> Gyula
>>
>> On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov 
>> wrote:
>>
>>> Thanks for the answer.
>>> I think this is not an operator issue; the Kubernetes deployment just
>>> restarts the fallen pod, and the restarted jobmanager, without HA metadata,
>>> starts the job from an empty state.
>>>
>>> I'm looking for a way to prevent it from exiting in case of a job error
>>> (we use an application mode cluster).
>>>
>>>
>>>
>>> --
>>> *From:* Gyula Fóra 
>>> *Sent:* September 20, 2022 19:49:37
>>> *To:* Evgeniy Lyutikov
>>> *Cc:* user@flink.apache.org
>>> *Subject:* Re: JobManager restarts on job failure
>>>
>>> The best thing for you to do would be to upgrade to Flink 1.15 and the
>>> latest operator version.
>>> In Flink 1.15 we have the option to interact with the Flink jobmanager
>>> even after the job FAILED and the operator leverages this for a much more
>>> robust behaviour.
>>>
>>> In any case the operator should not ever start the job from an empty
>>> state (even if it FAILED), if you think that's happening could you please
>>> open a JIRA ticket with the accompanying JM and Operator logs?
>>>
>>> Thanks
>>> Gyula
>>>
>>> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov 
>>> wrote:
>>>
>>>> Hi,
>>>> We are using Flink 1.14.4 with the Flink Kubernetes operator.
>>>>
>>>> Sometimes when updating a job, it fails on startup and Flink removes
>>>> all HA metadata, and the jobmanager exits.
>>>>
>>>>
>>>> 2022-09-14 14:54:44,534 INFO
>>>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring
>>>> job  from Checkpoint 30829 @ 1663167158684
>>>> for  located at
>>>> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
>>>> 2022-09-14 14:54:44,638 INFO
>>>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>>>>  reached terminal state FAILED.
>>>> org.apache.flink.runtime.client.JobInitializationException: Could not
>>>> start the Jo

Re: JobManager restarts on job failure

2022-09-20 Thread Gyula Fóra
I see. I think we have seen this issue with others before; in Flink 1.15 it
is solved by the newly introduced JobResultStore. The operator also
configures that automatically for 1.15 to avoid this.
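
For reference, a sketch of the related Flink 1.15 options (the path is a
placeholder):

job-result-store.storage-path: s3://<bucket>/job-result-store
job-result-store.delete-on-commit: false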

Gyula

On Tue, Sep 20, 2022 at 3:27 PM Evgeniy Lyutikov 
wrote:

> Thanks for the answer.
> I think this is not an operator issue; the Kubernetes deployment just
> restarts the fallen pod, and the restarted jobmanager, without HA metadata,
> starts the job from an empty state.
>
> I'm looking for a way to prevent it from exiting in case of a job error
> (we use an application mode cluster).
>
>
>
> ------
> *From:* Gyula Fóra
> *Sent:* 20 September 2022 19:49:37
> *To:* Evgeniy Lyutikov
> *Cc:* user@flink.apache.org
> *Subject:* Re: JobManager restarts on job failure
>
> The best thing for you to do would be to upgrade to Flink 1.15 and the
> latest operator version.
> In Flink 1.15 we have the option to interact with the Flink jobmanager
> even after the job FAILED and the operator leverages this for a much more
> robust behaviour.
>
> In any case the operator should not ever start the job from an empty state
> (even if it FAILED), if you think that's happening could you please open a
> JIRA ticket with the accompanying JM and Operator logs?
>
> Thanks
> Gyula
>
> On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov 
> wrote:
>
>> Hi,
> We are using Flink 1.14.4 with the Flink Kubernetes operator.
>
> Sometimes when updating a job, it fails on startup, Flink removes all
> HA metadata, and the jobmanager exits.
>>
>>
>> 2022-09-14 14:54:44,534 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring
>> job  from Checkpoint 30829 @ 1663167158684
>> for  located at
>> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
>> 2022-09-14 14:54:44,638 INFO
>> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>>  reached terminal state FAILED.
>> org.apache.flink.runtime.client.JobInitializationException: Could not
>> start the JobMaster.
>> Caused by: java.util.concurrent.CompletionException:
>> java.lang.IllegalStateException: There is no operator for the state
>> 4e1d9dde287c33a35e7970cbe64a40fe
>> 2022-09-14 14:54:44,930 ERROR
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
>> error occurred in the cluster entrypoint.
>> 2022-09-14 14:54:45,020 INFO
>> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
>> Clean up the high availability data for job
>> .
>> 2022-09-14 14:54:45,020 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
>> KubernetesApplicationClusterEntrypoint down with application status
>> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
>> 2022-09-14 14:54:45,026 INFO
>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
>> down rest endpoint.
>> 2022-09-14 14:54:46,122 INFO
>> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting
>> down remote daemon.
>> 2022-09-14 14:54:46,321 INFO
>> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting
>> shut down.
>>
>> Kubernetes restarts the jobmanager pod and the new instance, not finding
>> the HA metadata, starts the job from an empty state.
>> Is there some option to prevent the jobmanager from exiting after the job
>> reaches the FAILED state?
>>
>>
>


Re: JobManager restarts on job failure

2022-09-20 Thread Gyula Fóra
The best thing for you to do would be to upgrade to Flink 1.15 and the
latest operator version.
In Flink 1.15 we have the option to interact with the Flink jobmanager even
after the job FAILED and the operator leverages this for a much more robust
behaviour.

In any case the operator should not ever start the job from an empty state
(even if it FAILED), if you think that's happening could you please open a
JIRA ticket with the accompanying JM and Operator logs?

Thanks
Gyula

On Tue, Sep 20, 2022 at 1:00 PM Evgeniy Lyutikov 
wrote:

> Hi,
> We are using Flink 1.14.4 with the Flink Kubernetes operator.
>
> Sometimes when updating a job, it fails on startup, Flink removes all
> HA metadata, and the jobmanager exits.
>
>
> 2022-09-14 14:54:44,534 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Restoring
> job  from Checkpoint 30829 @ 1663167158684
> for  located at
> s3p://flink-checkpoints/k8s-checkpoint-job-name//chk-30829.
> 2022-09-14 14:54:44,638 INFO
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job
>  reached terminal state FAILED.
> org.apache.flink.runtime.client.JobInitializationException: Could not
> start the JobMaster.
> Caused by: java.util.concurrent.CompletionException:
> java.lang.IllegalStateException: There is no operator for the state
> 4e1d9dde287c33a35e7970cbe64a40fe
> 2022-09-14 14:54:44,930 ERROR
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Fatal
> error occurred in the cluster entrypoint.
> 2022-09-14 14:54:45,020 INFO
> org.apache.flink.kubernetes.highavailability.KubernetesHaServices [] -
> Clean up the high availability data for job
> .
> 2022-09-14 14:54:45,020 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] - Shutting
> KubernetesApplicationClusterEntrypoint down with application status
> UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
> 2022-09-14 14:54:45,026 INFO
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting
> down rest endpoint.
> 2022-09-14 14:54:46,122 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Shutting
> down remote daemon.
> 2022-09-14 14:54:46,321 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator[] - Remoting
> shut down.
>
> Kubernetes restarts the jobmanager pod and the new instance, not finding
> the HA metadata, starts the job from an empty state.
> Is there some option to prevent the jobmanager from exiting after the job
> reaches the FAILED state?
>
>


Re: Deploying Jobmanager on k8s as a Deployment

2022-09-05 Thread Gyula Fóra
Hi!

The operator supports both Flink native and standalone deployment modes and
in both cases the JM is deployed as a k8s Deployment.

During an upgrade, Flink/the operator deletes the deployment after the
savepoint and waits for termination before it creates a new one with the
updated spec.

Cheers,
Gyula

On Mon, 5 Sep 2022 at 07:41, Austin Cawley-Edwards 
wrote:

> Hey Marco,
>
> Unfortunately there is no built in k8s API that models an application mode
> JM exactly but Deployments should be fine, in general. As Gyula notes,
> where they can be difficult is during application upgrades as Deployments
> never let their pods exit, even if successful, so there is no way to stop
> the cluster gracefully.
>
> Is stopping your application with a savepoint and redeploying a workable
> solution for image upgrades? In this way a Job could still be used.
>
>
> @Gyula, how are JMs handled in the operator? Job, Deployment, or something
> custom?
>
>
> Best,
> Austin
>
>
>
> On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra  wrote:
>
>> You can use deployments of course , the operator and native k8s
>> integration does exactly that.
>>
>> Even then job updates can be tricky so I believe you are much better off
>> with the operator.
>>
>> Gyula
>>
>> On Sun, 4 Sep 2022 at 11:11, marco andreas 
>> wrote:
>>
>>> Hello,
>>>
>>> Thanks for the response, I will take a look at it.
>>>
>>> But if we aren't able to use the flink operator due to technical
>>> constraints is it possible to deploy the JM as deployment without any
>>> consequences that I am not aware of?
>>>
>>> Sincerely,
>>>
>>> On Sat, 3 Sep 2022 at 23:27, Gyula Fóra  wrote:
>>>
>>>> Hi!
>>>> You should check out the Flink Kubernetes Operator. I think that covers
>>>> all your needs .
>>>>
>>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>>>>
>>>> Cheers,
>>>> Gyula
>>>>
>>>> On Sat, 3 Sep 2022 at 13:45, marco andreas 
>>>> wrote:
>>>>
>>>>>
>>>>> We are deploying a Flink application cluster on k8s. Following the
>>>>> official documentation the JM is deployed as a Job resource; however, we
>>>>> are deploying a long-running Flink job that is not supposed to be
>>>>> terminated, and we also need to update the image of the Flink job.
>>>>>
>>>>>  The problem is that the Job is an immutable resource, we can't update
>>>>> it.
>>>>>
>>>>> So I'm wondering if it's possible to use a deployment resource for the
>>>>> jobmanager and if there will be any side effects or repercussions.
>>>>>
>>>>> Thanks,
>>>>>
>>>>
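
For context, a minimal FlinkDeployment managed by the operator looks roughly
like the sketch below (it closely follows the operator's basic example; the
image, jar path and resource sizes are illustrative):

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkDeployment
    metadata:
      name: basic-example
    spec:
      image: flink:1.15
      flinkVersion: v1_15
      serviceAccount: flink
      jobManager:
        resource:
          memory: "2048m"
          cpu: 1
      taskManager:
        resource:
          memory: "2048m"
          cpu: 1
      job:
        jarURI: local:///opt/flink/examples/streaming/StateMachineExample.jar
        parallelism: 2
        upgradeMode: stateless

Behind the scenes the operator turns this into a JobManager k8s Deployment
plus TaskManager pods, which is why image updates can be rolled out by
simply changing the spec.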


Re: Deploying Jobmanager on k8s as a Deployment

2022-09-05 Thread Gyula Fóra
You can use deployments of course , the operator and native k8s integration
does exactly that.

Even then job updates can be tricky so I believe you are much better off
with the operator.

Gyula

On Sun, 4 Sep 2022 at 11:11, marco andreas 
wrote:

> Hello,
>
> Thanks for the response, I will take a look at it.
>
> But if we aren't able to use the flink operator due to technical
> constraints is it possible to deploy the JM as deployment without any
> consequences that I am not aware of?
>
> Sincerely,
>
> On Sat, 3 Sep 2022 at 23:27, Gyula Fóra  wrote:
>
>> Hi!
>> You should check out the Flink Kubernetes Operator. I think that covers
>> all your needs .
>>
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>>
>> Cheers,
>> Gyula
>>
>> On Sat, 3 Sep 2022 at 13:45, marco andreas 
>> wrote:
>>
>>>
>>> We are deploying a Flink application cluster on k8s. Following the
>>> official documentation the JM is deployed as a Job resource; however, we
>>> are deploying a long-running Flink job that is not supposed to be
>>> terminated, and we also need to update the image of the Flink job.
>>>
>>>  The problem is that the Job is an immutable resource, we can't update it.
>>>
>>> So I'm wondering if it's possible to use a deployment resource for the
>>> jobmanager and if there will be any side effects or repercussions.
>>>
>>> Thanks,
>>>
>>


Re: Deploying Jobmanager on k8s as a Deployment

2022-09-03 Thread Gyula Fóra
Hi!
You should check out the Flink Kubernetes Operator. I think that covers all
your needs .

https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/

Cheers,
Gyula

On Sat, 3 Sep 2022 at 13:45, marco andreas 
wrote:

>
> We are deploying a Flink application cluster on k8s. Following the
> official documentation the JM is deployed as a Job resource; however, we
> are deploying a long-running Flink job that is not supposed to be
> terminated, and we also need to update the image of the Flink job.
>
>  The problem is that the Job is an immutable resource, we can't update it.
>
> So I'm wondering if it's possible to use a deployment resource for the
> jobmanager and if there will be any side effects or repercussions.
>
> Thanks,
>


Re: start flink-operator using "watchNamespaces"

2022-08-23 Thread Gyula Fóra
Hi!

From your logs I see:

Configuring operator to watch the following namespaces: [JOSDK_ALL_NAMESPACES]
This means that the operator is not configured correctly for the
watched namespaces. After you change the helm values you might have to
reinstall the operator.

Cheers,
Gyula


On Mon, Aug 22, 2022 at 5:16 PM Sigalit Eliazov  wrote:

> Hi,
> we are trying to start a flink operator with a specific namespace.
> flink-operator version: 1.1.0
>
> we set the following parameter: watchNamespaces: ["my-namespace"]
>
> The operator fails during installation with the following error
>
> Caused by: io.fabric8.kubernetes.client.KubernetesClientException:
> Failure executing: GET at:
> https://10.96.0.1/apis/flink.apache.org/v1beta1/flinkdeployments
> Message: Forbidden!Configured service account doesn't have access.
> Service account may have been revoked. flinkdeployments.flink.apache.org
> is forbidden: User "system:serviceaccount:my-namespace:flink-operator"
> cannot list resource "flinkdeployments" in API group "flink.apache.org"
> at the cluster scope: RBAC: clusterrole.rbac.authorization.k8s.io
> "psp:privileged" not found.
>
>
> we would expect that the list operation will take into consideration the
> namespace as well.
> Attached log file
> Is there any other configuration required?
>
> Thanks
> Sigalit
>
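
For reference, a sketch of setting the watched namespaces at install time
with Helm (the release name and repo URL follow the operator quick-start and
should be treated as assumptions):

    helm repo add flink-operator-repo \
      https://downloads.apache.org/flink/flink-kubernetes-operator-1.1.0/
    helm upgrade --install flink-kubernetes-operator \
      flink-operator-repo/flink-kubernetes-operator \
      --set 'watchNamespaces={my-namespace}'

As noted above, changing this value on an already running operator may
require reinstalling (or at least restarting) the operator pod so that the
new namespace list is picked up.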


Re: Savepoint problem on KubernetesOperator HA cluster

2022-08-11 Thread Gyula Fóra
In general the Flink JobManager HA /client mechanism ensures that the rest
requests end up at the current leader.

In your case it's not clear what the actual cause of the issue was.

What I would do is to upgrade to the latest operator version (1.1.0) where
the savepoint upgrade mechanism has been hardened.
If your cluster is already stopped with the savepoint but the operator did
not get the response back you might have to perform the steps outlined in:
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/job-management/#manual-recovery

Savepoint upgrades work significantly more robustly with Flink 1.15+
because there we have the ability to keep the cluster/rest api around even
after the application was stopped. In Flink 1.14 and before after stopping
the job the cluster disappears, making it difficult to handle all
situations.

Side note: I think in most cases you should not need more job manager
replicas than 1. You still have the same HA guarantees with 1 replica, if
it goes down it will be restarted. The behaviour is generally the same.

Cheers,
Gyula

On Thu, Aug 11, 2022 at 11:15 AM Evgeniy Lyutikov 
wrote:

> Hi,
>
> I'm using Flink 1.14.4 with Flink Kubernetes operator 1.0.1, with an HA
> configuration of 3 jobmanagers.
>
> When trying to change the job configuration, it restarts with trigger
> savepoint and an error occurs each time:
>
>
> 2022-08-10 12:04:21,142 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][job-namespace/job-namespace] Starting reconciliation
> 2022-08-10 12:04:21,143 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][job-namespace/job-namespace] Observing job status
> 2022-08-10 12:04:21,154 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][job-namespace/job-namespace] Job status (RUNNING) unchanged
> 2022-08-10 12:04:21,155 o.a.f.k.o.c.FlinkConfigManager [INFO
> ][job-namespace/job-namespace] Generating new config
> 2022-08-10 12:04:21,157 o.a.f.k.o.r.d.ApplicationReconciler [INFO
> ][job-namespace/job-namespace] Upgrading/Restarting running job, suspending
> first...
> 2022-08-10 12:04:21,157 o.a.f.k.o.r.d.ApplicationReconciler [INFO
> ][job-namespace/job-namespace] Job is in running state, ready for upgrade
> with SAVEPOINT
> 2022-08-10 12:04:21,157 o.a.f.k.o.s.FlinkService   [INFO
> ][job-namespace/job-namespace] Suspending job with savepoint.
> 2022-08-10 12:04:21,171 o.a.f.k.o.r.ReconciliationUtils[WARN
> ][job-namespace/job-namespace] Attempt count: 5, last attempt: true
> 2022-08-10 12:04:21,242 i.j.o.p.e.ReconciliationDispatcher
> [ERROR][job-namespace/job-namespace] Error during event processing
> ExecutionScope{ resource id: CustomResourceID{name='job-namespace',
> namespace='job-namespace'}, version: null} failed.
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.NotFoundException: Operation not found under
> key:
> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@b41b16a8
>
> After 5 retries
>
> 2022-08-10 12:04:21,157 o.a.f.k.o.r.d.ApplicationReconciler [INFO
> ][job-namespace/job-namespace] Job is in running state, ready for upgrade
> with SAVEPOINT
> 2022-08-10 12:04:21,157 o.a.f.k.o.s.FlinkService   [INFO
> ][job-namespace/job-namespace] Suspending job with savepoint.
> 2022-08-10 12:04:21,171 o.a.f.k.o.r.ReconciliationUtils [WARN
> ][job-namespace/job-namespace] Attempt count: 5, last attempt: true
> 2022-08-10 12:04:21,242 i.j.o.p.e.ReconciliationDispatcher
> [ERROR][job-namespace/job-namespace] Error during event processing
> ExecutionScope{ resource id: CustomResourceID{name='job-namespace',
> namespace='job-namespace'}, version: null} failed.
> org.apache.flink.kubernetes.operator.exception.ReconciliationException:
> java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.rest.util.RestClientException:
> [org.apache.flink.runtime.rest.NotFoundException: Operation not found under
> key:
> org.apache.flink.runtime.rest.handler.job.AsynchronousJobOperationKey@b41b16a8
> 2022-08-10 12:04:21,243 i.j.o.p.e.EventProcessor
>  [ERROR][job-namespace/job-namespace] Exhausted retries for ExecutionScope{
> resource id: CustomResourceID{name='job-namespace',
> namespace='job-namespace'}, version: null}
> 2022-08-10 12:04:53,299 o.a.f.k.o.c.FlinkDeploymentController [INFO
> ][job-namespace/job-namespace] Starting reconciliation
> 2022-08-10 12:04:53,299 o.a.f.k.o.o.JobStatusObserver  [INFO
> ][job-namespace/job-namespace] Observing job status
> 2022-08-10 12:05:03,322 o.a.f.s.n.i.n.c.AbstractChannel [WARN ]
> Force-closing a channel whose registration task was not accepted by an
> event loop: [id: 0x4fb8bb3b]
> java.util.concurrent.RejectedExecutionException: event executor terminated
> 2022-08-10 12:05:03,323 o.a.f.s.n.i.n.u.c.D.rejectedExecution [ERROR]
> Failed to submit a listener notification task. Event loop shut down?
> 
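
For background, the savepoint-based upgrade that fails above is driven by
two pieces of the FlinkDeployment spec; a sketch with placeholder paths:

    spec:
      flinkConfiguration:
        state.savepoints.dir: s3://flink-savepoints/job-namespace  # placeholder
      job:
        jarURI: local:///opt/flink/usrlib/job.jar                  # placeholder
        upgradeMode: savepoint  # suspend with a savepoint before upgrading

With upgradeMode: savepoint the operator first stops the job with a
savepoint and then redeploys from it; losing the REST response during that
step is exactly the situation the manual-recovery page linked above covers.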

Re: Flink Operator - delete flinkdeployments

2022-08-03 Thread Gyula Fóra
Hi!

We haven't experienced this problem in general (and other users also
haven't reported it).

Could you please share your operator logs when you triggered the deletion
that takes long?
Long job cancellation times can lead to delayed deletion times.

Cheers,
Gyula

On Wed, Aug 3, 2022 at 11:07 AM Sigalit Eliazov  wrote:

> hello
> we upgraded to version 1.1.0 and i am afraid the problem exists in that
> version as well.
>  I would appreciate any additional ideas or guidelines on how to do the
> cleanup correctly.
>
> thanks
> Sigalit
>
>
> On Tue, Aug 2, 2022 at 3:39 PM Sigalit Eliazov 
> wrote:
>
>> Will do, thanks!
>>
>> On Tue, Aug 2, 2022 at 3:39 PM Gyula Fóra  wrote:
>>
>>> Before trying to solve any already fixed problems please upgrade to
>>> 1.1.0 :)
>>>
>>>
>>>
>>> On Tue, Aug 2, 2022 at 2:33 PM Sigalit Eliazov 
>>> wrote:
>>>
>>>> we are working with 1.0.0
>>>>
>>>> On Tue, Aug 2, 2022 at 3:24 PM Gyula Fóra  wrote:
>>>>
>>>>> Are you running the latest 1.1.0 version of the operator?
>>>>>
>>>>> Gyula
>>>>>
>>>>> On Tue, Aug 2, 2022 at 2:18 PM Sigalit Eliazov 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We are deploying a few flink clusters via the flink operator in our
>>>>>> CI.
>>>>>>
>>>>>> In each run we first do a clean-up where one of the first steps is to
>>>>>>  run 'kubectl delete flinkdeployments --all -n '
>>>>>> after that we also delete the flink operator pod and our all
>>>>>> namespace.
>>>>>>
>>>>>> Lately we face issues where the deletion of the crd takes a lot of
>>>>>> time and sometimes it just gets stuck and we need to manually modify
>>>>>> finalizers so they will be deleted.
>>>>>>
>>>>>> Anyone faced this issue?
>>>>>> Any suggestions on how to overcome it?
>>>>>>
>>>>>> Thanks
>>>>>> Sigalit
>>>>>>
>>>>>
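
A sketch of a CI cleanup order that avoids stuck finalizers: delete the
custom resources and wait for them to disappear while the operator is still
running, and only then remove the operator and the namespace (all names are
placeholders):

    kubectl delete flinkdeployments --all -n my-namespace
    # the operator must still be alive at this point, because it is the one
    # that performs the cleanup and removes the finalizers
    kubectl wait --for=delete flinkdeployment --all -n my-namespace --timeout=300s
    helm uninstall flink-kubernetes-operator -n flink-operator-namespace
    kubectl delete namespace my-namespace

Deleting the operator pod before the FlinkDeployments are gone leaves
nothing to process the finalizers, which matches the stuck deletions
described in this thread.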


Re: Flink Operator - delete flinkdeployments

2022-08-02 Thread Gyula Fóra
Before trying to solve any already fixed problems please upgrade to 1.1.0
:)



On Tue, Aug 2, 2022 at 2:33 PM Sigalit Eliazov  wrote:

> we are working with 1.0.0
>
> On Tue, Aug 2, 2022 at 3:24 PM Gyula Fóra  wrote:
>
>> Are you running the latest 1.1.0 version of the operator?
>>
>> Gyula
>>
>> On Tue, Aug 2, 2022 at 2:18 PM Sigalit Eliazov 
>> wrote:
>>
>>> Hi,
>>>
>>> We are deploying a few flink clusters via the flink operator in our CI.
>>>
>>> In each run we first do a clean-up where one of the first steps is to
>>>  run 'kubectl delete flinkdeployments --all -n '
>>> after that we also delete the flink operator pod and our all namespace.
>>>
>>> Lately we face issues where the deletion of the crd takes a lot of time
>>> and sometimes it just gets stuck and we need to manually modify finalizers
>>> so they will be deleted.
>>>
>>> Anyone faced this issue?
>>> Any suggestions on how to overcome it?
>>>
>>> Thanks
>>> Sigalit
>>>
>>


Re: Flink Operator - delete flinkdeployments

2022-08-02 Thread Gyula Fóra
Are you running the latest 1.1.0 version of the operator?

Gyula

On Tue, Aug 2, 2022 at 2:18 PM Sigalit Eliazov  wrote:

> Hi,
>
> We are deploying a few flink clusters via the flink operator in our CI.
>
> In each run we first do a clean-up where one of the first steps is to
>  run 'kubectl delete flinkdeployments --all -n '
> after that we also delete the flink operator pod and our all namespace.
>
> Lately we face issues where the deletion of the crd takes a lot of time
> and sometimes it just gets stuck and we need to manually modify finalizers
> so they will be deleted.
>
> Anyone faced this issue?
> Any suggestions on how to overcome it?
>
> Thanks
> Sigalit
>


Re: Resources configuration on Kubernetes Session Cluster

2022-08-02 Thread Gyula Fóra
Hi Vladislav!

I am afraid there is no way to specify resources independently for jobs
within a session cluster currently in Flink.

For this I suggest using the Application Mode instead where each job can
have its own resources.

In any case you should check out the Flink Kubernetes Operator -
https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
It allows you to manage session clusters, session jobs and application
deployments very conveniently :)

Cheers,
Gyula

On Tue, Aug 2, 2022 at 11:17 AM Vladislav Keda <
vladislav.k...@glowbyteconsulting.com> wrote:

> Hi,
>
> I'm trying to specify different TaskManager resources for different Flink
> jobs on Kubernetes Session Cluster. Can you help me to find a way to do
> that?
>
> I use this options, but Flink picks them up only when I start new
> Kubernetes Session deployment:
> Memory:
> jobmanager.memory.process.size
> 
> , taskmanager.memory.process.size
> 
> CPU:
> kubernetes.jobmanager.cpu
> 
> , kubernetes.taskmanager.cpu
> 
>
> *FYI*
> I deploy Flink jobs on cluster and set up specific configuration
> parameters for jobs using
> *org.apache.flink.client.program.rest.RestClusterClient*
>
> Flink version - 1.13.6.
>
> ---
>
> Best Regards,
> Vladislav Keda
>
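
For completeness, with the operator's Application mode each FlinkDeployment
carries its own resource settings, so every job can be sized independently.
A sketch of the relevant fragment (the values are examples):

    spec:
      jobManager:
        resource:
          memory: "1024m"
          cpu: 0.5
      taskManager:
        resource:
          memory: "4096m"  # sized per job
          cpu: 2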


[ANNOUNCE] Apache Flink Kubernetes Operator 1.1.0 released

2022-07-24 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.1.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/07/25/release-kubernetes-operator-1.1.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Maven artifacts for Flink Kubernetes Operator can be found at:
https://search.maven.org/artifact/org.apache.flink/flink-kubernetes-operator

Official Docker image for the Flink Kubernetes Operator can be found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351723

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


Re: Guide for building Flink image with Python doesn't work

2022-07-08 Thread Gyula Fóra
Thanks for the detailed info Xingbo, I am pretty new to the PyFlink module
so was mainly trying to experiment a little :)

I think supporting it in Flink 1.16 is more than enough, it would probably
be an overkill to try to backport this. The M1 issue only really occurs in
local development/testing.

Gyula

On Fri, Jul 8, 2022 at 5:51 AM Xingbo Huang  wrote:

> Hi Gyula,
>
> From the log we can see that you downloaded the source package of
> pemja, not the wheel package[1]. I guess you are using an M1
> machine. If you install pemja from the source package, you need to have a
> JDK, gcc tools and CPython with NumPy in the environment. I believe this
> can be solved once you have prepared those tools, but other dependencies of
> PyFlink 1.15 do not support M1, which makes PyFlink 1.15 impossible to
> install and use on M1.
>
> We have supported M1 in release 1.16[2]. If a large number of M1 users
> have a big demand for PyFlink 1.15, we need to consider whether it is
> necessary to backport this support to 1.15, which could introduce
> compatibility issues between minor versions.
> Best,
> Xingbo
>
> [1] https://pypi.org/project/pemja/0.1.4/
> [2] https://issues.apache.org/jira/browse/FLINK-25188
>
> On Wed, 6 Jul 2022 at 13:56, Gyula Fóra  wrote:
>
>> Here it is, copied from the docs essentially:
>>
>> FROM flink:1.15.0
>>
>>
>> # install python3: Debian 11 ships Python 3.9, so build Python 3.7
>> # from source; PyFlink officially supports only Python 3.6, 3.7 and 3.8.
>>
>> RUN apt-get update -y && \
>> apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
>> libffi-dev git && \
>> wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
>> tar -xvf Python-3.7.9.tgz && \
>> cd Python-3.7.9 && \
>> ./configure --without-tests --enable-shared && \
>> make -j6 && \
>> make install && \
>> ldconfig /usr/local/lib && \
>> cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
>> ln -s /usr/local/bin/python3 /usr/local/bin/python && \
>> apt-get clean && \
>> rm -rf /var/lib/apt/lists/*
>>
>> # install PyFlink
>> RUN pip3 install apache-flink==1.15.0
>>
>> And I am running:
>> docker build --tag pyflink:latest .
>>
>> This gives the following error:
>>
>>
>> *#6 64.12  cwd: /tmp/pip-install-9_farey_/pemja/#6 64.12
>> Complete output (1 lines):#6 64.12 Include folder should be at
>> '/usr/local/openjdk-11/include' but doesn't exist. Please check you've
>> installed the JDK properly.*
>>
>> A side note:
>> The Dockerfile in the docs is missing git so initially I got the
>> following error:
>>
>> *#7 57.73 raise OSError("%r was not found" % name)#7 57.73
>> OSError: 'git' was not found *
>>
>> @Weihua Hu  can you please send your working
>> Dockerfile?
>>
>> Gyula
>>
>> On Wed, Jul 6, 2022 at 4:16 AM Weihua Hu  wrote:
>>
>>> Hi Gyula,
>>>
>>> I can build pyFlink image successfully by following this guide. Did you
>>> add a dependency outside of the documentation? And could you provide your
>>> Dockerfile
>>>
>>>
>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>>
>>> Best,
>>> Weihua
>>>
>>>
>>> On Tue, Jul 5, 2022 at 11:40 PM Gyula Fóra  wrote:
>>>
>>>> Well in any case either the official image is incorrect (maybe we
>>>> should include JDK by default not JRE) or we should update the
>>>> documentation regarding the python docker build because it simply doesn't
>>>> work at the moment.
>>>>
>>>> I am still looking for a full working example that adds the required
>>>> Python packages on top of a Flink 1.15.0 base image :)
>>>>
>>>> Gyula
>>>>
>>>> On Tue, Jul 5, 2022 at 5:36 PM Weihua Hu 
>>>> wrote:
>>>>
>>>>> In addition, you can try providing the Dockerfile
>>>>>
>>>>> Best,
>>>>> Weihua
>>>>>
>>>>>
>>>>> On Tue, Jul 5, 2022 at 11:24 PM Weihua Hu 
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> The base image flink:1.15.0 is built from openjdk:11-jre, and this
>>>>>> image only installs jre but not jdk.
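
One possible workaround sketch for the missing JNI headers, as an
alternative to installing a full JDK via apt: copy the include directory
from the matching JDK image into the path pemja expects. This is an
assumption-based sketch, not an officially documented fix:

    FROM flink:1.15.0
    # pemja compiles native code against JNI headers; the flink base image
    # is JRE-only, so bring the headers in from the JDK image
    COPY --from=openjdk:11-jdk /usr/local/openjdk-11/include /usr/local/openjdk-11/include
    # ... followed by the Python 3.7 build and
    # `pip3 install apache-flink==1.15.0` steps from the Dockerfile
    # quoted earlier in this thread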

Re: Difference between Session Mode and Session Job (Flink Operator)

2022-07-07 Thread Gyula Fóra
Hi!

The Flink Kubernetes Operator on a high level supports 3 types of resources:

   1. Session Deployment : Empty Flink Session cluster
   2. Application Deployment: Flink Application cluster (single job /
   cluster)
   3. Session Job: Flink Job deployed to an existing Session Deployment.

So in other words, the Session deployment only creates the Flink cluster.
The Session job can be deployed to an existing session deployment and it
represents an actual Flink job.

I hope that helps :)
Gyula

On Thu, Jul 7, 2022 at 7:42 AM bat man  wrote:

> Hi,
>
> I want to understand the difference between session mode and the new
> deployment mode - Flink Session Job which I believe is newly introduced as
> part of the Flink Operator(1.15) release.
> What's the benefit of using this mode as opposed to session mode as both
> run sessions to which flink jobs can be submitted.
>
> Cheers.
> H.
>
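
For illustration, a session job references an existing session cluster by
name; a sketch (the names are placeholders and the jar URI follows the
operator's session-job example):

    apiVersion: flink.apache.org/v1beta1
    kind: FlinkSessionJob
    metadata:
      name: basic-session-job
    spec:
      deploymentName: basic-session-cluster  # an existing session FlinkDeployment
      job:
        jarURI: https://repo1.maven.org/maven2/org/apache/flink/flink-examples-streaming_2.12/1.15.0/flink-examples-streaming_2.12-1.15.0-TopSpeedWindowing.jar
        parallelism: 2
        upgradeMode: stateless

Compared to plain session mode, this lets each job be declared and upgraded
declaratively while still sharing one cluster.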


Re: Guide for building Flink image with Python doesn't work

2022-07-05 Thread Gyula Fóra
Here it is, copied from the docs essentially:

FROM flink:1.15.0


# install python3: Debian 11 ships Python 3.9, so build Python 3.7 from
# source; PyFlink officially supports only Python 3.6, 3.7 and 3.8.

RUN apt-get update -y && \
apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
libffi-dev git && \
wget https://www.python.org/ftp/python/3.7.9/Python-3.7.9.tgz && \
tar -xvf Python-3.7.9.tgz && \
cd Python-3.7.9 && \
./configure --without-tests --enable-shared && \
make -j6 && \
make install && \
ldconfig /usr/local/lib && \
cd .. && rm -f Python-3.7.9.tgz && rm -rf Python-3.7.9 && \
ln -s /usr/local/bin/python3 /usr/local/bin/python && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# install PyFlink
RUN pip3 install apache-flink==1.15.0

And I am running:
docker build --tag pyflink:latest .

This gives the following error:


*#6 64.12  cwd: /tmp/pip-install-9_farey_/pemja/#6 64.12
Complete output (1 lines):#6 64.12 Include folder should be at
'/usr/local/openjdk-11/include' but doesn't exist. Please check you've
installed the JDK properly.*

A side note:
The Dockerfile in the docs is missing git so initially I got the following
error:

*#7 57.73 raise OSError("%r was not found" % name)#7 57.73
OSError: 'git' was not found *

@Weihua Hu  can you please send your working
Dockerfile?

Gyula

On Wed, Jul 6, 2022 at 4:16 AM Weihua Hu  wrote:

> Hi Gyula,
>
> I can build pyFlink image successfully by following this guide. Did you
> add a dependency outside of the documentation? And could you provide your
> Dockerfile
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>
> Best,
> Weihua
>
>
> On Tue, Jul 5, 2022 at 11:40 PM Gyula Fóra  wrote:
>
>> Well in any case either the official image is incorrect (maybe we should
>> include JDK by default not JRE) or we should update the
>> documentation regarding the python docker build because it simply doesn't
>> work at the moment.
>>
>> I am still looking for a full working example that adds the required
>> Python packages on top of a Flink 1.15.0 base image :)
>>
>> Gyula
>>
>> On Tue, Jul 5, 2022 at 5:36 PM Weihua Hu  wrote:
>>
>>> In addition, you can try providing the Dockerfile
>>>
>>> Best,
>>> Weihua
>>>
>>>
>>> On Tue, Jul 5, 2022 at 11:24 PM Weihua Hu 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> The base image flink:1.15.0 is built from openjdk:11-jre, and this
>>>> image only installs jre but not jdk.
>>>> It looks like the package you want to install (pemja) depends on the JDK;
>>>> you need to install openjdk-11-jdk in the Dockerfile.
>>>> Take a look at how it is installed in the official image:
>>>>
>>>>
>>>> https://hub.docker.com/layers/openjdk/library/openjdk/11-jdk/images/sha256-bc0af19c7c4f492fe6ff0c1d1c8c0e5dd90ab801385b220347bb91dbe2b4094f?context=explore
>>>>
>>>>
>>>> Best,
>>>> Weihua
>>>>
>>>>
>>>> On Tue, Jul 5, 2022 at 3:50 PM Gyula Fóra  wrote:
>>>>
>>>>> Hi All!
>>>>>
>>>>> I have been trying to experiment with the Flink python support on
>>>>> Kubernetes but I got stuck creating a custom image with all the necessary
>>>>> python libraries.
>>>>>
>>>>> I found this guide in the docs:
>>>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>>>>
>>>>> However when I try to build a custom image using it, I get the
>>>>> following error:
>>>>>
>>>>> #7 131.7 Collecting pemja==0.1.4
>>>>> #7 131.8   Downloading pemja-0.1.4.tar.gz (32 kB)
>>>>> #7 131.9 ERROR: Command errored out with exit status 255:
>>>>> #7 131.9  command: /usr/local/bin/python3.7 -c 'import sys,
>>>>> setuptools, tokenize; sys.argv[0] =
>>>>> '"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';
>>>>> __file__='"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';f=getattr(tokenize,
>>>>> '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"',
>>>>> '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))'
>>>>> egg_info --egg-base /tmp/pip-pip-egg-info-47iid6nm
>>>>> #7 131.9  cwd: /tmp/pip-install-y6o6djs1/pemja/
>>>>> #7 131.9 Complete output (1 lines):
>>>>> #7 131.9 Include folder should be at
>>>>> '/usr/local/openjdk-11/include' but doesn't exist. Please check you've
>>>>> installed the JDK properly.
>>>>>
>>>>> Has anyone seen this before and knows the solution?
>>>>>
>>>>> Thanks
>>>>> Gyula
>>>>>
>>>>


Re: Best practice for creating/restoring savepoint in standalone k8 setup

2022-07-05 Thread Gyula Fóra
Hi Jonas!

I think generally managed platforms used to provide the functionality that
you are after. Otherwise it's mostly home grown CI/CD integrations :)

The Kubernetes Operator is maybe the first initiative to bring proper
application lifecycle management to the ecosystem directly.

Cheers,
Gyula

On Tue, Jul 5, 2022 at 6:45 PM jonas eyob  wrote:

> Thanks Weihua and Gyula,
>
> @Weihua
> > If you restart flink cluster by delete/create deployment directly, it
> will be automatically restored from the latest checkpoint[1], so maybe just
> enabling the checkpoint is enough.
> Not sure I follow, we might have changes to the job that will require us
> to restore from a savepoint, where checkpoints wouldn't be possible due to
> significant changes to the JobGraph.
>
> > But if you want to use savepoint, you need to check whether the latest
> savepoint is successful (check whether have _metadata in savepoint dir is
> useful in most scenarios, but in some cases the _metadata may not be
> completed).
>
> Yes that is basically what our savepoint restore script does, it checks S3
> to see if we have any savepoints generated and will specify that to the
> "--fromSavePoint" argument.
>
> @Gyula
>
> > Did you check the https://github.com/apache/flink-kubernetes-operator
> > by any chance?
> Interesting, no I have missed this! I will have a look, but it would also be
> interesting to see how this has been solved before the introduction of the
> Flink operator.
>
> On Tue, 5 Jul 2022 at 16:37, Gyula Fóra  wrote:
>
>> Hi!
>>
>> Did you check the https://github.com/apache/flink-kubernetes-operator
>> by any chance?
>>
>> It provides many of the application lifecycle features that you are
>> probably after straight out-of-the-box. It has both manual and periodic
>> savepoint triggering also included in the latest upcoming version :)
>>
>> Cheers,
>> Gyula
>>
>> On Tue, Jul 5, 2022 at 5:34 PM Weihua Hu  wrote:
>>
>>> Hi, jonas
>>>
>>> If you restart flink cluster by delete/create deployment directly, it
>>> will be automatically restored from the latest checkpoint[1], so maybe just
>>> enabling the checkpoint is enough.
>>> But if you want to use savepoint, you need to check whether the latest
>>> savepoint is successful (check whether have _metadata in savepoint dir is
>>> useful in most scenarios, but in some cases the _metadata may not be
>>> completed).
>>>
>>> [1]
>>> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
>>>
>>> Best,
>>> Weihua
>>>
>>>
>>> On Tue, Jul 5, 2022 at 10:54 PM jonas eyob  wrote:
>>>
>>>> Hi!
>>>>
>>>> We are running a Standalone job on Kubernetes using application
>>>> deployment mode, with HA enabled.
>>>>
>>>> We have attempted to automate how we create and restore savepoints by
>>>> running a script for generating a savepoint (using k8 preStop hook) and
>>>> another one for restoring from a savepoint (located in a S3 bucket).
>>>>
>>>> Restoring from a savepoint is typically not a problem once we have a
>>>> savepoint generated and accessible in our s3 bucket. The problem is
>>>> generating the savepoint which hasn't been very reliable thus far. Logs are
>>>> not particularly helpful either so we wanted to rethink how we go about
>>>> taking savepoints.
>>>>
>>>> Are there any best practices for doing this in a CI/CD manner given our
>>>> setup?
>>>>
>>>> --
>>>>
>>>>
>
> --
> *Best regards*
> *Jonas Eyob*
>
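
For reference, the operator exposes both manual and periodic savepoint
triggering through the FlinkDeployment spec; a sketch (the path and the
interval are examples, and the periodic option needs operator 1.1+):

    spec:
      flinkConfiguration:
        state.savepoints.dir: s3://my-bucket/savepoints
        kubernetes.operator.periodic.savepoint.interval: 6h
      job:
        savepointTriggerNonce: 1  # change the number to trigger a manual savepoint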


Re: Guide for building Flink image with Python doesn't work

2022-07-05 Thread Gyula Fóra
Well in any case either the official image is incorrect (maybe we should
include JDK by default not JRE) or we should update the
documentation regarding the python docker build because it simply doesn't
work at the moment.

I am still looking for a full working example that adds the required Python
packages on top of a Flink 1.15.0 base image :)

Gyula

On Tue, Jul 5, 2022 at 5:36 PM Weihua Hu  wrote:

> In addition, you can try providing the Dockerfile
>
> Best,
> Weihua
>
>
> On Tue, Jul 5, 2022 at 11:24 PM Weihua Hu  wrote:
>
>> Hi,
>>
>> The base image flink:1.15.0 is built from openjdk:11-jre, and this image
>> only installs jre but not jdk.
>> It looks like the package you want to install (pemja) depends on the JDK;
>> you need to install openjdk-11-jdk in the Dockerfile.
>> Take a look at how it is installed in the official image:
>>
>>
>> https://hub.docker.com/layers/openjdk/library/openjdk/11-jdk/images/sha256-bc0af19c7c4f492fe6ff0c1d1c8c0e5dd90ab801385b220347bb91dbe2b4094f?context=explore
>>
>>
>> Best,
>> Weihua
>>
>>
>> On Tue, Jul 5, 2022 at 3:50 PM Gyula Fóra  wrote:
>>
>>> Hi All!
>>>
>>> I have been trying to experiment with the Flink python support on
>>> Kubernetes but I got stuck creating a custom image with all the necessary
>>> python libraries.
>>>
>>> I found this guide in the docs:
>>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker
>>>
>>> However when I try to build a custom image using it, I get the following
>>> error:
>>>
>>> #7 131.7 Collecting pemja==0.1.4
>>> #7 131.8   Downloading pemja-0.1.4.tar.gz (32 kB)
>>> #7 131.9 ERROR: Command errored out with exit status 255:
>>> #7 131.9  command: /usr/local/bin/python3.7 -c 'import sys,
>>> setuptools, tokenize; sys.argv[0] =
>>> '"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';
>>> __file__='"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';f=getattr(tokenize,
>>> '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"',
>>> '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))'
>>> egg_info --egg-base /tmp/pip-pip-egg-info-47iid6nm
>>> #7 131.9  cwd: /tmp/pip-install-y6o6djs1/pemja/
>>> #7 131.9 Complete output (1 lines):
>>> #7 131.9 Include folder should be at '/usr/local/openjdk-11/include'
>>> but doesn't exist. Please check you've installed the JDK properly.
>>>
>>> Has anyone seen this before and knows the solution?
>>>
>>> Thanks
>>> Gyula
>>>
>>


Re: Best practice for creating/restoring savepoint in standalone k8 setup

2022-07-05 Thread Gyula Fóra
Hi!

Did you check the https://github.com/apache/flink-kubernetes-operator
 by any chance?

It provides many of the application lifecycle features that you are
probably after straight out-of-the-box. It has both manual and periodic
savepoint triggering also included in the latest upcoming version :)

Cheers,
Gyula

On Tue, Jul 5, 2022 at 5:34 PM Weihua Hu  wrote:

> Hi, jonas
>
> If you restart flink cluster by delete/create deployment directly, it will
> be automatically restored from the latest checkpoint[1], so maybe just
> enabling the checkpoint is enough.
> But if you want to use savepoint, you need to check whether the latest
> savepoint is successful (check whether have _metadata in savepoint dir is
> useful in most scenarios, but in some cases the _metadata may not be
> completed).
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/
>
> Best,
> Weihua
>
>
> On Tue, Jul 5, 2022 at 10:54 PM jonas eyob  wrote:
>
>> Hi!
>>
>> We are running a Standalone job on Kubernetes using application
>> deployment mode, with HA enabled.
>>
>> We have attempted to automate how we create and restore savepoints by
>> running a script for generating a savepoint (using k8 preStop hook) and
>> another one for restoring from a savepoint (located in a S3 bucket).
>>
>> Restoring from a savepoint is typically not a problem once we have a
>> savepoint generated and accessible in our s3 bucket. The problem is
>> generating the savepoint which hasn't been very reliable thus far. Logs are
>> not particularly helpful either so we wanted to rethink how we go about
>> taking savepoints.
>>
>> Are there any best practices for doing this in a CI/CD manner given our
>> setup?
>>
>> --
>>
>>


Guide for building Flink image with Python doesn't work

2022-07-05 Thread Gyula Fóra
Hi All!

I have been trying to experiment with the Flink python support on
Kubernetes but I got stuck creating a custom image with all the necessary
python libraries.

I found this guide in the docs:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/standalone/docker/#using-flink-python-on-docker

However when I try to build a custom image using it, I get the following
error:

#7 131.7 Collecting pemja==0.1.4
#7 131.8   Downloading pemja-0.1.4.tar.gz (32 kB)
#7 131.9 ERROR: Command errored out with exit status 255:
#7 131.9  command: /usr/local/bin/python3.7 -c 'import sys, setuptools,
tokenize; sys.argv[0] = '"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';
__file__='"'"'/tmp/pip-install-y6o6djs1/pemja/setup.py'"'"';f=getattr(tokenize,
'"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"',
'"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))'
egg_info --egg-base /tmp/pip-pip-egg-info-47iid6nm
#7 131.9  cwd: /tmp/pip-install-y6o6djs1/pemja/
#7 131.9 Complete output (1 lines):
#7 131.9 Include folder should be at '/usr/local/openjdk-11/include'
but doesn't exist. Please check you've installed the JDK properly.

Has anyone seen this before and knows the solution?

Thanks
Gyula


Re: [ANNOUNCE] Apache Flink Kubernetes Operator 1.0.1 released

2022-06-28 Thread Gyula Fóra
Sorry for the incorrect blog post link, please ignore that. There is no
blog post for 1.0.1 :)

Gyula

On Tue, Jun 28, 2022 at 9:43 AM Yang Wang  wrote:

> Thanks Gyula for working on the first patch release for the Flink
> Kubernetes Operator project.
>
>
> Best,
> Yang
>
>
>
> On Tue, 28 Jun 2022 at 00:22, Gyula Fóra  wrote:
>
>> The Apache Flink community is very happy to announce the release of Apache
>> Flink Kubernetes Operator 1.0.1.
>>
>> The Flink Kubernetes Operator allows users to manage their Apache Flink
>> applications and their lifecycle through native k8s tooling like kubectl.
>> https://flink.apache.org/news/2022/04/03/release-kubernetes-operator-0.1.0.html
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Official Docker image for Flink Kubernetes Operator applications can be
>> found at:
>> https://hub.docker.com/r/apache/flink-kubernetes-operator
>>
>> The full release notes are available in Jira:
>>
> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351812
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>>
>> Regards,
>> Gyula Fora
>>
>


[ANNOUNCE] Apache Flink Kubernetes Operator 1.0.1 released

2022-06-27 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 1.0.1.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.


The release is available for download at:
https://flink.apache.org/downloads.html

Official Docker image for Flink Kubernetes Operator applications can be
found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351812

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


Re: Flink k8s Operator on AWS?

2022-06-21 Thread Gyula Fóra
A small addition to what Matyas has said:

The limitation of only supporting local scheme is coming from the Flink
Kubernetes Application mode directly and is not related to the operator
itself.
Once this feature is added to Flink itself the operator can also support it
for newer Flink versions.

Gyula

On Tue, Jun 21, 2022 at 2:52 PM Őrhidi Mátyás 
wrote:

> Hi Matt,
>
> - In FlinkDeployments you can utilize an init container to download your
> artifact onto a shared volume, then you can refer to it as local:/.. from
> the main container. FlinkDeployments comes with pod template support
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/pod-template/#pod-template
>
> - FlinkSessionJobs comes with an artifact fetcher, but it may need some
> tweaking to make it work on your environment:
>
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/overview/#flinksessionjob-spec-overview
>
> I hope it helps, let us know if you have further questions.
>
> Cheers,
> Matyas
>
>
>
> On Tue, Jun 21, 2022 at 2:35 PM Matt Casters <
> matt.cast...@neotechnology.com> wrote:
>
>> Hi Flink team!
>>
>> I'm interested in getting the new Flink Kubernetes Operator to work on
>> AWS EKS.  Following the documentation I got pretty far.  However, when
>> trying to run a job I got the following error:
>>
>> Only "local" is supported as schema for application mode. This assumes t
>>> hat the jar is located in the image, not the Flink client. An example of
>>> such path is: local:///opt/flink/examples/streaming/WindowJoin.jar
>>
>>
>>  I have an Apache Hop/Beam fat jar capable of running the Flink pipeline
>> in my yml file:
>>
>> jarURI: s3://hop-eks/hop/hop-2.1.0-fat.jar
>>
>> So how could I go about getting the fat jar in a desired location for the
>> operator?
>>
>> Getting this to work would be really cool for both short and long-lived
>> pipelines in the service of all sorts of data integration work.  It would
>> do away with the complexity of setting up and maintaining your own Flink
>> cluster.
>>
>> Thanks in advance!
>>
>> All the best,
>>
>> Matt (mcasters, Apache Hop PMC)
>>
>>
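
As a sketch of the init-container approach described in this thread (the
fetcher image is an assumption; the S3 path comes from the question):

    spec:
      podTemplate:
        spec:
          initContainers:
            - name: artifact-fetcher
              image: amazon/aws-cli  # any image with an S3 client works
              args: ["s3", "cp", "s3://hop-eks/hop/hop-2.1.0-fat.jar", "/flink-artifacts/"]
              volumeMounts:
                - name: artifacts
                  mountPath: /flink-artifacts
          containers:
            - name: flink-main-container  # merged with the operator's base template
              volumeMounts:
                - name: artifacts
                  mountPath: /opt/flink/usrlib
          volumes:
            - name: artifacts
              emptyDir: {}
      job:
        jarURI: local:///opt/flink/usrlib/hop-2.1.0-fat.jar

The shared emptyDir volume is what turns the s3:// artifact into a local:
path that application mode accepts.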


Re: Flink operator deletes the FlinkDeployment after a while

2022-06-14 Thread Gyula Fóra
Hi Sigalit,

This could be related to https://issues.apache.org/jira/browse/FLINK-27889
We have fixed this issue already (after the release), so you could simply use
the latest operator image from `release-1.0`:

ghcr.io/apache/flink-kubernetes-operator:cc8207c
In any case there probably was some error during the initial deployment of
your job, and the operator could not record the deployment information in
the CR status correctly.
This should not happen normally.

Cheers,
Gyula

On Tue, Jun 14, 2022 at 6:09 AM Sigalit Eliazov  wrote:

> After a few hours of running the job manager and task manager generated
> using the operator,
> I get the following message in the operator log.
> There really wasn't any traffic, yet the FlinkDeployment is being deleted.
>
> === Finished metrics report
> ===
> Deleting FlinkDeployment
> 2022-06-14 03:09:51,847 i.j.o.p.e.ReconciliationDispatcher
> [ERROR][ns/job-namel] Error during event processing ExecutionScope{
> resource id: CustomResourceID{name='job-name', namespace='ns'}, version:
> 53138} failed.
> java.lang.RuntimeException: Cannot create observe config before first
> deployment, this indicates a bug.
> at
> org.apache.flink.kubernetes.operator.config.FlinkConfigManager.getObserveConfig(FlinkConfigManager.java:137)
> at
> org.apache.flink.kubernetes.operator.service.FlinkService.cancelJob(FlinkService.java:357)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconciler.shutdown(ApplicationReconciler.java:327)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:56)
> at
> org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler.cleanup(AbstractDeploymentReconciler.java:37)
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:107)
> at
> org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController.cleanup(FlinkDeploymentController.java:59)
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:68)
> at
> io.javaoperatorsdk.operator.processing.Controller$1.execute(Controller.java:50)
> at
> io.javaoperatorsdk.operator.api.monitoring.Metrics.timeControllerExecution(Metrics.java:34)
>
> I'm not sure I understand this behaviour.
> Thanks
> Sigalit
>


Re: Flink Operator 1.0.0 not working

2022-06-08 Thread Gyula Fóra
Seems like something is off with your CRD.

You could try replacing it using:

kubectl replace -f
https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml

After that you should be able to deploy the examples.
Gyula

On Wed, Jun 8, 2022 at 4:03 PM Geldenhuys, Morgan Karl <
morgan.geldenh...@tu-berlin.de> wrote:

> Greetings all,
>
>
> I am trying to get the flink operator (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.0/)
> working, however running into a number of issues.
>
>
> I have a fresh Kubernetes cluster running and have followed all the
> instructions for deploying the operator as per the documentation (
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.0/docs/try-flink-kubernetes-operator/quick-start/
> ).
>
>
> The pods seem to start up correctly, however when I run the following
> command:
>
> kubectl create -f 
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0/examples/basic.yaml
>
> It returns with the following error:
>
>
> error: unable to recognize "
> https://raw.githubusercontent.com/apache/flink-kubernetes-operator/release-1.0/examples/basic.yaml":
> no matches for kind "FlinkDeployment" in version "flink.apache.org/v1beta1"
>
> Any ideas?
>
>
> Regards,
>
> M.
>
>


Re: Flink Kubernetes Operator v1.0 ETA

2022-06-01 Thread Gyula Fóra
Hi Jeesmon!

We are currently working through the release process. We are now in the
middle of voting for RC4 (we have identified and fixed a number of blocker
issues in earlier RCs).

We are hopeful that this RC will be successful in which case you will have
a release by the end of the week. If we hit any further blockers that might
delay it 1-2 days, but I would say current ETA is end of this week.

Cheers,
Gyula

On Wed, Jun 1, 2022 at 5:05 PM Jeesmon Jacob  wrote:

> Hi there,
>
> Is there an ETA on v1.0 release of operator? We are prototyping with a CI
> build from release-1.0 branch but would like to know the approximate ETA of
> official 1.0 release so that we can plan accordingly.
>
> Thanks,
> Jeesmon
>


Re: Python Job Type Support in Flink Kubernetes Operator

2022-05-24 Thread Gyula Fóra
Hi Jeesmon!

Sorry I completely missed this question earlier :)

There is no support currently for Python jobs and I don't really have any
experience with Python jobs so cannot really comment on how easy it would
be to integrate it.

We and most of the companies currently involved with developing the
operator focus on Flink production jobs built on Java, so this feature is
not on our radar at the moment.
If this is something interesting for you and you would like to investigate
and contribute to it we would be happy to help you along the way.

Cheers,
Gyula

On Tue, May 24, 2022 at 5:24 AM Jeesmon Jacob  wrote:

> Hi Gyula,
>
> Any idea on this? We are exploring current limitations of using the
> operator for Flink deployment and if there is a plan to support Python jobs
> in future will help us.
>
> Thanks,
> Jeesmon
>
> On Fri, May 20, 2022 at 3:46 PM Jeesmon Jacob  wrote:
>
>> Hi there,
>>
>> Is there a plan to support Python Job Type in Flink Kubernetes Operator?
>> If yes, any ETA?
>>
>> According to this previous operator overview, only Java jobs are supported
>> in the operator. This page was recently modified to remove the features table.
>>
>>
>> https://github.com/apache/flink-kubernetes-operator/blob/73369b851f2cd92a6818bb84e21157518d63a48d/docs/content/docs/concepts/overview.md
>>
>> Job Type     Support
>> Jar Job      full
>> SQL Job      no
>> Python Job   no
>>
>> Thanks,
>> Jeesmon
>>
>


Re: Flink Kubernetes Operator: Specifying env variables from ConfigMaps

2022-05-23 Thread Gyula Fóra
Hi Mads!

I think you need to use the podTemplate for this. You can either do it in
the top level spec or customize it for tm/jm respectively.

Keep in mind that pod templates are merged with the base flink template so
it's enough to specify the fields relevant for you (in this case the env
variables for the main container).
So it should be fairly simple :)
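
For illustration, a minimal sketch (the ConfigMap name and key are
placeholders; "flink-main-container" is the name Flink uses for its main
container, which the template entry must match so the env settings get
merged in):

spec:
  podTemplate:
    spec:
      containers:
        - name: flink-main-container
          # Pull a whole ConfigMap in as environment variables
          envFrom:
            - configMapRef:
                name: my-pipeline-config
          # ...or pick individual keys
          env:
            - name: KAFKA_BOOTSTRAP_SERVERS
              valueFrom:
                configMapKeyRef:
                  name: my-pipeline-config
                  key: kafka.bootstrap.servers

The same podTemplate block can also be placed under jobManager/taskManager
if the variables should only apply to one of them.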

Let us know if you hit any issues.

Cheers
Gyula

On Mon, May 23, 2022 at 6:20 AM Mads Ellersgaard Kalør 
wrote:

> Hi,
>
> We use a number of environment variables to configure our Flink pipelines,
> such as Kafka connection info, hostnames for external services etc. This
> works well when running a standalone Kubernetes deployment or on a local
> environment in Docker, but I cannot find any documentation about how to
> specify environment variables (from ConfigMaps or Secrets) in the Flink
> Kubernetes Operator (I would expect it to be in the JobSpec part of the
> FlinkDeploymentSpec).
>
> Do I have to create a PodTemplate, or is there a simpler way?
>
>
> Thanks,
>
> Mads
>


Re: Example of Beam Go job that uses Flink Kubernetes operator

2022-05-21 Thread Gyula Fóra
Hi Red!

Thank you for your interest and for checking out the official Flink
Kubernetes Operator Project.

This project is still relatively new and so far our focus has been on
running native Flink workloads (Flink Application/Session clusters, and
session jobs) and we haven't tried running a Flink Beam job yet.

I think in your case the best thing to try would be the application
deployment.
The current assumption is that your Flink application workload is packaged
into a fatjar with a main class that can be executed by the Flink jobmaster
component. I am not sure to what extent this is possible with the current
Beam implementation, it would be good to get input from the Beam community.

I guess there are multiple ways to achieve this on a high level:
 - Bundle everything into a fatjar (runner + user jar + configs), as sketched below
 - Have the runner and user jars separately in the docker image and
configure flink to add them to the classpath
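
As a rough illustration of the first option (untested; the base image tag,
jar name and paths are placeholders, and whether a Beam Go pipeline can be
bundled like this needs input from the Beam side):

FROM flink:1.14
# Hypothetical fatjar bundling the Beam Flink runner, user code and configs
COPY target/my-beam-pipeline-bundled.jar /opt/flink/usrlib/my-beam-pipeline.jar

The FlinkDeployment's job.jarURI would then point at
local:///opt/flink/usrlib/my-beam-pipeline.jar, as in the operator examples.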

Maybe someone already tried running Beam jobs using the Flink Application
mode in Flink, that person should be able to provide a working answer :)

Cheers,
Gyula

On Sat, May 21, 2022 at 8:04 AM Red Daly  wrote:

> I created an example project with my attempts to get something working:
>
> https://github.com/gonzojive/beam-go-k8s
>
> The good news is I got something working with Beam, Go, Bazel, Flink, and
> Kubernetes (minikube). The working version doesn't use the operator and
> only runs in a single pod.
>
> I would still like to figure out how to use the Flink operator with a Go
> Beam job. I'm not yet sure how the operator should work together with the
> "beam_flink1.14_job_server" and the Go binary.
>
> On Fri, May 20, 2022 at 5:17 PM Red Daly  wrote:
>
>> Hi,
>>
>> This is a request for documentation to walk through using the Flink
>> Kubernetes operator with the Go Beam SDK. I will plan to update the thread
>> as I investigate. I hope this feedback is useful.
>>
>> A few observations:
>>
>> 1) The official "Quick Start guide"
>> for the Kubernetes operator is great. It would be great if there were a
>> followup guide for how to use this operator to run a Beam job, especially a
>> Beam Go job.
>>
>> 2) The closest to a walkthrough I have found is
>> github.com/sambvfx/beam-flink-k8s, but that doesn't seem to use the
>> operator at all.
>>
>> 3) There are 3 different Flink Kubernetes operators. (Lyft,
>> GoogleCloudPlatform, and the Apache one.) The github repos for these don't
>> have information about the other projects in their READMEs, from what I can
>> tell. So it is a bit confusing.
>>
>> 4) There are a lot of concepts between Beam and Flink. To be a bit blunt,
>> I don't want to learn many concepts up front. I just want to run a few
>> commands and be up and running, then invest the time to learn more concepts
>> later.
>>
>> - Red
>>
>


Re: Flink Kubernetes operator not having a scale subresource

2022-05-19 Thread Gyula Fóra
Hi Team!

This is probably something for after the release but I created a simple
prototype for the scaling subresource based on taskmanager replica count.

You can take a look here:
https://github.com/apache/flink-kubernetes-operator/pull/227

After some consideration I decided against using parallelism and used tm
replicas instead (still with native integration); I describe this in the PR.
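
For anyone curious, at the CRD level a scale subresource declaration looks
roughly like this (a sketch; the exact JSON paths used in the PR may differ):

subresources:
  scale:
    specReplicasPath: .spec.taskManager.replicas
    statusReplicasPath: .status.taskManager.replicas
    labelSelectorPath: .status.labelSelector

With that registered, "kubectl scale flinkdeployment <name> --replicas=4"
would drive the TaskManager replica count.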

I will leave the PR open so people can experiment/comment and we should
definitely get back to this after the 1.0.0 release because it seems to be
a very lightweight yet useful feature.

Cheers,
Gyula


On Sat, May 7, 2022 at 11:25 AM Gyula Fóra  wrote:

> Hi Jay!
>
> I will take a closer look into this and see if we can use the parallelism
> in the scale subresource.
>
> If you could experiment with this and see if it works with the current CRD
> that would be helpful . Not sure if we need to change the status or
> anything as parallelism is only part of the spec at the moment.
>
> If you have a working modified CRD I would appreciate if you could share
> it with us!
>
> Don’t worry about the release schedule, if we think that this is important
> and we need some changes for it , we can push the release out a few days if
> necessary.
>
> What is important at this point to understand what exactly we need to make
> the parallelism scaling work natively to avoid breaking changes to the
> spec/status after the release :)
>
> Cheers
> Gyula
>
> On Sat, 7 May 2022 at 11:14, Jay Ghiya  wrote:
>
>> Hi Team,
>>
>> Yes, we can change the parallelism of the Flink job. Going through the
>> roadmap, I understand that standalone mode has been put as second
>> priority for the right reasons. If possible, can I be of any help to
>> accelerate this? We have a tight release schedule and would want to
>> close this out in the next 10 days with your help.
>>
>> Looking forward to hear from you !
>>
>> -Jay
>>
>> On 7 May 2022, 8:15 AM +0530, Yang Wang , wrote:
>>
>> Currently, the flink-kubernetes-operator is using Flink native K8s
>> integration[1], which means Flink ResourceManager will dynamically allocate
>> TaskManager on demand.
>> So the users do not need to specify the replicas of TaskManager.
>>
>> Just like Gyula said, one possible solution to make "kubectl scale" work
>> is to change the parallelism of Flink job.
>>
>> If the standalone mode[2] is introduced in the operator, then it is also
>> possible to directly change the replicas of TaskManager pods.
>>
>>
>> [1].
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/
>> [2].
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-225%3A+Implement+standalone+mode+support+in+the+kubernetes+operator
>>
>> Best,
>> Yang
>>
>> Gyula Fóra  于2022年5月7日周六 04:26写道:
>>
>>> Hi Jay!
>>>
>>> Interesting question/proposal to add the scale-subresource.
>>>
>>> I am not an expert on this area but we will look into this a little and
>>> give you some feedback and see if we can incorporate something into the
>>> upcoming release if it makes sense.
>>>
>>> On a high level there is not a single replicas value for a
>>> FlinkDeployment that would be easy to map, but maybe we could use the
>>> parallelism value for this purpose for Applications/Session jobs.
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Fri, May 6, 2022 at 8:04 PM Jay Ghiya  wrote:
>>>
>>>> Hi Team,
>>>>
>>>> I have been experimenting with the Flink Kubernetes operator. One of the
>>>> biggest gaps for us is that it does not support the scale subresource as
>>>> of now, which is needed for reactive scaling. Without that, it becomes
>>>> commercially very difficult for products like ours that have very varied
>>>> loads every hour.
>>>>
>>>> Can I get some direction on contributing
>>>> https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource
>>>> for our Kubernetes operator CRD?
>>>>
>>>> I have been having a hard time reading
>>>> https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
>>>> to figure out the replicas, status and label selector JSON paths for the
>>>> task manager. It may be due to my lack of knowledge, so a sense of
>>>> direction would help me.
>>>>
>>>> -Jay
>>>> GEHC
>>>>
>>>


Re: Flink Kubernetes operator not having a scale subresource

2022-05-07 Thread Gyula Fóra
Hi Jay!

I will take a closer look into this and see if we can use the parallelism
in the scale subresource.

If you could experiment with this and see if it works with the current CRD
that would be helpful . Not sure if we need to change the status or
anything as parallelism is only part of the spec at the moment.

If you have a working modified CRD I would appreciate if you could share it
with us!

Don’t worry about the release schedule, if we think that this is important
and we need some changes for it , we can push the release out a few days if
necessary.

What is important at this point to understand what exactly we need to make
the parallelism scaling work natively to avoid breaking changes to the
spec/status after the release :)

Cheers
Gyula

On Sat, 7 May 2022 at 11:14, Jay Ghiya  wrote:

> Hi Team,
>
> Yes, we can change the parallelism of the Flink job. Going through the
> roadmap, I understand that standalone mode has been put as second
> priority for the right reasons. If possible, can I be of any help to
> accelerate this? We have a tight release schedule and would want to
> close this out in the next 10 days with your help.
>
> Looking forward to hear from you !
>
> -Jay
>
> On 7 May 2022, 8:15 AM +0530, Yang Wang , wrote:
>
> Currently, the flink-kubernetes-operator is using Flink native K8s
> integration[1], which means Flink ResourceManager will dynamically allocate
> TaskManager on demand.
> So the users do not need to specify the replicas of TaskManager.
>
> Just like Gyula said, one possible solution to make "kubectl scale" work
> is to change the parallelism of Flink job.
>
> If the standalone mode[2] is introduced in the operator, then it is also
> possible to directly change the replicas of TaskManager pods.
>
>
> [1].
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/resource-providers/native_kubernetes/
> [2].
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-225%3A+Implement+standalone+mode+support+in+the+kubernetes+operator
>
> Best,
> Yang
>
> Gyula Fóra  于2022年5月7日周六 04:26写道:
>
>> Hi Jay!
>>
>> Interesting question/proposal to add the scale-subresource.
>>
>> I am not an expert on this area but we will look into this a little and
>> give you some feedback and see if we can incorporate something into the
>> upcoming release if it makes sense.
>>
>> On a high level there is not a single replicas value for a
>> FlinkDeployment that would be easy to map, but maybe we could use the
>> parallelism value for this purpose for Applications/Session jobs.
>>
>> Cheers,
>> Gyula
>>
>> On Fri, May 6, 2022 at 8:04 PM Jay Ghiya  wrote:
>>
>>> Hi Team,
>>>
>>> I have been experimenting with the Flink Kubernetes operator. One of the
>>> biggest gaps for us is that it does not support the scale subresource as
>>> of now, which is needed for reactive scaling. Without that, it becomes
>>> commercially very difficult for products like ours that have very varied
>>> loads every hour.
>>>
>>> Can I get some direction on contributing
>>> https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource
>>> for our Kubernetes operator CRD?
>>>
>>> I have been having a hard time reading
>>> https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
>>> to figure out the replicas, status and label selector JSON paths for the
>>> task manager. It may be due to my lack of knowledge, so a sense of
>>> direction would help me.
>>>
>>> -Jay
>>> GEHC
>>>
>>


Re: Flink Kubernetes operator not having a scale subresource

2022-05-06 Thread Gyula Fóra
Hi Jay!

Interesting question/proposal to add the scale-subresource.

I am not an expert on this area but we will look into this a little and
give you some feedback and see if we can incorporate something into the
upcoming release if it makes sense.

On a high level there is not a single replicas value for a
FlinkDeployment that would be easy to map, but maybe we could use the
parallelism value for this purpose for Applications/Session jobs.

Cheers,
Gyula

On Fri, May 6, 2022 at 8:04 PM Jay Ghiya  wrote:

> Hi Team,
>
> I have been experimenting with the Flink Kubernetes operator. One of the
> biggest gaps for us is that it does not support the scale subresource as
> of now, which is needed for reactive scaling. Without that, it becomes
> commercially very difficult for products like ours that have very varied
> loads every hour.
>
> Can I get some direction on contributing
> https://kubernetes.io/docs/tasks/extend-kubernetes/custom-resources/custom-resource-definitions/#scale-subresource
> for our Kubernetes operator CRD?
>
> I have been having a hard time reading
> https://github.com/apache/flink-kubernetes-operator/blob/main/helm/flink-kubernetes-operator/crds/flinkdeployments.flink.apache.org-v1.yml
> to figure out the replicas, status and label selector JSON paths for the
> task manager. It may be due to my lack of knowledge, so a sense of
> direction would help me.
>
> -Jay
> GEHC
>


Re: Kubernetes Operator --allowNonRestoredState config

2022-04-29 Thread Gyula Fóra
Hi!

We did not expose this as a top level flag in the spec, but you can enable
this through the flink configuration using:

execution.savepoint.ignore-unclaimed-state: "true"
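
In the FlinkDeployment resource this goes into the flinkConfiguration map,
e.g. (showing only the relevant field):

spec:
  flinkConfiguration:
    execution.savepoint.ignore-unclaimed-state: "true"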

Cheers,
Gyula

On Fri, Apr 29, 2022 at 5:26 PM Shqiprim Bunjaku 
wrote:

> Hi all,
>
> I am using Flink Kubernetes Operator to manage my Flink applications. I
> have one issue when making changes in pipeline and I need to pass
> --allowNonRestoredState configuration, but I cannot find how I can do
> this using Flink Operator. Tried below method but didn't work:
>
>  job:
> args:
> - --allowNonRestoredState
> entryClass: package.MainClass
> jarURI: local:///opt/flink/usrlib/app.jar
> parallelism: 6
> savepointTriggerNonce: 0
> state: running
>
> Thank you
>


Re: Enabling savepoints when deploying in Application Mode

2022-04-11 Thread Gyula Fóra
Hi Lilli!

I am not aware of any problems with savepoint restore in application mode.
What you can try is to use the execution.savepoint.path configuration
setting to control it instead of the CLI and see if it makes a difference
for you.
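
For example, in flink-conf.yaml (the savepoint path here is a placeholder):

execution.savepoint.path: s3://my-bucket/savepoints/savepoint-123abc

Flink should then restore from that savepoint on startup, without passing -s
on the CLI.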

Otherwise, you could also check out the
https://github.com/apache/flink-kubernetes-operator (docs)
which can help you manage your Flink Application Deployments in Kubernetes.

Cheers,
Gyula

On Mon, Apr 11, 2022 at 8:09 PM Lilli Pearson  wrote:

> Hi,
>
> Summary:
> I've run into a number of issues trying to marry savepoints with running
> Flink in Application Mode, and am wondering if anyone has suggestions on
> how to resolve them, or if savepoints and Application Mode simply aren't
> designed to work together.
>
> Context on app deployment:
> For long-running processing of my Kafka streams, I'm running Flink 1.13.5
> in application mode, using CI/CD to deploy the cluster to Kubernetes by
> deleting and recreating the deployment. This approach has worked great with
> checkpoints. However, since the savepoint Flink should start up with needs
> to be specified on startup, this approach would need to change a bit.
>
> Details:
> In experimenting with savepoints while running the app in Application
> Mode, I've run into some issues that have made me suspect these two
> features just don't work well together, at least in Flink 1.13, though I
> can't find documentation that says so directly. (Maybe it's implied, as
> considering how application mode is set up, it does seem reasonable to me
> that savepoints wouldn't work.) For example:
> * The entire /jars API (link:
> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/rest_api/#jars)
> is simply unavailable on my cluster (404s), though the rest of the API
> works fine. This means I can't use those endpoints to submit a jar to start
> from
> * using the CLI to run has been equally unsuccessful; when I run a command
> like  `bin/flink run path/to/jar.jar -s path/to/savepoint`, it fails and
> the root cause error is
> `org.apache.flink.runtime.rest.util.RestClientException: [Not found.]`
>
>
> Thanks in advance for any help or advice!
>
>


[ANNOUNCE] Apache Flink Kubernetes Operator 0.1.0 released

2022-04-04 Thread Gyula Fóra
The Apache Flink community is very happy to announce the release of Apache
Flink Kubernetes Operator 0.1.0.

The Flink Kubernetes Operator allows users to manage their Apache Flink
applications and their lifecycle through native k8s tooling like kubectl.
This is the first release of this new community effort to include a robust
Java based operator implementation in the Flink project umbrella.

Please check out the release blog post for an overview of the release:
https://flink.apache.org/news/2022/04/03/release-kubernetes-operator-0.1.0.html

The release is available for download at:
https://flink.apache.org/downloads.html

Official Docker image for Flink Kubernetes Operator applications can be
found at:
https://hub.docker.com/r/apache/flink-kubernetes-operator

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522&version=12351499

We would like to thank all contributors of the Apache Flink community who
made this release possible!

Regards,
Gyula Fora


Re: The flink-kubernetes-operator vs third party flink operators

2022-04-02 Thread Gyula Fóra
Hi!

The main difference at the moment is the programming language and the APIs
used to interact with Flink.

The flink-kubernetes-operator, uses Java and interacts with Flink using the
built in (native) clients.

The other operators have been around since earlier Flink versions. They all
use Golang, and some of them have already been abandoned by the initial
developers. In many cases they also do not support the latest Flink
operational features.

With the flink-kubernetes-operator project we aimed to take inspiration
from the existing operators and create a project where Flink developers can
easily contribute and could be maintained together with Flink itself while
keeping up the high quality standards.

We hope that developers of the other operators would start contributing
soon :)

Cheers,
Gyula



On Sat, 2 Apr 2022 at 11:01, Hao t Chang  wrote:

> Hi
>
>
>
> I started looking into Flink recently more specifically the
> flink-kubernetes-operator so I only know little about it. I found at least
> 3 other Flink K8s operators that Lyft, Google, and Spotify developed.
> Could someone please enlighten me what is the difference of these third
> party Flink k8s operators ? and why don’t these parties contribute to the
> same repo in the Flink community from the beginning ? Thanks.
>
>
>
> Ted
>


Re: Helm install flink-kubernetes-operator failed

2022-04-02 Thread Gyula Fóra
As Biao Geng said this will be much easier after the first preview release,
which should become available on Monday if all works out :)

Until then you can also test our last release candidate which will
hopefully become the release:

helm repo add operator-rc3
https://dist.apache.org/repos/dist/dev/flink/flink-kubernetes-operator-0.1.0-rc3
helm install flink-operator operator-rc3/flink-kubernetes-operator

These commands will stop working once the release is finalized and then you
can move to that :)
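
Once finalized, the equivalent commands against the released artifacts should
look roughly like this (my guess at the final repo layout; the URL will
depend on the released version):

helm repo add flink-operator-repo
https://downloads.apache.org/flink/flink-kubernetes-operator-0.1.0/
helm install flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator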

Cheers,
Gyula

On Sat, Apr 2, 2022 at 9:26 AM Geng Biao  wrote:

> Hi Spoon,
>
>
>
> The command in current doc (helm install flink-kubernetes-operator
> helm/flink-kubernetes-operator) should be executed under the repo’s root
> dir (e.g. ~/flink-kubernetes-operator/).
>
> The community are working on to make this process simpler(
> https://github.com/apache/flink-kubernetes-operator/pull/143). You should
> be able to helm install it directly later.
>
>
>
> Best,
>
> Biao Geng
>
>
>
> From: spoon_lz
> Date: Saturday, April 2, 2022, 3:11 PM
> To: user
> Subject: Helm install flink-kubernetes-operator
>
>
>
> Hi, I am trying out the flink-kubernetes-operator following the official
> documentation. According to the description in 'Quick Start', when I
> execute the command:
>
> helm install flink-kubernetes-operator helm/flink-kubernetes-operator
>
> it returns an error:
>
> Error: INSTALLATION FAILED: failed to download
> "helm/flink-kubernetes-operator
>
> Executing 'helm repo list' returned:
>
> NAME   URL
> bitnami  https://charts.bitnami.com/bitnami
> stablehttps://kubernetes.oss-cn-hangzhou.aliyuncs.com/charts
>
> and the command 'helm repo update' has been executed.
>
>
>
> But using the commands 'helm search repo flink' and 'helm search repo
> helm/flink-kubernetes-operator' both returned:
>
> No results found
>
>
> Is my repository configured incorrectly? Hope to get your help
>
>
>
>
>


Re: flink cluster startup time

2022-03-30 Thread Gyula Fóra
Hi Frank!

Thank you for the interest.

As the others said, the flink-kubernetes-operator will give you quicker
job/cluster startup time together with full support for the application
mode.

Production readiness is always relative. If I had to build a new production
use-case I would not hesitate to use the flink-kubernetes-operator, simply
because the internal architecture is much simpler and it uses the Flink
Java Client APIs to interact with the cluster in a Flink-native manner. For
me as a java developer it is also much easier to debug/fix any issues that
might come up.

One important thing to note here is that the way the
flink-kubernetes-operator works is much less intrusive on the running
cluster compared to the SpotifyOperator. It internally relies on the Flink
Native Kubernetes integration (including HA) and once the job is submitted
it lets Flink take care of the rest.
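
To make that concrete, a minimal application-mode FlinkDeployment looks
roughly like this (a sketch modeled on the operator examples; image, resource
numbers and jar path are placeholders for your setup):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-app
spec:
  image: my-registry/my-flink-app:latest  # job jar baked into the image
  flinkVersion: v1_14
  serviceAccount: flink
  jobManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: local:///opt/flink/usrlib/app.jar
    parallelism: 2
    upgradeMode: savepoint  # upgrades go through a savepoint automatically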

I hope you will have some time to test our preview release and share some
feedback!

Cheers,
Gyula



On Thu, Mar 31, 2022 at 4:51 AM Yang Wang  wrote:

> @Gyula Fóra is trying to prepare the preview
> release (0.1) of flink-kubernetes-operator. It is now fully functional for
> application mode.
> You could have a try and share more feedback with the community.
>
> The release-1.0 aims for production ready. And we still miss some
> important pieces(e.g. FlinkSessionJob, SQL job, observability improvements,
> etc.).
>
> Best,
> Yang
>
> Frank Dekervel  于2022年3月30日周三 23:40写道:
>
>> Hello David,
>>
>> Thanks for the information! So the two main takeaways from your email are
>> to
>>
>>- Move to something supporting application mode. Is
>>https://github.com/apache/flink-kubernetes-operator already ready
>>enough for production deployments ?
>>- wait for flink 1.15
>>
>> thanks!
>> Frank
>>
>>
>> On Mon, Mar 28, 2022 at 9:16 AM David Morávek  wrote:
>>
>>> Hi Frank,
>>>
>>> I'm not really familiar with the internal workings of the Spotify's
>>> operator, but here are few general notes:
>>>
>>> - You only need the JM process for the REST API to become available (TMs
>>> can join in asynchronously). I'd personally aim for < 1m for this step, if
>>> it takes longer it could signal a problem with your infrastructure (eg.
>>> images taking long time to pull, incorrect setup of liveness / readiness
>>> probes, not enough resources).
>>>
>>> The job is packaged as a fat jar, but it is already baked in the docker
>>>> images we use (so technically there would be no need to "submit" it from a
>>>> separate pod).
>>>>
>>>
>>> That's where the application mode comes in. Please note that this might
>>> be also one of the reasons for previous steps taking too long (as all pods
>>> are pulling an image with your fat jar that might not be cached).
>>>
>>> Then the application needs to start up and load its state from the
>>>> latest savepoint, which again takes a couple of minutes
>>>>
>>>
>>> This really depends on the state size, state backend (eg. rocksdb
>>> restore might take longer), object store throughput / rate limit. The
>>> native-savepoint feature that will come out with 1.15 might help to shave
>>> off some time here as the there is no conversion into the state backend
>>> structures.
>>>
>>> Best,
>>> D.
>>>
>>>-
>>>
>>>
>>> On Fri, Mar 25, 2022 at 9:46 AM Frank Dekervel 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> We run flink using the spotify flink Kubernetes operator (job cluster
>>>> mode). Everything works fine, including upgrades and crash recovery. We do
>>>> not run the job manager in HA mode.
>>>>
>>>> One of the problems we have is that upon upgrades (or during testing),
>>>> the startup time of the flink cluster takes a very long time:
>>>>
>>>>- First the operator needs to create the cluster (JM+TM), and wait
>>>>for it to respond for api requests. This already takes a couple of 
>>>> minutes.
>>>>- Then the operator creates a job-submitter pod that submits the
>>>>job to the cluster. The job is packaged as a fat jar, but it is already
>>>>baked in the docker images we use (so technically there would be no 
>>>> need to
>>>>"submit" it from a separate pod). The submission goes rather fast tho 
>>>> (the
>>>>time between the job 

Re: Deploy a Flink session cluster natively on K8s with multi AZ

2022-03-27 Thread Gyula Fóra
Hi!

I think the Flink Kubernetes Operator (
https://github.com/apache/flink-kubernetes-operator) project is exactly
what you are looking for.

This is a relatively new addition to Flink that supports k8s application
and session deployments with lifecycle management through kubernetes native
tooling.
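
For the multi-AZ part specifically, the operator's pod template mechanism
should cover it in native mode as well. A minimal sketch (the zone label key
is the standard well-known one; the selector labels are assumptions and must
match the labels your Flink pods actually carry):

spec:
  podTemplate:
    spec:
      topologySpreadConstraints:
        - maxSkew: 1
          topologyKey: topology.kubernetes.io/zone
          whenUnsatisfiable: ScheduleAnyway
          labelSelector:
            matchLabels:
              app: my-flink-deployment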

Cheers,
Gyula

On Sun, 27 Mar 2022 at 08:50, Almog Rozencwajg <
almog.rozencw...@niceactimize.com> wrote:

> Hi,
>
>
>
> From the documentation, deploy a Flink session cluster natively on K8S is
> by running a shell script.
>
>
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/resource-providers/native_kubernetes/?web=1=cFF7B7F0D-8BEC-4BAA-B06D-3D80A62B24AE#starting-a-flink-session-on-kubernetes
>
> In the example, we can pass args in the command, is there a way to do it
> by using the flink-conf.yaml?
> Is there another way to deploy a session cluster natively on K8S not only
> by running a command?
>
>
> We want to support multi availability zone. If we are deploying the
> cluster in a standalone mode, we can configure the deployment of the job
> manager and task manager and using  K8s pod topology spread constraints to
> achieve it.
> If we are working with native K8s mode, is there a way to do it?
>
>
>
> Thanks,
>
> Almog
>
>
>
>
> Confidentiality: This communication and any attachments are intended for
> the above-named persons only and may be confidential and/or legally
> privileged. Any opinions expressed in this communication are not
> necessarily those of NICE Actimize. If this communication has come to you
> in error you must take no action based on it, nor must you copy or show it
> to anyone; please delete/destroy and inform the sender by e-mail
> immediately.
> Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
> Viruses: Although we have taken steps toward ensuring that this e-mail and
> attachments are free from any virus, we advise that in keeping with good
> computing practice the recipient should ensure they are actually virus free.
>


Re: Where can I find flink-kubernetes-operator project?

2022-03-25 Thread Gyula Fóra
Hi Makas!

The repo is here: https://github.com/apache/flink-kubernetes-operator

We should add a link somewhere in the docs :)

Cheers,
Gyula


On Fri, Mar 25, 2022 at 9:45 AM Makas Tzavellas 
wrote:

> Hello,
>
> I came across this document
> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/try-flink-kubernetes-operator/quick-start/
> and would like to give it a try. But I have not been able to find it
> anywhere in Flink's main repository.
>
> Could someone please point me in the right direction?
>
> Thanks,
>
> Makas
>


Re: Flink native k8s integration vs. operator

2022-01-17 Thread Gyula Fóra
> >>> >> > I have a concrete example for this in the production. When a K8s node
> >>> >> > is down, the standalone K8s deployment will take longer
> >>> >> > recovery time based on the K8s eviction time (IIRC, default is 5
> >>> >> > minutes). For the native K8s integration, Flink RM could be aware of the
> >>> >> > TM heartbeat lost and allocate a new one timely.
> >>> >>
> >>> >> Thanks for sharing this, we should evaluate it as part of a
> proposal.
> >>> >> If we can optimize recovery or scaling with active resource
> management
> >>> >> then perhaps it is worth to support it through the operator.
> >>> >> Previously mentioned operators all rely on the standalone model.
> >>> >>
> >>> >> Cheers,
> >>> >> Thomas
> >>> >>
> >>> >> On Wed, Jan 12, 2022 at 3:21 AM Konstantin Knauf  >
> >>> >> wrote:
> >>> >> >
> >>> >> > cc dev@
> >>> >> >
> >>> >> > Hi Thomas, Hi everyone,
> >>> >> >
> >>> >> > Thank you for starting this discussion and sorry for chiming in
> >>> late.
> >>> >> >
> >>> >> > I agree with Thomas' and David's assessment of Flink's "Native
> >>> >> Kubernetes
> >>> >> > Integration", in particular, it does actually not integrate well
> >>> with
> >>> >> the
> >>> >> > Kubernetes ecosystem despite being called "native" (tooling,
> >>> security
> >>> >> > concerns).
> >>> >> >
> >>> >> > With respect to common lifecycle management operations: these
> >>> features
> >>> >> are
> >>> >> > not available (within Apache Flink) for any of the other resource
> >>> >> providers
> >>> >> > (YARN, Standalone) either. From this perspective, I wouldn't
> >>> consider
> >>> >> this
> >>> >> > a shortcoming of the Kubernetes integration. Instead, we have been
> >>> >> focusing
> >>> >> > our efforts in Apache Flink on the operations of a single Job, and
> >>> left
> >>> >> > orchestration and lifecycle management that spans multiple Jobs to
> >>> >> > ecosystem projects. I still believe that we should keep this focus
> >>> on
> >>> >> low
> >>> >> > level composable building blocks (like Jobs and Snapshots) in
> Apache
> >>> >> Flink
> >>> >> > to make it easy for everyone to build fitting higher level
> >>> abstractions
> >>> >> > like a FlinkApplication Custom Resource on top of it. For example,
> >>> we
> >>> >> are
> >>> >> > currently contributing multiple improvements [1,2,3,4] to the REST
> >>> API
> >>> >> and
> >>> >> > Application Mode that in our experience will make it easier to
> >>> manage
> >>> >> > Apache Flink with a Kubernetes operator. Given this background, I
> >>> >> suspect a
> >>> >> > Kubernetes Operator in Apache Flink would not be a priority for us
> >>> at
> >>> >> > Ververica - at least right now.
> >>> >> >
> >>> >> > Having said this, if others in the community have the capacity to
> >>> push
> >>> >> and
> >>> >> > *maintain* a somewhat minimal "reference" Kubernetes Operator for
> >>> Apache
> >>> >> > Flink, I don't see any blockers. If or when this happens, I'd see
> >>> some
> >>> >> > clear benefits of using a separate repository (easier independent
> >>> >> > versioning and releases, different build system & tooling (go, I
> >>> >> assume)).
> >>> >> >
> >>> >> > Looking forward to your thoughts,
> >>> >> >
> >>> >> > Konstantin
> >>> >> >
> >>> >> > [1] https://issues.apache.org/jira/browse/FLINK-24275
> >>> >> > [2] https://issues.apache.org/jira/browse/FLINK-24208
> >>> >> > [3]

Re: Flink native k8s integration vs. operator

2022-01-10 Thread Gyula Fóra
Hi All!

This is a very interesting discussion.

I think many users find it confusing what deployment mode to choose when
considering a new production application on Kubernetes. With all the
options of native, standalone and different operators this can get tricky :)

I really like the idea that Thomas brought up to have at least a minimal
operator implementation in Flink itself to cover the most common production
job lifecycle management scenarios. I think the Flink community has a very
strong experience in this area to create a successful implementation that
would benefit most production users on Kubernetes.

Cheers,
Gyula

On Mon, Jan 10, 2022 at 4:29 AM Yang Wang  wrote:

> Thanks all for this fruitful discussion.
>
> I think Xintong has given a strong point why we introduced the native K8s
> integration, which is active resource management.
> I have a concrete example for this in the production. When a K8s node is
> down, the standalone K8s deployment will take longer
> recovery time based on the K8s eviction time(IIRC, default is 5 minutes).
> For the native K8s integration, Flink RM could be aware of the
> TM heartbeat lost and allocate a new one timely.
>
> Also when introducing the native K8s integration, another hit is that we
> should make the users are easy enough to migrate from YARN deployment.
> They already have a production-ready job life-cycle management system,
> which is using Flink CLI to submit the Flink jobs.
> So we provide a consistent command "bin/flink run-application -t
> kubernetes-application/yarn-application" to start a Flink application and
> "bin/flink cancel/stop ..."
> to terminate a Flink application.
>
>
> Compared with K8s operator, I know that this is not a K8s
> native mechanism. Hence, I also agree that we still need a powerful K8s
> operator which
> could work with both standalone and native K8s modes. The major difference
> between them is how to start the JM and TM pods. For standalone,
> they are managed by K8s job/deployment. For native, maybe we could simply
> create a submission carrying the "flink run-application" arguments
> which is derived from the Flink application CR.
>
> Make the Flink's active resource manager can talk to the K8s operator is
> an interesting option, which could support both standalone and native.
> Then Flink RM just needs to declare the resource requirement(e.g. 2 * <2G,
> 1CPU>, 2 * <4G, 1CPU>) and defer the resource allocation/de-allocation
> to the K8s operator. It feels like an intermediate form between native and
> standalone mode :)
>
>
>
> Best,
> Yang
>
>
>
> Xintong Song wrote on Fri, Jan 7, 2022 at 12:02:
>
>> Hi folks,
>>
>> Thanks for the discussion. I'd like to share my two cents on this topic.
>>
>> Firstly, I'd like to clarify my understanding of the concepts "native k8s
>> integration" and "active resource management".
>> - Native k8s integration means Flink's master interacts with k8s' api
>> server directly. It acts like embedding an operator inside Flink's master,
>> which manages the resources (pod, deployment, configmap, etc.) and watches
>> / reacts to related events.
>> - Active resource management means Flink can actively start / terminate
>> workers as needed. Its key characteristic is that the resource a Flink
>> deployment uses is decided by the job's execution plan, unlike the opposite
>> reactive mode (resource available to the deployment decides the execution
>> plan) or the standalone mode (both execution plan and deployment resources
>> are predefined).
>>
>> Currently, we have the yarn and native k8s deployments (and the recently
>> removed mesos deployment) in active mode, due to their ability to request /
>> release worker resources from the underlying cluster. And all the existing
>> operators, AFAIK, work with a Flink standalone deployment, where Flink
>> cannot request / release resources by itself.
>>
>> From this perspective, I think a large part of the native k8s integration
>> advantages come from the active mode: being able to better understand the
>> job's resource requirements and adjust the deployment resource accordingly.
>> Both fine-grained resource management (customizing TM resources for
>> different tasks / operators) and adaptive batch scheduler (rescale the
>> deployment w.r.t. different stages) fall into this category.
>>
>> I'm wondering if we can have an operator that also works with the active
>> mode. Instead of talking to the api server directly for adding / deleting
>> resources, Flink's active resource manager can talk to the operator (via
>> CR) about the resources the deployment needs, and let the operator to
>> actually add / remove the resources. The operator should be able to work
>> with (active) or without (standalone) the information of deployment's
>> resource requirements. In this way, users are free to choose between active
>> and reactive (e.g., HPA) rescaling, while always benefiting from the
>> beyond-deployment lifecycle (upgrades, savepoint management, etc.) and
>> 

Re: [ANNOUNCE] Dropping "CheckpointConfig.setPreferCheckpointForRecovery()"

2021-08-25 Thread Gyula Fóra
Hi Stephan,

I do not know if anyone is still relying on this but I think it makes sense
to drop this feature. So +1 from me.

I think it served a valid purpose originally but if we have a good
improvement in the pipeline using the savepoints directly that will
solve the problem properly.
I would consider this feature a workaround at best anyways.

Regards,
Gyula

On Tue, Aug 24, 2021 at 11:56 AM Stephan Ewen  wrote:

> Hi Flink Community!
>
> A quick heads-up: We suggest removing the setting
> "CheckpointConfig.setPreferCheckpointForRecovery()" [1].
>
> The setting has been deprecated since Flink 1.12 and is strongly
> discouraged, because it can lead to data loss or data duplication in
> different scenarios.
> Please see also https://issues.apache.org/jira/browse/FLINK-20427 for
> background.
>
> Are there any concerns about deprecating this issue? Is anyone relying on
> this setting right now?
>
> For a long-term solution to ensuring that there is no slow recovery from
> savepoints: Some committers (me included) are working on a proposal to
> support more efficient savepoints and to ensure that intermediate
> savepoints don't interfere with side effects. Then we can always exclude
> them from recovery without risk of data loss or duplication.
>
> Best,
> Stephan
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java#L493
>


Re: Using double quotes for SQL identifiers

2021-01-26 Thread Gyula Fóra
Thanks guys for the pointers, I will give it a try!
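
For the record, the kind of thing Timo describes would look roughly like
this (an untested sketch; the builder and method names may differ between
Flink versions, so treat it as a starting point only):

import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.flink.table.planner.calcite.CalciteConfig;
import org.apache.flink.table.planner.calcite.CalciteConfigBuilder;

// Swap the parser's identifier quoting from backtick to double quote
CalciteConfig calciteConfig = new CalciteConfigBuilder()
        .replaceSqlParserConfig(
                SqlParser.configBuilder()
                        .setQuoting(Quoting.DOUBLE_QUOTE)
                        .build())
        .build();
tableEnv.getConfig().setPlannerConfig(calciteConfig);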

Gyula

On Tue, Jan 26, 2021 at 5:53 PM Timo Walther  wrote:

> Hi Gyula,
>
> the TableEnvironment.getConfig offers a setPlannerConfig. And
> org.apache.flink.table.planner.calcite.CalciteConfigBuilder helps in
> creating an object that implements this interface. You should be able to
> influence the Calcite parser config with this.
>
>
> However, I'm not sure how well this config is passed through the stack.
>
> Regards,
> Timo
>
>
> On 26.01.21 17:38, Sebastian Liu wrote:
> > Hi Gyula,
> >
> > AFAIK, except for the sql-dialect, the Table API does not expose any
> > parser-related configuration to the user.
> > But we can still change the parser's identifier-quoting configuration
> > with some code changes.
> > You can reference this test class:
> > org.apache.flink.sql.parser.FlinkDDLDataTypeTest.TestFactory.
> >
> >
> > Gyula Fóra (gyula.f...@gmail.com) wrote on Tue, Jan 26, 2021 at 8:42 PM:
> >
> > Hi All!
> >
> > Is it possible in any way to configure the Table environments to
> > allow double quotes (") to be used for identifiers instead of
> > backticks (`).
> >
> > Thank you!
> > Gyula
> >
> >
> >
> > --
> >
> > With kind regards,
> > Sebastian Liu 刘洋
> > Institute of Computing Technology, Chinese Academy of Science
> > Mobile\WeChat: +86-15201613655  E-mail: liuyang0...@gmail.com  QQ: 3239559
> >
>
>


Using double quotes for SQL identifiers

2021-01-26 Thread Gyula Fóra
Hi All!

Is it possible in any way to configure the Table environments to allow
double quotes (") to be used for identifiers instead of backticks (`).

Thank you!
Gyula


Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-08-07 Thread Gyula Fóra
Hi Robert,

I completely agree with you on the Bahir based approach.

I am happy to help with the contribution on the bahir side, with thorough
 review and testing.

Cheers,
Gyula

On Fri, 7 Aug 2020 at 09:30, Robert Metzger  wrote:

> It seems that this thead is not on dev@ anymore. Adding it back ...
>
> On Fri, Aug 7, 2020 at 9:23 AM Robert Metzger  wrote:
>
>> I would like to revive this discussion. There's a new JIRA[1] + PR[2] for
>> adding HBase 2 support.
>>
>> it seems that there is demand for a HBase 2 connector, and consensus to
>> do it.
>>
>> The remaining question in this thread seems to be the "how". I would
>> propose to go the other way around as Gyula suggested: We move the legacy
>> connector (1.4x) to bahir and add the new (2.x.x) to Flink.
>> Why? In the Flink repo, we have a pretty solid testing infra, where we
>> also run Hbase end to end tests. This will help us to stabilize the new
>> connector and ensure a good quality.
>> It also, the perception of what goes into Flink, and what into Bahir is a
>> bit clearer if we put the stable, up to date stuff into Flink, and legacy,
>> experimental or unstable connectors into Bahir.
>>
>>
>> Who can take care of this effort? (Decide which Hbase 2 PR to take,
>> review and contribution to Bahir)
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18795
>> [2] https://github.com/apache/flink/pull/13047
>>
>> On Mon, Jun 22, 2020 at 3:32 PM Gyula Fóra  wrote:
>>
>>> If we were to go the bahir route, I don't see the point in migrating the
>>> 1.4.x version there since that's already available in Flink. To me that is
>>> almost the same as dropping explicit support for 1.4 and telling users to
>>> use older connector versions if they wish to keep using it.
>>>
>>> If we want to keep 1.4 around for legacy users and slowly deprecate
>>> that, we can do that inside Flink and only push the 2.4.x version to bahir.
>>>
>>> What do you think?
>>>
>>> Gyula
>>>
>>> On Mon, Jun 22, 2020 at 3:16 PM Arvid Heise  wrote:
>>>
>>>> If we support both HBase 1 and 2, maybe it's a good time to pull them
>>>> out to Bahir and list them in flink-packages to avoid adding even more
>>>> modules to Flink core?
>>>>
>>>> On Mon, Jun 22, 2020 at 4:05 AM OpenInx  wrote:
>>>>
>>>>> Hi
>>>>>
>>>>> According to my observation in the hbase community, there are still
>>>>> lots of hbase users running their production cluster with version 1.x 
>>>>> (1.4x
>>>>> or 1.5.x). so I'd like to suggest that
>>>>> supporting both hbase1.x & hbase2.x connector.
>>>>>
>>>>> Thanks.
>>>>>
>>>>> On Sat, Jun 20, 2020 at 2:41 PM Ming Li  wrote:
>>>>>
>>>>>> +1 to support both HBase 2.x and Hbase 1.4.x,  just as what we are
>>>>>> doing for Kafka.
>>>>>>
>>>>>> On Fri, Jun 19, 2020 at 4:02 PM Yu Li  wrote:
>>>>>>
>>>>>>> One supplement:
>>>>>>>
>>>>>>> I noticed that there are discussions in HBase ML this March about
>>>>>>> removing stable-1 pointer and got consensus [1], and will follow up in
>>>>>>> HBase community about why we didn't take real action. However, this 
>>>>>>> doesn't
>>>>>>> change my previous statement / stand due to the number of 1.x usages in
>>>>>>> production.
>>>>>>>
>>>>>>> Best Regards,
>>>>>>> Yu
>>>>>>>
>>>>>>> [1]
>>>>>>> http://mail-archives.apache.org/mod_mbox/hbase-dev/202003.mbox/%3c30180be2-bd93-d414-a158-16c9c8d01...@apache.org%3E
>>>>>>>
>>>>>>> On Fri, 19 Jun 2020 at 15:54, Yu Li  wrote:
>>>>>>>
>>>>>>>> +1 on upgrading the HBase version of the connector, and 1.4.3 is
>>>>>>>> indeed an old version.
>>>>>>>>
>>>>>>>> OTOH, AFAIK there're still quite some 1.x HBase clusters in
>>>>>>>> production. We could also see that the HBase community is still 
>>>>>>>> maintaining
>>>>>>>> 1.x release lines (with "stable-1 release" point to 1.4.13) [1]

Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-22 Thread Gyula Fóra
If we were to go the bahir route, I don't see the point in migrating the
1.4.x version there since that's already available in Flink. To me that is
almost the same as dropping explicit support for 1.4 and telling users to
use older connector versions if they wish to keep using it.

If we want to keep 1.4 around for legacy users and slowly deprecate that,
we can do that inside Flink and only push the 2.4.x version to bahir.

What do you think?

Gyula

On Mon, Jun 22, 2020 at 3:16 PM Arvid Heise  wrote:

> If we support both HBase 1 and 2, maybe it's a good time to pull them out
> to Bahir and list them in flink-packages to avoid adding even more modules
> to Flink core?
>
> On Mon, Jun 22, 2020 at 4:05 AM OpenInx  wrote:
>
>> Hi
>>
>> According to my observation in the hbase community, there are still lots
>> of hbase users running their production cluster with version 1.x (1.4x or
>> 1.5.x). so I'd like to suggest that
>> supporting both hbase1.x & hbase2.x connector.
>>
>> Thanks.
>>
>> On Sat, Jun 20, 2020 at 2:41 PM Ming Li  wrote:
>>
>>> +1 to support both HBase 2.x and Hbase 1.4.x,  just as what we are doing
>>> for Kafka.
>>>
>>> On Fri, Jun 19, 2020 at 4:02 PM Yu Li  wrote:
>>>
 One supplement:

 I noticed that there are discussions in HBase ML this March about
 removing stable-1 pointer and got consensus [1], and will follow up in
 HBase community about why we didn't take real action. However, this doesn't
 change my previous statement / stand due to the number of 1.x usages in
 production.

 Best Regards,
 Yu

 [1]
 http://mail-archives.apache.org/mod_mbox/hbase-dev/202003.mbox/%3c30180be2-bd93-d414-a158-16c9c8d01...@apache.org%3E

 On Fri, 19 Jun 2020 at 15:54, Yu Li  wrote:

> +1 on upgrading the HBase version of the connector, and 1.4.3 is
> indeed an old version.
>
> OTOH, AFAIK there're still quite some 1.x HBase clusters in
> production. We could also see that the HBase community is still 
> maintaining
> 1.x release lines (with "stable-1 release" point to 1.4.13) [1]
>
> Please also notice that HBase follows semantic versioning [2] [3] thus
> don't promise any kind of compatibility (source/binary/wire, etc.) between
> major versions. So if we only maintain 2.x connector, it would not be able
> to work with 1.x HBase clusters.
>
> I totally understand the additional efforts of maintaining two
> modules, but since we're also reserving multiple versions for kafka
> connector, meantime considering the current HBase in-production status, 
> I'd
> still suggest to get both 1.4.13 and 2.2.5 supported.
>
> Best Regards,
> Yu
>
> [1] http://hbase.apache.org/downloads.html
> [2] https://hbase.apache.org/book.html#hbase.versioning
> [3] https://semver.org/
>
>
> On Fri, 19 Jun 2020 at 14:58, Leonard Xu  wrote:
>
>> +1 to support HBase 2.2.x, and +1 to retain HBase 1.4.3 until we
>> deprecates finished(maybe one version is enough).
>>
>> Currently we only support HBase 1.4.3 which is pretty old, and I’m
>> making a flink-sql-connector-hbase[1] shaded jar for pure SQL user, the
>> dependencies is a little more complex.
>>
>>
>> 在 2020年6月19日,14:20,jackylau  写道:
>>
>> + 1 to support HBase 2.x and the hbase 2.x client dependencies are
>> simple and clear. Tbe hbase project shades them all
>>
>>
>> Best,
>> Leonard Xu
>> [1] https://github.com/apache/flink/pull/12687
>>
>>
>>>
>>> --
>>> Best Regards
>>> Michael Li
>>>
>>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> 
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward  - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


Re: Re: [DISCUSS] Upgrade HBase connector to 2.2.x

2020-06-19 Thread Gyula Fóra
Hi!

Having both 1.4.x and 2.x supported means we need different modules or some
shim logic as they are not compatible with each other. I would love to
avoid this if possible because it is a lot of extra effort from a
maintainability perspective.

It would be great to see how many users still use HBase 1.4.3 and how many
would use a 2.2.x connector instead.

Gyula

On Fri, Jun 19, 2020 at 7:44 AM chaojianok  wrote:

> +1 to support HBase 2.x
> And I think the 1.4.x version can be retained for the time being, so that
> users who are currently using the 1.4.x version can have more time to
> evaluate whether their projects need to be upgraded and the cost of
> upgrading.
>
>
>
>
>
>
> At 2020-06-19 12:35:36, "Jark Wu"  wrote:
>
> +1 to support HBase 2.x
> But not sure about dropping support for 1.4.x
>
> I cc'ed to user@ and user-zh@ to hear more feedback from users.
>
> Best,
> Jark
>
> On Thu, 18 Jun 2020 at 21:25, Gyula Fóra  wrote:
>
>> Hi All!
>>
>> I would like to revive an old ticket
>> <https://issues.apache.org/jira/browse/FLINK-9849> and discussion around
>> upgrading the HBase version of the connector.
>>
>> The current HBase version is 1.4.3 which is over 2 years old at this point
>> and incompatible with the newer HBase versions used at many companies.
>>
>> We propose to upgrade the connector to the latest version and drop support
>> for the old version starting from the 1.12 Flink release. This would help
>> us maintain and improve the HBase connector over time.
>>
>> If the community agrees we are happy to contribute this upgrade as we have
>> already developed and tested the updated version.
>>
>> Cheers,
>> Gyula
>>
>
>
>
>


Re: Using logicalType in the Avro table format

2020-04-30 Thread Gyula Fóra
Hi Arvid!

I tried it with Avro 1.9.2, and it led to the same error.
Seems like Avro cannot find the conversion class between LocalDateTime and
timestamp-millis.
Not sure how exactly this works, maybe we need to set the conversions
ourselves?
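
Concretely, "setting the conversions ourselves" would look roughly like this
(an untested sketch; note that in Avro 1.9 TimestampMillisConversion converts
java.time.Instant, so a LocalDateTime column may additionally need the
LocalTimestampMillisConversion that only ships with Avro 1.10+):

import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

// Register the logical-type conversion on the GenericData instance used by
// the datum writer so timestamp-millis values are converted on write
GenericData genericData = new GenericData();
genericData.addLogicalTypeConversion(new TimeConversions.TimestampMillisConversion());
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema, genericData);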

Thanks!
Gyula

On Thu, Apr 30, 2020 at 12:53 PM Arvid Heise  wrote:

> Hi Gyula,
>
> it may still be worth to try to upgrade to Avro 1.9.2 (can never hurt) and
> see if this solves your particular problem.
> The code path in GenericDatumWriter is taking the conversion path, so it
> might just work. Of course that depends on the schema being correctly
> translated to a specific record that uses the new TimeConversions [1].
>
> [1]
> https://github.com/apache/avro/blob/master/lang/java/avro/src/main/java/org/apache/avro/data/TimeConversions.java
>
> On Thu, Apr 30, 2020 at 10:41 AM Gyula Fóra  wrote:
>
>> Hi!
>>
>> @Arvid: We are using Avro 1.8 I believe but this problem seems to come
>> from the flink side as Dawid mentioned.
>>
>> @Dawid:
>> Sounds like a reasonable explanation, here are the actual queries to
>> reproduce within the SQL client/table api:
>>
>> CREATE TABLE source_table (
>>  int_field INT,
>>  timestamp_field TIMESTAMP(3)
>> ) WITH (
>>  'connector.type'   = 'kafka',
>>  'connector.version'= 'universal',
>>  'connector.topic'  = 'avro_tset',
>>  'connector.properties.bootstrap.servers'   = '<...>',
>>  'format.type'  = 'avro',
>> 'format.avro-schema' =
>> '{
>>   "type": "record",
>>   "name": "test",
>>   "fields" : [
>> {"name": "int_field", "type": "int"},
>> {"name": "timestamp_field", "type": {"type":"long", "logicalType": 
>> "timestamp-millis"}}
>>   ]
>> }'
>> )
>>
>> INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
>> And the error:
>>
>> Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be 
>> cast to java.lang.Long
>>  at 
>> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>>  at 
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>>  at 
>> org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
>>  at 
>> org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
>>  at 
>> org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
>>  at 
>> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
>>  at 
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
>>  at 
>> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
>>  at 
>> org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)
>>
>> I will open a Jira ticket as well with these details.
>>
>> Thank you!
>>
>> Gyula
>>
>>
>> On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz 
>> wrote:
>>
>>> Hi Gyula,
>>>
>>> I have not verified it locally yet, but I think you are hitting yet
>>> another problem of the unfinished migration from old TypeInformation based
>>> type system to the new type system based on DataTypes. As far as I
>>> understand the problem the information about the bridging class
>>> (java.sql.Timestamp in this case) is lost in the stack. Because this
>>> information is lost/not respected the planner produces LocalDateTime
>>> instead of a proper java.sql.Timestamp time. The AvroRowSerializationSchema
>>> expects java.sql.Timestamp for a column of TIMESTAMP type and thus it fails
>>> for LocalDateTime. I really hope the effort of FLIP-95 will significantly
>>> reduce the number of problems.
>>>
>>> It's definitely worth reporting a bug.
>>>
>>> BTW could you share how you create the Kafka Table sink to have the full
>>> picture?
>>>
>>> Best,
>>>
>>> Dawid
>>> On 29/04/2020 15:42, Gyula Fóra wrote:
>>>
>>> Hi All!
>>>
>>> We are trying to work with avro serialized data from Kafka u

Re: Using logicalType in the Avro table format

2020-04-30 Thread Gyula Fóra
Hi!

@Arvid: We are using Avro 1.8 I believe but this problem seems to come from
the flink side as Dawid mentioned.

@Dawid:
Sounds like a reasonable explanation, here are the actual queries to
reproduce within the SQL client/table api:

CREATE TABLE source_table (
int_field INT,
timestamp_field TIMESTAMP(3)
) WITH (
'connector.type'   = 'kafka',
'connector.version'= 'universal',
'connector.topic'  = 'avro_tset',
'connector.properties.bootstrap.servers'   = '<...>',
'format.type'  = 'avro',
'format.avro-schema' =
'{
  "type": "record",
  "name": "test",
  "fields" : [
{"name": "int_field", "type": "int"},
{"name": "timestamp_field", "type": {"type":"long",
"logicalType": "timestamp-millis"}}
  ]
}'
)

INSERT INTO source_table VALUES (12, TIMESTAMP '1999-11-11 11:11:11');
And the error:

Caused by: java.lang.ClassCastException: java.time.LocalDateTime
cannot be cast to java.lang.Long
at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
at 
org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:166)
at 
org.apache.avro.specific.SpecificDatumWriter.writeField(SpecificDatumWriter.java:90)
at 
org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:156)
at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:118)
at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:62)
at 
org.apache.flink.formats.avro.AvroRowSerializationSchema.serialize(AvroRowSerializationSchema.java:143)

I will open a Jira ticket as well with these details.

Thank you!

Gyula


On Thu, Apr 30, 2020 at 10:05 AM Dawid Wysakowicz 
wrote:

> Hi Gyula,
>
> I have not verified it locally yet, but I think you are hitting yet
> another problem of the unfinished migration from the old
> TypeInformation-based type system to the new type system based on
> DataTypes. As far as I understand the problem, the information about the
> bridging class (java.sql.Timestamp in this case) is lost in the stack.
> Because this information is lost/not respected, the planner produces
> LocalDateTime instead of a proper java.sql.Timestamp value. The
> AvroRowSerializationSchema expects java.sql.Timestamp for a column of
> TIMESTAMP type and thus it fails for LocalDateTime. I really hope the
> FLIP-95 effort will significantly reduce the number of problems.
>
> It's definitely worth reporting a bug.
>
> BTW could you share how you create the Kafka Table sink to have the full
> picture?
>
> Best,
>
> Dawid
> On 29/04/2020 15:42, Gyula Fóra wrote:
>
> Hi All!
>
> We are trying to work with Avro-serialized data from Kafka using the Table
> API and the TIMESTAMP column type.
>
> According to the docs
> <https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
> we can use long type with logicalType: timestamp-millis.
> So we use the following avro field schema in the descriptor:
>
>
>   {"name": "timestamp_field", "type": {"type":"long", "logicalType": 
> "timestamp-millis"}}
>
> When trying to insert into the table we get the following error:
>
> Caused by: java.lang.ClassCastException: class java.time.LocalDateTime cannot 
> be cast to class java.lang.Long (java.time.LocalDateTime and java.lang.Long 
> are in module java.base of loader 'bootstrap')  at 
> org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
>at 
> org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>
> It seems like the avro format (serializer) is not aware of the logical type 
> conversion that is needed to convert back to the physical type long.
>
> I looked at the AvroTypesITCase, which uses all kinds of logical types, but
> I could only find logic that maps between Avro POJOs and tables, and none
> that actually exercises the serialization/deserialization logic of the
> format.
>
> Could someone please help me with this? Maybe what I am trying to do is not 
> possible, or I just missed a crucial step.
>
> Thank you!
> Gyula
>
>
>
>
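
For anyone who wants to isolate this outside Flink: the failure Dawid
describes can be reproduced with plain Avro alone. The following is a
hypothetical, minimal sketch (not from the thread; the class name
LogicalTypeRepro is made up, and it assumes a stock GenericDatumWriter,
which registers no logical-type conversions by default):

import java.io.ByteArrayOutputStream;
import java.time.LocalDateTime;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class LogicalTypeRepro {

    // Same shape as the schema in the DDL above: a long field tagged
    // with the timestamp-millis logical type.
    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"test\", \"fields\": ["
            + "{\"name\": \"int_field\", \"type\": \"int\"},"
            + "{\"name\": \"timestamp_field\", \"type\":"
            + " {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}]}");

    public static void main(String[] args) throws Exception {
        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("int_field", 12);

        // No conversion is registered for java.time.LocalDateTime, so on
        // write() the writer falls through to the physical type and casts
        // the datum to Long, producing the same ClassCastException as above.
        record.put("timestamp_field",
            LocalDateTime.of(1999, 11, 11, 11, 11, 11));

        // Putting epoch millis in directly works, which is the conversion
        // AvroRowSerializationSchema performs when it is handed the
        // java.sql.Timestamp it expects:
        // record.put("timestamp_field",
        //     java.sql.Timestamp.valueOf("1999-11-11 11:11:11").getTime());

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(SCHEMA).write(record, encoder);
        encoder.flush();
    }
}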


Using logicalType in the Avro table format

2020-04-29 Thread Gyula Fóra
Hi All!

We are trying to work with Avro-serialized data from Kafka using the Table
API and the TIMESTAMP column type.

According to the docs
<https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#apache-avro-format>,
we can use long type with logicalType: timestamp-millis.
So we use the following avro field schema in the descriptor:


  {"name": "timestamp_field", "type": {"type":"long", "logicalType":
"timestamp-millis"}}

When trying to insert into the table we get the following error:

Caused by: java.lang.ClassCastException: class java.time.LocalDateTime
cannot be cast to class java.lang.Long (java.time.LocalDateTime and
java.lang.Long are in module java.base of loader 'bootstrap')
at 
org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:131)
at 
org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)

It seems like the avro format (serializer) is not aware of the logical
type conversion that is needed to convert back to the physical type
long.

I looked at the AvroTypesITCase, which uses all kinds of logical types,
but I could only find logic that maps between Avro POJOs and tables, and
none that actually exercises the serialization/deserialization logic of
the format.

Could someone please help me with this? Maybe what I am trying to do
is not possible, or I just missed a crucial step.

Thank you!
Gyula
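
For contrast, the logical type itself round-trips fine at the plain Avro
level once a matching conversion is registered, which supports the reading
that the problem sits in Flink's type bridging rather than in Avro. A
hypothetical sketch (it assumes Avro 1.9+, where TimeConversions is based
on java.time; this is not a fix for the Flink issue above, only an
illustration):

import java.io.ByteArrayOutputStream;
import java.time.Instant;

import org.apache.avro.Schema;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class LogicalTypeConversionSketch {

    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\": \"record\", \"name\": \"test\", \"fields\": ["
            + "{\"name\": \"int_field\", \"type\": \"int\"},"
            + "{\"name\": \"timestamp_field\", \"type\":"
            + " {\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}}]}");

    public static void main(String[] args) throws Exception {
        // Register the conversion so the data model knows how to turn an
        // Instant into the underlying long (epoch millis).
        GenericData model = new GenericData();
        model.addLogicalTypeConversion(
            new TimeConversions.TimestampMillisConversion());

        GenericRecord record = new GenericData.Record(SCHEMA);
        record.put("int_field", 12);
        record.put("timestamp_field", Instant.parse("1999-11-11T11:11:11Z"));

        // With the conversion registered, write() converts the Instant to
        // epoch millis before reaching the physical long type, so this
        // serializes without a ClassCastException.
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(SCHEMA, model)
            .write(record, encoder);
        encoder.flush();
    }
}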

