Re: Kubernetes operator and jobs with last-state upgrades

2022-11-17 Thread zhanghao.chen
Hi,

When you use last-state upgrade, the Flink deployment is simply deleted while 
the HA metadata, where checkpoint info is stored, is kept, and the upgraded job 
recovers its state from the latest checkpoint.

Best,
Zhanghao Chen

From: Alexis Sarda-Espinosa 
Sent: Thursday, November 17, 2022 1:24
To: user 
Subject: Kubernetes operator and jobs with last-state upgrades

Hello,

I am doing some tests with the operator and, if I'm not mistaken, using 
last-state upgrade means that, when something is changed in the CR, no 
savepoint is taken and the pods are simply terminated. Is that a requirement 
from Flink HA? I would have thought last-state would still use savepoints for 
upgrade if the current status is stable.

Regards,
Alexis.


Re: Flink Operator in Golang?

2022-11-17 Thread zhanghao.chen
Hi Mark,


  1.  Directly quoting from 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-212%3A+Introduce+Flink+Kubernetes+Operator:

Main reasons for choosing Java over Go

  *   Direct access to Flink Client libraries for submitting, managing jobs and 
handling errors
  *   Most Flink developers have strong Java experience while there are only 
few Go experts
  *   Easier to integrate with existing build system and tooling
  *   Required k8s clients and tools for building an operator are also 
available in Java

  2.  Unfortunately, a Golang API is not supported yet


Best,
Zhanghao Chen

From: Mark Lee 
Sent: Thursday, November 17, 2022 16:16
To: user@flink.apache.org 
Subject: Flink Operator in Golang?


Hi,

  I found we already have a Flink operator implemented in Java, but I have two 
questions:

1. Can we implement a Flink operator using Golang? Are there any hidden traps?

2. We can submit Java jar jobs or SQL jobs; can we submit Golang jobs?



Thank you.




Re: ExecutionMode in ExecutionConfig

2022-09-14 Thread zhanghao.chen
It's added in Flink 1.14: 
https://nightlies.apache.org/flink/flink-docs-master/zh/release-notes/flink-1.14/#expose-a-consistent-globaldataexchangemode.
Not sure if there's a way to change this in 1.13.

Best,
Zhanghao Chen

From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 23:38
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: ExecutionMode in ExecutionConfig


I can give this a try. Do you know which Flink version this feature becomes 
available in?



ah



From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 11:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



Could you try setting 'execution.batch-shuffle-mode'='ALL_EXCHANGES_PIPELINED'? 
It looks like the ExecutionMode in ExecutionConfig does not work for DataStream 
APIs.



The default shuffling behavior for the DataStream API in batch mode is 
'ALL_EXCHANGES_BLOCKING', where downstream tasks run only after the upstream 
tasks have finished. The pipelined mode, on the other hand, has upstream and 
downstream tasks run simultaneously.
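
For reference, a minimal sketch of setting this programmatically (assuming 
Flink 1.14+, where the option is exposed as ExecutionOptions.BATCH_SHUFFLE_MODE):

    import org.apache.flink.api.common.BatchShuffleMode;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.ExecutionOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // Equivalent to 'execution.batch-shuffle-mode'='ALL_EXCHANGES_PIPELINED'
    conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE,
            BatchShuffleMode.ALL_EXCHANGES_PIPELINED);

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(conf);
    // Batch runtime mode with pipelined exchanges: upstream and downstream
    // tasks may run at the same time, as with the DataSet API's default.
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);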





Best,

Zhanghao Chen



From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 21:37
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: ExecutionMode in ExecutionConfig



Hi Zhanghao,



That seems different from what I’m referencing and is one of my points of 
confusion – the documents refer to ExecutionMode as BATCH and STREAMING, which 
is different from what the code calls the Runtime Mode, e.g. 
env.setRuntimeMode(RuntimeExecutionMode.BATCH);



I’m referring to the ExecutionMode in the ExecutionConfig, e.g. 
env.getConfig().setExecutionMode(ExecutionMode.BATCH) / 
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED). I’m not able to find 
documentation on this anywhere.







ah



From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 1:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
 gives a comprehensive description of it.






Best,

Zhanghao Chen



From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org 
Subject: ExecutionMode in ExecutionConfig



Hello,



Is there somewhere I can learn more about the details of the effect of 
ExecutionMode in ExecutionConfig on a job? I am trying to sort out some of the 
details, as it seems to work differently between the DataStream API and the 
deprecated DataSet API.



I’ve attached a picture of this job graph - I’m reading from a total of 3 data 
sources – the results of 2 are sent to CoGroup (orange rectangle), and the 
other has its records forwarded to a sink after some basic filter + map 
operations (red rectangle).



The DataSet API’s job graph has all of the operators RUNNING immediately as we 
desire. However, the DataStream API’s job graph only has the DataSource 
operators that are feeding into the CoGroup online, and the remaining operators 
wake up only when the 2 sources have completed. This winds up introducing a lot 
of latency in processing the batch.



Both of these are running in the same environment on the same data with 
identical ExecutionMode configs, just different APIs. I’m attempting to have 
the same behavior between them.

Re: ExecutionMode in ExecutionConfig

2022-09-14 Thread zhanghao.chen
Could you try setting 'execution.batch-shuffle-mode'='ALL_EXCHANGES_PIPELINED'? 
It looks like the ExecutionMode in ExecutionConfig does not work for DataStream 
APIs.

The default shuffling behavior for the DataStream API in batch mode is 
'ALL_EXCHANGES_BLOCKING', where downstream tasks run only after the upstream 
tasks have finished. The pipelined mode, on the other hand, has upstream and 
downstream tasks run simultaneously.


Best,
Zhanghao Chen

From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 21:37
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: ExecutionMode in ExecutionConfig


Hi Zhanghao,



That seems different from what I’m referencing and is one of my points of 
confusion – the documents refer to ExecutionMode as BATCH and STREAMING, which 
is different from what the code calls the Runtime Mode, e.g. 
env.setRuntimeMode(RuntimeExecutionMode.BATCH);



I’m referring to the ExecutionMode in the ExecutionConfig, e.g. 
env.getConfig().setExecutionMode(ExecutionMode.BATCH) / 
env.getConfig().setExecutionMode(ExecutionMode.PIPELINED). I’m not able to find 
documentation on this anywhere.







ah



From: zhanghao.c...@outlook.com 
Sent: Wednesday, September 14, 2022 1:10 AM
To: Hailu, Andreas [Engineering] ; 
user@flink.apache.org
Subject: Re: ExecutionMode in ExecutionConfig



https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
 gives a comprehensive description of it.






Best,

Zhanghao Chen



From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org 
Subject: ExecutionMode in ExecutionConfig



Hello,



Is there somewhere I can learn more about the details of the effect of 
ExecutionMode in ExecutionConfig on a job? I am trying to sort out some of the 
details, as it seems to work differently between the DataStream API and the 
deprecated DataSet API.



I’ve attached a picture of this job graph - I’m reading from a total of 3 data 
sources – the results of 2 are sent to CoGroup (orange rectangle), and the 
other has its records forwarded to a sink after some basic filter + map 
operations (red rectangle).



The DataSet API’s job graph has all of the operators RUNNING immediately as we 
desire. However, the DataStream API’s job graph only has the DataSource 
operators that are feeding into the CoGroup online, and the remaining operators 
wake up only when the 2 sources have completed. This winds up introducing a lot 
of latency in processing the batch.



Both of these are running in the same environment on the same data with 
identical ExecutionMode configs, just different APIs. I’m attempting to have 
the same behavior between them. I ask about ExecutionMode as I am able to 
replicate this behavior in DataSet by setting the ExecutionMode from the 
default of PIPELINED to BATCH.



Thanks!



best,

ah







Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices



Re: Can flink dynamically detect high load and increase the job parallelism automatically?

2022-09-14 Thread zhanghao.chen
Flink will not try to do autoscaling for you, and the parallelism is fixed 
unless you enable reactive mode/the adaptive scheduler. Max parallelism just 
means the maximum parallelism to which you can rescale your job without losing 
state. The max parallelism limit is related to the Flink key-group mechanism 
used to rescale jobs with keyed state.
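
To illustrate, a simplified sketch of the key-group math (mirroring Flink's 
KeyGroupRangeAssignment; the real implementation additionally applies murmur 
hashing to the key's hashCode()):

    // Every key is statically assigned to one of maxParallelism key groups;
    // a key group is the smallest unit in which keyed state can be
    // redistributed when a job is rescaled.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism); // simplified
    }

    // Each parallel operator instance owns a contiguous range of key groups.
    static int operatorIndexForKeyGroup(
            int maxParallelism, int parallelism, int keyGroupId) {
        return keyGroupId * parallelism / maxParallelism;
    }

Since a key group cannot be split, the number of key groups (= max parallelism) 
is the hard upper bound for rescaling a job with keyed state.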

I'm not sure about the future plan for extending reactive mode beyond 
standalone mode, though.

Best,
Zhanghao Chen

From: Erez Yaakov 
Sent: Wednesday, September 14, 2022 20:32
To: zhanghao.c...@outlook.com ; 
user@flink.apache.org 
Subject: RE: Can flink dynamically detect high load and increase the job 
parallelism automatically?


Maybe 'automatically parallelism change' is not an accurate term for describing 
what I mean, so let me re-phrase it:



Assuming I'm submitting my job with parallelism = 2 and max parallelism = 128 
(default), my expectation is that every operator of the job will actually have 
several instances at run time, from 2 to 128.

Meaning, Flink starts my job with 2 instances of every operator, but once it 
realizes that a specific operator has become the bottleneck, it will create 
additional instances of this operator, until it gets to the max of 128. For 
example, in my case the kafka topic is partitioned into 16 partitions. Once I 
increase the load and the consumers cannot handle it (but the downstream 
operators can!), I’d expect Flink to increase the actual number of parallel 
instances of the source operator, and if new TaskManager pods are required, 
flink will interact directly with k8s (I'm working with the k8s native 
deployment) in order to spin up additional pods.



Is this the expected behavior of the k8s native deployment? Or is it that in 
this mode the number of actual parallel operator instances is fixed per job and 
does not change dynamically during the job's life?



Thanks for your clarifications. It's really helpful!





From: zhanghao.c...@outlook.com 
Sent: Tuesday, September 13, 2022 4:30 AM
To: Erez Yaakov ; user@flink.apache.org
Subject: Re: Can flink dynamically detect high load and increase the job 
parallelism automatically?






Hi Erez,



Unfortunately, autoscaling for streaming jobs is only available with reactive 
mode, which, as you've already pointed out, is still an MVP feature and only 
supports standalone mode. Some vendors (e.g. Ververica) have already shipped 
their own private implementations of Flink autoscaling, though.



Best,

Zhanghao Chen



From: Erez Yaakov 
Sent: Monday, September 12, 2022 21:38
To: user@flink.apache.org 
Subject: Can flink dynamically detect high load and increase the job 
parallelism automatically?



Hi,



When running a streaming job that uses a kafka source, is it possible (without 
reactive mode) for flink to dynamically detect high load (high consumer lag, 
high cpu usage…) and increase the job parallelism automatically?



I am running a flink streaming job on an application mode cluster using native 
k8s.

My streaming job is consuming messages from a kafka topic with 16 partitions; 
parallelism.default is set to 2, and no parallelism is set specifically on the 
operators/sources/sinks.



I tried to send multiple messages to the kafka topic at a high rate, faster 
than the job can consume, and I saw that the consumer lag was increasing. I 
also saw in the flink UI that the source task was turning red, indicating high 
usage of this task.

Even though I created a high load on the job, I didn't see flink automatically 
change the parallelism of the job to handle the high load.

Is it possible for Flink to increase the parallelism of my job (or of my 
source) dynamically based on the current load (and add task managers 
automatically)? Or is this behavior only available by using reactive mode?



For reactive mode, my understanding based on the documentation is that it is in 
MVP state and is only supported in standalone mode, and is not ready yet for 
production use.



Thanks,

Erez

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.


Re: ExecutionMode in ExecutionConfig

2022-09-13 Thread zhanghao.chen
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
 gives a comprehensive description of it.


Best,
Zhanghao Chen

From: Hailu, Andreas 
Sent: Wednesday, September 14, 2022 7:13
To: user@flink.apache.org 
Subject: ExecutionMode in ExecutionConfig


Hello,



Is there somewhere I can learn more about the details of the effect of 
ExecutionMode in ExecutionConfig on a job? I am trying to sort out some of the 
details, as it seems to work differently between the DataStream API and the 
deprecated DataSet API.



I’ve attached a picture of this job graph - I’m reading from a total of 3 data 
sources – the results of 2 are sent to CoGroup (orange rectangle), and the 
other has its records forwarded to a sink after some basic filter + map 
operations (red rectangle).



The DataSet API’s job graph has all of the operators RUNNING immediately as we 
desire. However, the DataStream API’s job graph only has the DataSource 
operators that are feeding into the CoGroup online, and the remaining operators 
wake up only when the 2 sources have completed. This winds up introducing a lot 
of latency in processing the batch.



Both of these are running in the same environment on the same data with 
identical ExecutionMode configs, just different APIs. I’m attempting to have 
the same behavior between them. I ask about ExecutionMode as I am able to 
replicate this behavior in DataSet by setting the ExecutionMode from the 
default of PIPELINED to BATCH.



Thanks!



best,

ah





Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices


Re: Re: Unsubscribe

2022-09-13 Thread zhanghao.chen
Hi, to unsubscribe, send an email with any content to 
user-zh-unsubscr...@flink.apache.org.

Best,
Zhanghao Chen

From: 陈奉刚 
Sent: Tuesday, September 13, 2022 15:31
To: user-zh 
Subject: Re: Unsubscribe


Unsubscribe


Re: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-12 Thread zhanghao.chen
@h.yuan...@gmail.com Any thoughts on this?

Best,
Zhanghao Chen

From: hjw <1010445...@qq.com>
Sent: Tuesday, September 13, 2022 11:24
To: zhanghao.chen ; user 
Subject: Re: A classloading question about submitting Flink jobs on Yarn-Perjob 
Mode

Hi,

The yarn.classpath.include-user-jar parameter is shown as the 
yarn.per-job-cluster.include-user-jar parameter in Flink 1.14.
I have tried DISABLED, FIRST, LAST, and ORDER, but the error still happened.

Best,
Hjw



-- Original --
From: "zhanghao.chen" ;
Date: Tue, Sep 13, 2022 09:42 AM
To: "hjw"<1010445...@qq.com>;"user";
Subject: Re: A classloading question about submitting Flink jobs on Yarn-Perjob 
Mode

Hi,

Did you set any additional classloading-related configs (esp. the 
yarn.classpath.include-user-jar parameter)?

Best,
Zhanghao Chen

From: hjw <1010445...@qq.com>
Sent: Tuesday, September 13, 2022 1:58
To: user 
Subject: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

When I submit a job to yarn in per-job mode, an error occurs during the 
client-side generation of the JobGraph submitted to yarn.

Error:
java.lang.NoClassDefFoundError:org/apache/orc/PhysicalWriter

Because the cluster is public, there is already a related 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar package in the /opt/flink/lib directory.

However, this class is included in my jar: it is provided by the orc-core 
package, which I have packaged into my jar.

After some attempts, I found that either of the following measures solves my 
problem:
1. Remove the related flink-orc_2.12-1.14.0-csa1.6.1.0.jar from the 
/opt/flink/lib directory and package it into my jar.
2. Put orc-core into the /opt/flink/lib directory.

However, I would like to know why an error occurs when the 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar package is placed in the /opt/flink/lib 
directory and orc-core is packaged into my jar.


Env:
Flink version: flink 1.14.0


Best,
Hjw



Re: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

2022-09-12 Thread zhanghao.chen
Hi,

Did you set any additional classloading-related configs (esp. the 
yarn.classpath.include-user-jar parameter)?

Best,
Zhanghao Chen

From: hjw <1010445...@qq.com>
Sent: Tuesday, September 13, 2022 1:58
To: user 
Subject: A classloading question about submitting Flink jobs on Yarn-Perjob Mode

When I submit a job to yarn in per-job mode, an error occurs during the 
client-side generation of the JobGraph submitted to yarn.

Error:
java.lang.NoClassDefFoundError:org/apache/orc/PhysicalWriter

Because the cluster is public, there is already a related 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar package in the /opt/flink/lib directory.

However, this class is included in my jar: it is provided by the orc-core 
package, which I have packaged into my jar.

After some attempts, I found that either of the following measures solves my 
problem:
1. Remove the related flink-orc_2.12-1.14.0-csa1.6.1.0.jar from the 
/opt/flink/lib directory and package it into my jar.
2. Put orc-core into the /opt/flink/lib directory.

However, I would like to know why an error occurs when the 
flink-orc_2.12-1.14.0-csa1.6.1.0.jar package is placed in the /opt/flink/lib 
directory and orc-core is packaged into my jar.


Env:
Flink version: flink 1.14.0


Best,
Hjw



Re: Can flink dynamically detect high load and increase the job parallelism automatically?

2022-09-12 Thread zhanghao.chen
Hi Erez,

Unfortunately, autoscaling for streaming jobs is only available with reactive 
mode, which, as you've already pointed out, is still an MVP feature and only 
supports standalone mode. Some vendors (e.g. Ververica) have already shipped 
their own private implementations of Flink autoscaling, though.

Best,
Zhanghao Chen

From: Erez Yaakov 
Sent: Monday, September 12, 2022 21:38
To: user@flink.apache.org 
Subject: Can flink dynamically detect high load and increase the job 
parallelism automatically?


Hi,



When running a streaming job that uses a kafka source, is it possible (without 
reactive mode) for flink to dynamically detect high load (high consumer lag, 
high cpu usage…) and increase the job parallelism automatically?



I am running a flink streaming job on an application mode cluster using native 
k8s.

My streaming job is consuming messages from a kafka topic with 16 partitions; 
parallelism.default is set to 2, and no parallelism is set specifically on the 
operators/sources/sinks.



I tried to send multiple messages to the kafka topic at a high rate, faster 
than the job can consume, and I saw that the consumer lag was increasing. I 
also saw in the flink UI that the source task was turning red, indicating high 
usage of this task.

Even though I created a high load on the job, I didn't see flink automatically 
change the parallelism of the job to handle the high load.

Is it possible for Flink to increase the parallelism of my job (or of my 
source) dynamically based on the current load (and add task managers 
automatically)? Or is this behavior only available by using reactive mode?



For reactive mode, my understanding based on the documentation is that it is in 
MVP state and is only supported in standalone mode, and is not ready yet for 
production use.



Thanks,

Erez

Confidentiality: This communication and any attachments are intended for the 
above-named persons only and may be confidential and/or legally privileged. Any 
opinions expressed in this communication are not necessarily those of NICE 
Actimize. If this communication has come to you in error you must take no 
action based on it, nor must you copy or show it to anyone; please 
delete/destroy and inform the sender by e-mail immediately.
Monitoring: NICE Actimize may monitor incoming and outgoing e-mails.
Viruses: Although we have taken steps toward ensuring that this e-mail and 
attachments are free from any virus, we advise that in keeping with good 
computing practice the recipient should ensure they are actually virus free.


Re: pyflink memory management

2022-08-25 Thread zhanghao.chen
Yes.

Best,
Zhanghao Chen

From: yidan zhao 
Sent: Thursday, August 25, 2022 10:20
To: user-zh 
Subject: Re: pyflink memory management

Thanks. I'm on a standalone cluster; configuring it in flink-conf.yaml should 
work, right?
https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task-off-heap-size-proper
Does that article say it must be configured through tableEnv because 
pyflink-shell is used?

I submit using flink run.

yu'an huang  wrote on Thu, Aug 25, 2022 at 09:25:
>
> Hi,
> The memory of the Python part counts toward the memory configured for the
> flink taskmanager. You should be able to configure it with the parameter
> 'taskmanager.memory.task.off-heap.size';
> see this question:
> https://stackoverflow.com/questions/64323031/pyflink-1-11-2-couldn-t-configure-taskmanager-memory-task-off-heap-size-proper
>
>
>
> On Wed, 24 Aug 2022 at 1:05 PM, yidan zhao  wrote:
>
> > As the subject says: how is memory managed for pyflink jobs?
> >
> > Does the memory of the Python part count toward the memory configured
> > for the flink TaskManager?
> > For example, if a Python operator does all kinds of complex computation
> > via multiple processes, does that memory usage count toward flink?
> >
> >
> >
> > ――
> > If it doesn't, then when using pyflink, do the container memory and the
> > flink TaskManager memory configuration need to reserve headroom?
> >
> >


Re: Unsubscribe

2022-08-25 Thread zhanghao.chen
Hi, to unsubscribe, send an email with any content to 
user-zh-unsubscr...@flink.apache.org.

Best,
Zhanghao Chen

From: 柳懿珊 
Sent: Thursday, August 25, 2022 18:07
To: user-zh@flink.apache.org 
Subject: Unsubscribe

Unsubscribe


Re: Error when flink restarts automatically

2022-08-19 Thread zhanghao.chen
Could you share which Flink version you are using and what kind of job logic 
you are running? Older versions had similar problems caused by a buggy state 
comparator implementation: https://issues.apache.org/jira/browse/FLINK-18452, 
fixed in 1.12.

Even for a new job, if checkpointing is enabled and the job fails for some 
reason after a fresh start, it will recover from the checkpoint, which can 
trigger this kind of problem.

Best,
Zhanghao Chen

From: Jason_H 
Sent: Friday, August 19, 2022 11:52
To: flink Chinese mailing list 
Subject: Error when flink restarts automatically

Caused by: java.lang.RuntimeException: Error while getting state
org.apache.flink.util.StateMigrationException: For heap backends, the new state 
serializer must not be incompatible with the old state serializer

Hi all, I recently ran into a very strange problem: my job reports this error 
when it restarts automatically. I searched online and it says the state is 
incompatible, but all my jobs were started as new jobs, not recovered from a 
previous checkpoint, and after running for a while this error shows up on 
automatic restart. Has anyone run into this? Is there any solution?
To emphasize: the job is new and was not restarted from a previous job's 
checkpoint.


Jason_H
hyb_he...@163.com


Re: Re: Re: How to automatically release resources after a flink on k8s job fails?

2022-08-15 Thread zhanghao.chen
This is expected. You can adjust flink's failure recovery strategy to control 
this behavior: 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/task_failure_recovery/.
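
For example, a minimal sketch (assuming you set it in the job code; the same 
settings can also go into flink-conf.yaml) of limiting the automatic retries:

    import java.util.concurrent.TimeUnit;

    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // Retry at most 3 times, 10 seconds apart; after that the job fails
    // terminally and, in native mode, the deployment and pods are released.
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
            3, Time.of(10, TimeUnit.SECONDS)));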

Best,
Zhanghao Chen

From: casel.chen 
Sent: Tuesday, August 16, 2022 8:33
To: user-zh@flink.apache.org 
Subject: Re: Re: How to automatically release resources after a flink on k8s 
job fails?

Native mode. I found that after the job fails it automatically retries a few 
times, and in the end the deployment and pods disappear.

On 2022-08-14 16:55:48, "yu'an huang"  wrote:
>Is your deployment mode native or standalone? Normally resources are released 
>after a job fails. Could you provide more information?
>
>
>> On 14 Aug 2022, at 9:55 AM, casel.chen  wrote:
>>
>> After a flink on k8s job fails, it keeps occupying resources by default. 
>> How should I configure it so that resources are released automatically?
>
>


Re: (Possible) bug in flink-kafka-connector (metrics rewriting)

2022-08-02 Thread zhanghao.chen
Hi, I suggest creating a ticket for it on 
https://issues.apache.org/jira/projects/FLINK/summary.


Best,
Zhanghao Chen

From: Valentina Predtechenskaya 
Sent: Wednesday, August 3, 2022 1:32
To: user@flink.apache.org 
Subject: (Possible) bug in flink-kafka-connector (metrics rewriting)



Hello !


I would like to report a bug with metrics registration on KafkaProducer 
initialization.

First, we found the problem in our Flink cluster: the metric 
KafkaProducer.outgoing-byte-rate was periodically missing (equal to zero or 
near zero) on several subtasks, while at the same time other subtasks were fine 
with this metric. The actual outgoing rate was the same on all subtasks, as was 
clear from, for example, the KafkaProducer.records-send-rate metric, which was 
OK on every subtask; the problem was 100% with the metric itself.


After long investigation we found the root-cause of this behavior:

  *   KafkaWriter creates an instance of FlinkKafkaInternalProducer and then 
initializes metric wrappers over existing KafkaProducer metrics (gauges)  - 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L327-L330
  *   KafkaProducer itself in the constructor creates a Sender to access 
brokers, starts a thread (kafka-producer-network-thread), and runs the Sender 
in this separate thread - 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L458-L460
  *   After the Sender starts, metrics connected with topics and brokers take 
some time to register. If they register quickly, the KafkaWriter will see them 
before the end of initialization and these metrics will be wrapped as flink 
gauges; otherwise, they will not.
  *   Some KafkaProducer metrics from the producer and from the broker have the 
same names - for example, outgoing-byte-rate - 
https://docs.confluent.io/platform/current/kafka/monitoring.html#producer-metrics
  *   If two metrics have the same name, the Flink KafkaWriter overwrites the 
metric in the wrapper - 
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaWriter.java#L359-L360

I have debugged these libraries a lot and I'm sure of this behavior. If, for 
example, we patch flink-kafka-connector with a condition not to initialize the 
metric if "producer-node-metrics".equals(metric.metricName().group()), our 
metrics are all fine (outgoing-byte-rate is not 0).
Also, the bug does not reproduce if the cluster is not very fast (for example, 
on a local machine) and data from the brokers arrives only after all metrics 
have been initialized in KafkaWriter.
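
For reference, a hypothetical sketch of the patch condition described above 
(the loop and variable names are illustrative, not the actual KafkaWriter 
code):

    import java.util.Map;

    import org.apache.kafka.common.Metric;
    import org.apache.kafka.common.MetricName;

    // Inside KafkaWriter's metric-wrapping loop: skip per-node metrics so
    // they cannot overwrite producer-level metrics of the same name,
    // e.g. outgoing-byte-rate.
    for (Map.Entry<MetricName, ? extends Metric> entry :
            producer.metrics().entrySet()) {
        MetricName name = entry.getKey();
        if ("producer-node-metrics".equals(name.group())) {
            continue;
        }
        // ... wrap entry.getValue() as a Flink gauge, as before ...
    }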

I suppose this is not the expected behavior, but even in the latest version of 
flink-kafka-connector the code is the same. Is there any treatment? Maybe some 
workarounds? To be honest, I don't really want to use my own patched version of 
the connector.

Thank you !



This message contains confidential information/commercial secret. If you are 
not the intended addressee of this message you may not copy, save, print or 
forward it to any third party and you are kindly requested to destroy this 
message and notify the sender thereof by email.