Out of memory in heap memory when working with state

2022-09-05 Thread lan tran
Hi team,

Currently I am facing an OutOfMemoryError: Java heap space. This is somehow due to the fact that I am storing state with the FsStateBackend. With the FsStateBackend, the working state for each task manager is kept in memory (on the JVM heap), and state backups (checkpoints) go to a distributed file system, e.g., HDFS. Therefore, is there any way I can free the state in memory and use the state directly on S3?
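For what it's worth, a minimal sketch of moving working state off the JVM heap with the EmbeddedRocksDBStateBackend; Flink cannot serve working state directly from S3, but RocksDB keeps it on local disk while checkpoints still go to a remote filesystem such as S3 (the bucket name and checkpoint interval below are made-up placeholders):

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RocksDBBackendSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep working state in RocksDB on local disk instead of the JVM heap.
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // Checkpoints (state backups) still go to a durable remote filesystem.
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setCheckpointStorage("s3://my-bucket/checkpoints");

        // ... build and execute the job ...
    }
}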


Re: flink table API usage

2022-09-05 Thread 小昌同学
Thanks a lot for the pointer!


昌呈呈
ccc0606fight...@163.com
---- Original message ----
From: Xuyang
Date: 2022-09-06 00:03
To:
Subject: Re: flink table API usage
Hi, you can write something like ".filter($("a").isGreater(10))". For more usage examples see [1].




[1] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java




--

Best!
Xuyang





On 2022-09-05 20:53:03, "小昌同学" wrote:


Table result = kafka_item.groupBy($("trans_number"))   
.select($("trans_number"),$("sales_amount").sum().as("sum_amount"))
.filter($("sum_amount "));
Hi all, a question: I want to use the Flink Table API to achieve the following:
group by trans_number, then sum another field, and finally filter so that only rows whose sum is greater than 100 are kept.
How do I continue with the API from here? How is the filter operator supposed to be used?
小昌
ccc0606fight...@163.com


Unsubscribe

2022-09-05 Thread 勇Steve 金


Unsubscribe
Sent from my iPhone

Re: Deploying Jobmanager on k8s as a Deployment

2022-09-05 Thread Gyula Fóra
Hi!

The operator supports both Flink native and standalone deployment modes and
in both cases the JM is deployed as k8s Deployment.

During upgrade Flink/operator deletes the deployment after savepoint and
waits for termination before it creates a new one with the updated spec.

Cheers,
Gyula

On Mon, 5 Sep 2022 at 07:41, Austin Cawley-Edwards 
wrote:

> Hey Marco,
>
> Unfortunately there is no built in k8s API that models an application mode
> JM exactly but Deployments should be fine, in general. As Gyula notes,
> where they can be difficult is during application upgrades as Deployments
> never let their pods exit, even if successful, so there is no way to stop
> the cluster gracefully.
>
> Is stopping your application with a savepoint and redeploying a workable
> solution for image upgrades? In this way a Job could still be used.
>
>
> @Gyula, how are JMs handled in the operator? Job, Deployment, or something
> custom?
>
>
> Best,
> Austin
>
>
>
> On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra  wrote:
>
>> You can use deployments of course , the operator and native k8s
>> integration does exactly that.
>>
>> Even then job updates can be tricky so I believe you are much better off
>> with the operator.
>>
>> Gyula
>>
>> On Sun, 4 Sep 2022 at 11:11, marco andreas 
>> wrote:
>>
>>> Hello,
>>>
>>> Thanks for the response, I will take a look at it.
>>>
>>> But if we aren't able to use the flink operator due to technical
>>> constraints is it possible to deploy the JM as deployment without any
>>> consequences that I am not aware of?
>>>
>>> Sincerely,
>>>
>>> On Sat, Sep 3, 2022 at 23:27, Gyula Fóra
>>> wrote:
>>>
 Hi!
 You should check out the Flink Kubernetes Operator. I think that covers
 all your needs .

 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/

 Cheers,
 Gyula

 On Sat, 3 Sep 2022 at 13:45, marco andreas 
 wrote:

>
> We are deploying a flink application cluster on K8s. Following the
> official documentation, the JM is deployed as a Job resource; however, we
> are deploying a long-running flink job that is not supposed to be
> terminated, and we also need to update the image of the flink job.
>
>  The problem is that the Job is an immutable resource, we can't update
> it.
>
> So I'm wondering if it's possible to use a deployment resource for the
> jobmanager and if there will be any side effects or repercussions.
>
> Thanks,
>



Re: Flink kafka producer using the same transactional id twice?

2022-09-05 Thread Alexander Fedulov
>
> I then noticed this message showing up twice and thought "this does not
> look right":

That's fine, this is how the sink works (see the comment in
KafkaWriter.java#L294-L301).

There are timeouts on the Kafka side that purge the transactions if they do
not finish on time. The default value is 15 minutes, which also matches the
interval between the log entries you provided. Try to:
1) increase the timeouts
2) check that you checkpoint reasonably frequently in relation to the
existing timeout and that your checkpoints actually complete fast enough
3) make sure that your job is operating correctly without being
backpressured. If you see backpressure in Flink UI frequently, try to
locate the bottleneck, and in the meantime check if enabling unaligned
checkpoints helps.
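
To illustrate 1) and 2), a hedged sketch of a KafkaSink with a larger producer transaction timeout and a checkpoint interval well below it (broker address, topic, prefix and timeout values below are placeholders):

import java.time.Duration;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExactlyOnceKafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint well within the transaction timeout so transactions commit in time.
        env.enableCheckpointing(Duration.ofMinutes(1).toMillis());

        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("broker:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("output-topic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("prefix")
                // Raise the producer-side transaction timeout; it must not exceed the
                // broker's transaction.max.timeout.ms (15 minutes by default).
                .setProperty("transaction.timeout.ms", "900000")
                .build();

        // env.fromSource(...).sinkTo(sink); env.execute();
    }
}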

Here are some useful links:
https://bit.ly/3wVgZKk
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#caveats
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kafka/#fault-tolerance
https://youtu.be/bhcFfS1-eDY?t=410
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/ops/state/checkpoints/#unaligned-checkpoints

Best,
Alexander Fedulov


On Mon, Sep 5, 2022 at 2:21 PM Sebastian Struss  wrote:

> Hi all,
>
> I am quite new to Flink and Kafka, so I might mix something up here.
> The situation is that we do have a flink application (1.14.5 with scala
> 2.12) running for a few hours to days and suddenly it stops working and
> can't publish to kafka anymore.
> I then noticed this message showing up twice and thought "this does not
> look right":
> > Created new transactional producer prefix-2-9447
> The second message timestamp seems to be the timestamp when the
> application doesn't publish to kafka properly anymore and when checkpoints
> are failing to be made.
> We also see this error message:
> > Producer attempted an operation with an old epoch. Either there is a
> newer producer with the same transactionalId, or the producer's transaction
> has been expired by the broker.
> Am i mistaken when i think that this should be impossible when flink
> handles the sinks?
> I would think that due to the checkpointing and due to us giving flink the
> control about the output, it should never run into this situation.
> We are using an exactly-once delivery guarantee for Kafka and set the Flink
> sink parallelism to 4.
> Also we are using the kubernetes operator of flink in version 1.1.0.
> Any hints on what to check/change are highly appreciated.
>
> best,
> Sebastian S.
> [image: image.png]
>


Re: Where will the state be stored in the taskmanager when using the RocksDB state backend?

2022-09-05 Thread Alexander Fedulov
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#state-backend-rocksdb-localdir
Make sure to use a local SSD disk (not NFS/EBS).
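
A minimal flink-conf.yaml sketch of that setting (the paths and bucket are just examples for a locally attached SSD mount and an S3 checkpoint location):

state.backend: rocksdb
# Put RocksDB working state on a fast, locally attached disk.
state.backend.rocksdb.localdir: /mnt/local-ssd/flink-rocksdb
# Checkpoints still go to a durable, remote filesystem.
state.checkpoints.dir: s3://my-bucket/checkpoints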

Best,
Alexander Fedulov

On Mon, Sep 5, 2022 at 7:24 PM hjw <1010445...@qq.com> wrote:

> The EmbeddedRocksDBStateBackend holds in-flight data in a RocksDB
> database that is (per default) stored in the
> TaskManager local data directories.
> Which path in the operating system do the TaskManager local data directories
> (where the RocksDB database is stored) point to?
> If the job state is very large, I think I should take some measures to
> deal with it (e.g. mount a volume for the local data directories that store the RocksDB
> database).
>
> thx.
>
> --
> Best,
> Hjw
>


Where will the state be stored in the taskmanager when using the RocksDB state backend?

2022-09-05 Thread hjw
The EmbeddedRocksDBStateBackend holds in-flight data in
a RocksDB database that is (per default) stored in the TaskManager
local data directories.
Which path in the operating system do the TaskManager local data directories
(where the RocksDB database is stored) point to?
If the job state is very large, I think I should take some measures to
deal with it (e.g. mount a volume for the local data directories that store the RocksDB
database).


thx.



Best,
Hjw

Re: flink ci build run longer than the maximum time of 310 minutes.

2022-09-05 Thread hjw
Hi Matthias,
I have solved the problem as you suggested. The link to the PR is [1]. Thank you.


[1]https://github.com/apache/flink/pull/20671



Best,
Hjw







------ Original message ------
From: "Matthias Pohl"

https://github.com/apache/flink/tree/release-1.15

On Sat, Sep 3, 2022 at 10:18 AM hjw <1010445...@qq.com> wrote:

Hi,Matthias
The ci build Error in e2e_1_ci job:
Sep 02 11:01:51 ##[group]Top 15 biggest directories in terms of used disk space
Sep 02 11:01:52 Searching for .dump, .dumpstream and related files in 
'/home/vsts/work/1/s'
dmesg: read kernel buffer failed: Operation not permitted
Sep 02 11:01:53 No taskexecutor daemon to stop on host fv-az158-417.
Sep 02 11:01:53 No standalonesession daemon to stop on host fv-az158-417.
Sep 02 11:10:27 The command 'docker build --no-cache --network=host -t 
test_docker_embedded_job dev/test_docker_embedded_job-debian' (pid: 188432) did 
not finish after 600 seconds.
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/common.sh: line 900: 
kill: (188432) - No such process
Sep 02 11:11:06 The command 'docker build --no-cache --network=host -t 
test_docker_embedded_job dev/test_docker_embedded_job-debian' (pid: 188484) did 
not finish after 600 seconds.
/home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/common.sh: line 900: 
kill: (188484) - No such process



I think the issue applies to my case.
However, I have submitted some commits to my fork repo and created a PR. The
PR has not been merged into the flink repo yet. My fork repo status: This
branch is 4 commits ahead, 11 commits behind apache:release-1.15.


When I rebase the branch from upstream and push to my fork repo, the 11
commits behind apache:release-1.15 also appear in my PR's changed files.
How can I handle this situation? thx.




Best,
Hjw







------ Original message ------
From: "Matthias Pohl"

https://issues.apache.org/jira/browse/FLINK-29161

On Fri, Sep 2, 2022 at 10:51 AM Martijn Visser wrote:
https://go.microsoft.com/fwlink/?linkid=2077134


How to solve this problem?
How to trigger the ci build again?
thx.

Re: flink table API usage

2022-09-05 Thread Xuyang
Hi, you can write something like ".filter($("a").isGreater(10))". For more usage examples see [1].




[1] 
https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/GettingStartedExample.java
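
Put together for the original question, a small sketch of the filter on the aggregated column (assuming the greater-than-100 condition mentioned in the question; table and column names are taken from the question's snippet):

import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.Table;

// Group by trans_number, sum sales_amount, then keep only groups whose sum exceeds 100.
Table result = kafka_item
        .groupBy($("trans_number"))
        .select($("trans_number"), $("sales_amount").sum().as("sum_amount"))
        .filter($("sum_amount").isGreater(100));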




--

Best!
Xuyang





On 2022-09-05 20:53:03, "小昌同学" wrote:
>
>
>Table result = kafka_item.groupBy($("trans_number"))   
>.select($("trans_number"),$("sales_amount").sum().as("sum_amount"))
>   .filter($("sum_amount "));
>Hi all, a question: I want to use the Flink Table API to achieve the following:
>group by trans_number, then sum another field, and finally filter so that only rows whose sum is greater than 100 are kept.
>How do I continue with the API from here? How is the filter operator supposed to be used?
>小昌
>ccc0606fight...@163.com


Slow Tests in Flink 1.15

2022-09-05 Thread David Jost
Hi,

we were going to upgrade our application from Flink 1.14.4 to Flink 1.15.2 when we
noticed that all our job tests using a MiniClusterWithClientResource are multiple
times slower in 1.15 than they were in 1.14. Unfortunately, I have not found any
mention of this in the changelog or documentation. The slowdown is rather extreme,
and I hope to find a solution. I saw it mentioned once on the mailing list, but
there was no (public) outcome.

I would appreciate any help on this. Thank you in advance.

Best
 David



Re: [Flink 1.15.1 - Application mode native k8s Exception] - Exception occurred while acquiring lock 'ConfigMapLock

2022-09-05 Thread Tamir Sagi
Hey Yang,

The flink-conf.yaml submitted to the cluster does not contain 
"kubernetes.config.file" at all.
In addition, I verified flink config maps under cluster's namespace do not 
contain "kubernetes.config.file".

In addition, we also noticed the following exception (appears to happen 
sporadically)

2022-09-04T21:06:35,231][Error] {} [i.f.k.c.e.l.LeaderElector]: Exception 
occurred while acquiring lock 'ConfigMapLock: dev-0-flink-jobs - 
data-agg-events-insertion-cluster-config-map 
(fa3dbbc5-1753-46cd-afaf-0baf8ff0947f)'
io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LockException:
 Unable to create ConfigMapLock

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure 
executing: POST at: 
https://172.20.0.1/api/v1/namespaces/dev-0-flink-jobs/configmaps. Message: 
configmaps "data-agg-events-insertion-cluster-config-map" already exists.

Log file is enclosed.

Thanks,
Tamir.



From: Yang Wang 
Sent: Monday, September 5, 2022 3:03 PM
To: Tamir Sagi 
Cc: user@flink.apache.org ; Lihi Peretz 

Subject: Re: [Flink 1.15.1 - Application mode native k8s Exception] - Exception 
occurred while acquiring lock 'ConfigMapLock




Could you please check whether the "kubernetes.config.file" is configured to 
/opt/flink/.kube/config in the Flink configmap?
It should be removed before creating the Flink configmap.

Best,
Yang

Tamir Sagi <tamir.s...@niceactimize.com> wrote on Sunday, September 4, 2022 at 18:08:
Hey All,

We recently updated to Flink 1.15.1. We deploy stream cluster in Application 
mode in Native K8S.(Deployed on Amazon EKS).  The cluster is configured with 
Kubernetes HA Service, Minimum 3 replicas of Job manager and pod-template which 
is configured with topologySpreadConstraints to enable distribution across 
different availability zones.
HA storage directory is on S3.

The cluster is deployed and running properly, however, after a while we noticed 
the following exception in Job manager instance(the log file is enclosed)

2022-09-04T02:05:33,097][Error] {} [i.f.k.c.e.l.LeaderElector]: Exception 
occurred while acquiring lock 'ConfigMapLock: dev-0-flink-jobs - 
data-agg-events-insertion-cluster-config-map 
(b6da2ae2-ad2b-471c-801e-ea460a348fab)'
io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for 
kind: [ConfigMap]  with name: [data-agg-events-insertion-cluster-config-map]  
in namespace: [dev-0-flink-jobs]  failed.
Caused by: java.io.FileNotFoundException: /opt/flink/.kube/config (No such file 
or directory)
  at java.io.FileInputStream.open0(Native Method) ~[?:?]
  at java.io.FileInputStream.open(Unknown Source) ~[?:?]
  at java.io.FileInputStream.(Unknown Source) ~[?:?]
  at 
org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.dataformat.yaml.YAMLFactory.createParser(YAMLFactory.java:354)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.dataformat.yaml.YAMLFactory.createParser(YAMLFactory.java:15)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3494)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
io.fabric8.kubernetes.client.internal.KubeConfigUtils.parseConfig(KubeConfigUtils.java:42)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
io.fabric8.kubernetes.client.utils.TokenRefreshInterceptor.intercept(TokenRefreshInterceptor.java:44)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createApplicableInterceptors$6(HttpClientUtils.java:290)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
 ~[flink-dist-1.15.1.jar:1.15.1]
  at 
org.apache.flink.kubernetes.shaded.okhttp3.RealCall.execute(RealCall.java:81) 

Re: Deploying Jobmanager on k8s as a Deployment

2022-09-05 Thread Austin Cawley-Edwards
Hey Marco,

Unfortunately there is no built in k8s API that models an application mode
JM exactly but Deployments should be fine, in general. As Gyula notes,
where they can be difficult is during application upgrades as Deployments
never let their pods exit, even if successful, so there is no way to stop
the cluster gracefully.

Is stopping your application with a savepoint and redeploying a workable
solution for image upgrades? In this way a Job could still be used.


@Gyula, how are JMs handled in the operator? Job, Deployment, or something
custom?


Best,
Austin



On Mon, Sep 5, 2022 at 6:15 AM Gyula Fóra  wrote:

> You can use deployments of course , the operator and native k8s
> integration does exactly that.
>
> Even then job updates can be tricky so I believe you are much better off
> with the operator.
>
> Gyula
>
> On Sun, 4 Sep 2022 at 11:11, marco andreas 
> wrote:
>
>> Hello,
>>
>> Thanks for the response, I will take a look at it.
>>
>> But if we aren't able to use the flink operator due to technical
>> constraints is it possible to deploy the JM as deployment without any
>> consequences that I am not aware of?
>>
>> Sincerely,
>>
>> On Sat, Sep 3, 2022 at 23:27, Gyula Fóra wrote:
>>
>>> Hi!
>>> You should check out the Flink Kubernetes Operator. I think that covers
>>> all your needs .
>>>
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>>>
>>> Cheers,
>>> Gyula
>>>
>>> On Sat, 3 Sep 2022 at 13:45, marco andreas 
>>> wrote:
>>>

 We are deploying a flink application cluster on K8s. Following the
 official documentation, the JM is deployed as a Job resource; however, we
 are deploying a long-running flink job that is not supposed to be
 terminated, and we also need to update the image of the flink job.

  The problem is that the Job is an immutable resource, we can't update
 it.

 So I'm wondering if it's possible to use a deployment resource for the
 jobmanager and if there will be any side effects or repercussions.

 Thanks,

>>>


flink table API usage

2022-09-05 Thread 小昌同学


Table result = kafka_item.groupBy($("trans_number"))   
.select($("trans_number"),$("sales_amount").sum().as("sum_amount"))
   .filter($("sum_amount "));
Hi all, a question: I want to use the Flink Table API to achieve the following:
group by trans_number, then sum another field, and finally filter so that only rows whose sum is greater than 100 are kept.
How do I continue with the API from here? How is the filter operator supposed to be used?
小昌
ccc0606fight...@163.com

Flink kafka producer using the same transactional id twice?

2022-09-05 Thread Sebastian Struss
Hi all,

I am quite new to Flink and Kafka, so I might mix something up here.
The situation is that we do have a flink application (1.14.5 with scala
2.12) running for a few hours to days and suddenly it stops working and
can't publish to kafka anymore.
I then noticed this message showing up twice and thought "this does not
look right":
> Created new transactional producer prefix-2-9447
The second message timestamp seems to be the timestamp when the application
doesn't publish to kafka properly anymore and when checkpoints are failing
to be made.
We also see this error message:
> Producer attempted an operation with an old epoch. Either there is a
newer producer with the same transactionalId, or the producer's transaction
has been expired by the broker.
Am i mistaken when i think that this should be impossible when flink
handles the sinks?
I would think that due to the checkpointing and due to us giving flink the
control about the output, it should never run into this situation.
We are using an exactly-once delivery guarantee for Kafka and set the Flink
sink parallelism to 4.
Also we are using the kubernetes operator of flink in version 1.1.0.
Any hints on what to check/change are highly appreciated.

best,
Sebastian S.
[image: image.png]


Re: [Flink 1.15.1 - Application mode native k8s Exception] - Exception occurred while acquiring lock 'ConfigMapLock

2022-09-05 Thread Yang Wang
Could you please check whether the "kubernetes.config.file" is configured
to /opt/flink/.kube/config in the Flink configmap?
It should be removed before creating the Flink configmap.

Best,
Yang

Tamir Sagi wrote on Sunday, September 4, 2022 at 18:08:

> Hey All,
>
> We recently updated to Flink 1.15.1. We deploy stream cluster in
> Application mode in Native K8S.(Deployed on Amazon EKS).  The cluster is
> configured with Kubernetes HA Service, Minimum 3 replicas of Job manager
> and pod-template which is configured with topologySpreadConstraints to
> enable distribution across different availability zones.
> HA storage directory is on S3.
>
> The cluster is deployed and running properly, however, after a while we
> noticed the following exception in Job manager instance(the log file is
> enclosed)
>
> 2022-09-04T02:05:33,097][Error] {} [i.f.k.c.e.l.LeaderElector]: Exception
> occurred while acquiring lock 'ConfigMapLock: dev-0-flink-jobs -
> data-agg-events-insertion-cluster-config-map
> (b6da2ae2-ad2b-471c-801e-ea460a348fab)'
> io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]
>  for kind: [ConfigMap]  with name:
> [data-agg-events-insertion-cluster-config-map]  in namespace:
> [dev-0-flink-jobs]  failed.
> Caused by: java.io.FileNotFoundException: /opt/flink/.kube/config (No such
> file or directory)
> at java.io.FileInputStream.open0(Native Method) ~[?:?]
> at java.io.FileInputStream.open(Unknown Source) ~[?:?]
> at java.io.FileInputStream.(Unknown Source) ~[?:?]
> at
> org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.dataformat.yaml.YAMLFactory.createParser(YAMLFactory.java:354)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.dataformat.yaml.YAMLFactory.createParser(YAMLFactory.java:15)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3494)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.internal.KubeConfigUtils.parseConfig(KubeConfigUtils.java:42)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.utils.TokenRefreshInterceptor.intercept(TokenRefreshInterceptor.java:44)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.utils.ImpersonatorInterceptor.intercept(ImpersonatorInterceptor.java:68)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.utils.HttpClientUtils.lambda$createApplicableInterceptors$6(HttpClientUtils.java:290)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> org.apache.flink.kubernetes.shaded.okhttp3.RealCall.execute(RealCall.java:81)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.retryWithExponentialBackoff(OperationSupport.java:585)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:558)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleResponse(OperationSupport.java:521)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:488)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.dsl.base.OperationSupport.handleGet(OperationSupport.java:470)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.handleGet(BaseOperation.java:830)
> ~[flink-dist-1.15.1.jar:1.15.1]
> at
> io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:200)
> ~[flink-dist-1.15.1.jar:1.15.1]
> ... 12 more
>
> Why is Kube/config needed in Native K8s,  should not service account be
> checked instead?
>
> Are we missing something?
>
> Thanks,
> Tamir.
>
>

Re: How to open a Prometheus metrics port on the rest service when using the Kubernetes operator?

2022-09-05 Thread Yang Wang
I do not think we could add an additional port to the rest service since it
is created by Flink internally.

Actually, I do not suggest scraping the metrics from the rest service.
Instead, the port on the pod needs to be used,
because the metrics might not work correctly if multiple JobManagers are
running.


Best,
Yang
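
If the Prometheus Operator is in use, one way to scrape the pod port directly is a PodMonitor; this is only a sketch, and the namespace, labels and port name below are assumptions that depend on the actual deployment:

apiVersion: monitoring.coreos.com/v1
kind: PodMonitor
metadata:
  name: flink-job-metrics
  namespace: flink-jobs
spec:
  selector:
    matchLabels:
      app: my-flink-app            # label on the JobManager/TaskManager pods (deployment-specific)
  podMetricsEndpoints:
    - port: metrics                # the containerPort named "metrics" in the pod template
      path: /metrics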

Javier Vegas wrote on Monday, September 5, 2022 at 15:00:

> What I would need is to set
>
> ports:
>
> - name: metrics
>
>   port: 
>
>   protocol: TCP
>
>
>
> in the generated YAML for the appname-rest service, which properly
> aggregates the metrics from the pods, but I cannot figure out how, either
> from the job deployment file or by modifying the operator templates in the
> Helm chart. Any way I can modify the ports in the Flink rest service?
>
>
> Thanks,
>
>
> Javier Vegas
>
>
>
> On Sun, Sep 4, 2022 at 1:59, Javier Vegas ()
> wrote:
>
>> Hi, Biao!
>>
>> Thanks for the fast response! Setting that in the podTemplate opens the
>> metrics port in the pods, but unfortunately not on the rest service. Not
>> sure if that is standard procedure, but my Prometheus setup scrapes the
>> metrics port on services but not on pods. On my previous non-operator
>> standalone setup, the metrics port on the service was aggregating all the
>> pod metrics and then Prometheus was scraping that, so I was trying to
>> reproduce that by opening the port on the rest service.
>>
>>
>>
>> On Sun, Sep 4, 2022 at 1:03, Geng Biao ()
>> wrote:
>>
>>> Hi Javier,
>>>
>>>
>>>
>>> You can use podTemplate to expose the port in the flink containers.
>>>
>>> Here is a snippet:
>>>
>>> spec:
>>>
>>>   flinkVersion: v1_15
>>>
>>>   flinkConfiguration:
>>>
>>> state.savepoints.dir: file:///flink-data/flink-savepoints
>>>
>>> state.checkpoints.dir: file:///flink-data/flink-checkpoints
>>>
>>> *metrics.reporter.prom.factory.class:
>>> org.apache.flink.metrics.prometheus.PrometheusReporterFactory*
>>>
>>>   serviceAccount: flink
>>>
>>>   podTemplate:
>>>
>>> metadata:
>>>
>>>   annotations:
>>>
>>> prometheus.io/path: /metrics
>>>
>>> prometheus.io/port: "9249"
>>>
>>> prometheus.io/scrape: "true"
>>>
>>> spec:
>>>
>>>   serviceAccount: flink
>>>
>>>   containers:
>>>
>>> - name: flink-main-container
>>>
>>>   volumeMounts:
>>>
>>> - mountPath: /flink-data
>>>
>>>   name: flink-volume
>>>
>>>  * ports:*
>>>
>>> *- containerPort: 9249*
>>>
>>> *  name: metrics*
>>>
>>> *  protocol: TCP*
>>>
>>>   volumes:
>>>
>>> - name: flink-volume
>>>
>>>   emptyDir: {}
>>>
>>>
>>>
>>> The bold lines are about how to specify the metric reporter and expose
>>> the metric. The annotations are not required if you use PodMonitor or
>>> ServiceMonitor. Hope it can help!
>>>
>>>
>>>
>>> Best,
>>>
>>> Biao Geng
>>>
>>>
>>>
>>> *From: *Javier Vegas 
>>> *Date: *Sunday, September 4, 2022 at 10:19 AM
>>> *To: *user 
>>> *Subject: *How to open a Prometheus metrics port on the rest service
>>> when using the Kubernetes operator?
>>>
>>> I am migrating my Flink app from standalone Kubernetes to the Kubernetes
>>> operator. It is going well, but I ran into a problem: I cannot figure out
>>> how to open a Prometheus metrics port on the rest service to collect all my
>>> custom metrics from the task managers. Note that this is different from the
>>> instructions to "How to Enable Prometheus"
>>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging/#how-to-enable-prometheus-example
>>> that example is to collect the operator pod metrics, but what I am trying
>>> to do is open a port on the rest service to make my job metrics available
>>> to Prometheus.
>>>
>>>
>>>
>>> Thanks,
>>>
>>>
>>>
>>> Javier Vegas
>>>
>>


Re: [E] Re: Kubernetes operator expose UI rest service as NodePort instead of default clusterIP

2022-09-05 Thread Yang Wang
I think we have no concrete reason to always override the
"REST_SERVICE_EXPOSED_TYPE" to "ClusterIP".
It was introduced to fix the default value for releases before 1.15. And I
believe we need to respect the user configured values.

Best,
Yang
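
For reference, a hedged sketch of the operator's ingress option combined with the AWS Load Balancer Controller in "ip" target mode (the deployment name, host template and annotations below are illustrative, not tested):

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: my-flink-app
spec:
  # ... image, flinkConfiguration, job, etc. ...
  ingress:
    template: "flink.example.com/{{name}}(/|$)(.*)"
    className: alb
    annotations:
      alb.ingress.kubernetes.io/target-type: ip   # "ip" mode works with ClusterIP services
      alb.ingress.kubernetes.io/scheme: internal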

Vignesh Kumar Kathiresan wrote on Saturday, September 3, 2022 at 05:07:

> Jacob,
> Thanks, I checked it out and it didn't work, because of the config being overridden to
> ClusterIP, the part we
> were talking about. So it looks like it is always being set to ClusterIP now.
>
> Yang,
> Having the ALB target type as "ip" works with a ClusterIP-type service.
>
> On Fri, Sep 2, 2022 at 8:18 AM Jeesmon Jacob  wrote:
>
>> I remember testing the operator with the rest service exposed as
>> NodePort. NodePort requires rbac.nodeRoules.create: true (default is false)
>> in values.yaml. Maybe you missed that?
>>
>>
>> https://github.com/apache/flink-kubernetes-operator/blob/release-1.1/helm/flink-kubernetes-operator/values.yaml#L34-L38
>> 
>>
>> On Thu, Sep 1, 2022 at 11:45 PM Vignesh Kumar Kathiresan via user <
>> user@flink.apache.org> wrote:
>>
>>> Hi Yang,
>>>
>>> Yeah, I gathered that from the operator code soon after posting. I am
>>> using the AWS ALB ingress class [1]. There, under "Considerations", it is
>>> mentioned that if the ALB target type is "instance", which is the default traffic
>>> mode, the kubernetes service type has to be NodePort or LoadBalancer.
>>>
>>> Also, the ALB target type might work if changed to "ip". Let me try that. I
>>> believe there should be a reason to always override the
>>> "REST_SERVICE_EXPOSED_TYPE" to "ClusterIP".
>>>
>>> [1] https://docs.aws.amazon.com/eks/latest/userguide/alb-ingress.html
>>> 
>>>
>>> On Thu, Sep 1, 2022 at 7:01 PM Yang Wang  wrote:
>>>
 I am afraid the current flink-kubernetes-operator always overrides the
 "REST_SERVICE_EXPOSED_TYPE" to "ClusterIP".
 Could you please share why the ingress[1] could not meet your
 requirements? Compared with NodePort, I think it is a more graceful
 implementation.

 [1].
 https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-release-1.1/docs/operations/ingress/
 

 Best,
 Yang

Vignesh Kumar Kathiresan via user wrote on Friday, September 2, 2022 at 04:57:

> Hello Flink community,
>
> Need some help with "flink kubernetes operator" based cluster setup.
>
> My flink cluster is set up using the flink-kubernetes-operator in AWS
> EKS. The required resources (deployments, pods, services, configmaps, etc.)
> are created as expected. But the service "*-rest" is created as a
> "ClusterIP" type. I would want it created as a NodePort type.
>
> I want to expose the UI to external viewing via ingress using the aws
> alb class. This aws-load balancer-controller requires my service to be of
> type NodePort.
>
> I have tried a few options but the service is always created as
> ClusterIP.
> 1) In the FlinkDeployment CRD, under spec.flinkConfiguration
> added kubernetes.rest-service.exposed.type: "NodePort"
> 2) In the operator helm values.yaml
>
> defaultConfiguration:
>   create: true
>   # Set append to false to replace configuration files
>   append: true
>   flink-conf.yaml: |+
> # Flink Config Overrides
> kubernetes.rest-service.exposed.type: NodePort
>
> Neither option gives me a NodePort type service for the UI.
> Any suggestions?
>
>
>
>
>
>
>
>


Re: [NOTICE] Switch docker image base to Eclipse Temurin

2022-09-05 Thread Chesnay Schepler

* September 7th

On 05/09/2022 11:27, Chesnay Schepler wrote:
On Wednesday, September 9th, the Flink 1.14.5/1.15.2 Docker images 
will switch bases


FROM openjdk:8/11-jre (Debian-based)
TO eclipse-temurin:8/11-jre-jammy (Ubuntu-based)

due to the deprecation of the OpenJDK images.

Users that customized the images are advised to check for breaking 
changes.


The source Dockerfile for the new images is available at 
https://github.com/apache/flink-docker/tree/4794f9425513fb4c0b55ec1efd629e8eb7e5d8c5. 






Re: Deploying Jobmanager on k8s as a Deployment

2022-09-05 Thread Gyula Fóra
You can use deployments of course , the operator and native k8s integration
does exactly that.

Even then job updates can be tricky so I believe you are much better off
with the operator.

Gyula

On Sun, 4 Sep 2022 at 11:11, marco andreas 
wrote:

> Hello,
>
> Thanks for the response, I will take a look at it.
>
> But if we aren't able to use the flink operator due to technical
> constraints is it possible to deploy the JM as deployment without any
> consequences that I am not aware of?
>
> Sincerely,
>
> On Sat, Sep 3, 2022 at 23:27, Gyula Fóra wrote:
>
>> Hi!
>> You should check out the Flink Kubernetes Operator. I think that covers
>> all your needs .
>>
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/
>>
>> Cheers,
>> Gyula
>>
>> On Sat, 3 Sep 2022 at 13:45, marco andreas 
>> wrote:
>>
>>>
>>> We are deploying a flink application cluster on K8s. Following the
>>> official documentation, the JM is deployed as a Job resource; however, we
>>> are deploying a long-running flink job that is not supposed to be
>>> terminated, and we also need to update the image of the flink job.
>>>
>>>  The problem is that the Job is an immutable resource, we can't update it.
>>>
>>> So I'm wondering if it's possible to use a deployment resource for the
>>> jobmanager and if there will be any side effects or repercussions.
>>>
>>> Thanks,
>>>
>>


[NOTICE] Switch docker image base to Eclipse Temurin

2022-09-05 Thread Chesnay Schepler
On Wednesday, September 9th, the Flink 1.14.5/1.15.2 Docker images will 
switch bases


FROM openjdk:8/11-jre (Debian-based)
TO eclipse-temurin:8/11-jre-jammy (Ubuntu-based)

due to the deprecation of the OpenJDK images.

Users that customized the images are advised to check for breaking changes.

The source Dockerfile for the new images is available at 
https://github.com/apache/flink-docker/tree/4794f9425513fb4c0b55ec1efd629e8eb7e5d8c5. 



Re: flink ci build run longer than the maximum time of 310 minutes.

2022-09-05 Thread Matthias Pohl via user
Usually, it would be more helpful to provide a link to the PR to get a
better picture of the problem. I'm not 100% sure whether I grasp what's
wrong.

It looks like your branch is based on apache/flink:release-1.15 [1].
Therefore, you should fetch the most recent version from upstream and then
do a git rebase upstream/release-1.15. This will put your 4 commits which
you've added to your local branch so far "on top" of everything that is
already part of upstream/release-1.15. This should resolve your branch
being 11 commits behind the 1.15 release branch. Force-pushing the changes
in your local branch to your remote repo (your fork) will update the PR.

Keep in mind that you have to specify the right base branch in your Github
PR (pointing to the 1.15 release branch in your case) as well to have the
right diff.

I hope that helps. Best,
Matthias

[1] https://github.com/apache/flink/tree/release-1.15
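
On the command line, the workflow described above could look roughly like this (assuming the apache/flink remote is named upstream, the fork is origin, and the branch name is made up):

git fetch upstream
git checkout my-fix-branch
git rebase upstream/release-1.15
# resolve any conflicts, then update the branch backing the PR
git push --force-with-lease origin my-fix-branch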

On Sat, Sep 3, 2022 at 10:18 AM hjw <1010445...@qq.com> wrote:

> Hi,Matthias
> The ci build Error in  e2e_1_ci job:
> Sep 02 11:01:51 ##[group]Top 15 biggest directories in terms of used disk
> space
> Sep 02 11:01:52 Searching for .dump, .dumpstream and related files in
> '/home/vsts/work/1/s'
> dmesg: read kernel buffer failed: Operation not permitted
> Sep 02 11:01:53 No taskexecutor daemon to stop on host fv-az158-417.
> Sep 02 11:01:53 No standalonesession daemon to stop on host fv-az158-417.
> Sep 02 11:10:27 The command 'docker build --no-cache --network=host -t
> test_docker_embedded_job dev/test_docker_embedded_job-debian' (pid: 188432)
> did not finish after 600 seconds.
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/common.sh: line
> 900: kill: (188432) - No such process
> Sep 02 11:11:06 The command 'docker build --no-cache --network=host -t
> test_docker_embedded_job dev/test_docker_embedded_job-debian' (pid: 188484)
> did not finish after 600 seconds.
> /home/vsts/work/1/s/flink-end-to-end-tests/test-scripts/common.sh: line
> 900: kill: (188484) - No such process
>
> I think the issue  applies to  my case.
> However, I have submitted some commits to my fork repo and created a PR. The
> PR has not been merged into the flink repo yet. My fork repo status: This
> branch is 4 commits ahead, 11 commits behind apache:release-1.15.
>
> When I rebase the branch from upstream and push to my fork repo, the 11
> commits behind apache:release-1.15
> also appear in my PR's changed files. How can I handle this situation? thx.
>
> --
> Best,
> Hjw
>
>
>
> -- Original message --
> *From:* "Matthias Pohl" ;
> *Date:* Friday, September 2, 2022, 7:29 PM
> *To:* "Martijn Visser";
> *Cc:* "hjw"<1010445...@qq.com>;"user";
> *Subject:* Re: flink ci build run longer than the maximum time of 310 minutes.
>
> Not sure whether that applies to your case, but there was a recent issue
> [1] where the e2e_1_ci job ran into a timeout. If that's what you were
> observing, rebasing your branch might help.
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-29161
>
> On Fri, Sep 2, 2022 at 10:51 AM Martijn Visser 
> wrote:
>
>> You can ask the Flinkbot to run again by typing as comment
>>
>> @flinkbot run azure
>>
>> Best regards,
>>
>> Martijn
>>
>> Op vr 2 sep. 2022 om 08:40 schreef hjw <1010445...@qq.com>:
>>
>>> I commit a pr to Flink Github .
>>> A error happened in building ci.
>>> [error]The job running on agent Azure Pipelines 6 ran longer than the
>>> maximum time of 310 minutes. For more information, see
>>> https://go.microsoft.com/fwlink/?linkid=2077134
>>>
>>> How to solve this problem?
>>> How to trigger the ci build again?
>>> thx.
>>>
>>


Re: How to open a Prometheus metrics port on the rest service when using the Kubernetes operator?

2022-09-05 Thread Javier Vegas
What I would need is to set

ports:

- name: metrics

  port: 

  protocol: TCP



in the generated YAML for the appname-rest service, which properly
aggregates the metrics from the pods, but I cannot figure out how, either
from the job deployment file or by modifying the operator templates in the
Helm chart. Any way I can modify the ports in the Flink rest service?


Thanks,


Javier Vegas



On Sun, Sep 4, 2022 at 1:59, Javier Vegas () wrote:

> Hi, Biao!
>
> Thanks for the fast response! Setting that in the podTemplate opens the
> metrics port in the pods, but unfortunately not on the rest service. Not
> sure if that is standard procedure, but my Prometheus setup scrapes the
> metrics port on services but not on pods. On my previous non-operator
> standalone setup, the metrics port on the service was aggregating all the
> pod metrics and then Prometheus was scraping that, so I was trying to
> reproduce that by opening the port on the rest service.
>
>
>
> On Sun, Sep 4, 2022 at 1:03, Geng Biao ()
> wrote:
>
>> Hi Javier,
>>
>>
>>
>> You can use podTemplate to expose the port in the flink containers.
>>
>> Here is a snippet:
>>
>> spec:
>>
>>   flinkVersion: v1_15
>>
>>   flinkConfiguration:
>>
>> state.savepoints.dir: file:///flink-data/flink-savepoints
>>
>> state.checkpoints.dir: file:///flink-data/flink-checkpoints
>>
>> *metrics.reporter.prom.factory.class:
>> org.apache.flink.metrics.prometheus.PrometheusReporterFactory*
>>
>>   serviceAccount: flink
>>
>>   podTemplate:
>>
>> metadata:
>>
>>   annotations:
>>
>> prometheus.io/path: /metrics
>>
>> prometheus.io/port: "9249"
>>
>> prometheus.io/scrape: "true"
>>
>> spec:
>>
>>   serviceAccount: flink
>>
>>   containers:
>>
>> - name: flink-main-container
>>
>>   volumeMounts:
>>
>> - mountPath: /flink-data
>>
>>   name: flink-volume
>>
>>  * ports:*
>>
>> *- containerPort: 9249*
>>
>> *  name: metrics*
>>
>> *  protocol: TCP*
>>
>>   volumes:
>>
>> - name: flink-volume
>>
>>   emptyDir: {}
>>
>>
>>
>> The bold lines are about how to specify the metric reporter and expose the
>> metric. The annotations are not required if you use PodMonitor or
>> ServiceMonitor. Hope it can help!
>>
>>
>>
>> Best,
>>
>> Biao Geng
>>
>>
>>
>> *From: *Javier Vegas 
>> *Date: *Sunday, September 4, 2022 at 10:19 AM
>> *To: *user 
>> *Subject: *How to open a Prometheus metrics port on the rest service
>> when using the Kubernetes operator?
>>
>> I am migrating my Flink app from standalone Kubernetes to the Kubernetes
>> operator. It is going well, but I ran into a problem: I cannot figure out
>> how to open a Prometheus metrics port on the rest service to collect all my
>> custom metrics from the task managers. Note that this is different from the
>> instructions to "How to Enable Prometheus"
>> https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/operations/metrics-logging/#how-to-enable-prometheus-example
>> that example is to collect the operator pod metrics, but what I am trying
>> to do is open a port on the rest service to make my job metrics available
>> to Prometheus.
>>
>>
>>
>> Thanks,
>>
>>
>>
>> Javier Vegas
>>
>


Re: flink-1.14.4

2022-09-05 Thread kcz
??




------ Original message ------
From: "kcz" <573693...@qq.com>
Date: 2022-09-05 (Mon) 11:50
To: "user-zh"