Re: Remote system has been silent for too long. (more than 48.0 hours)

2022-11-07 Thread Guo Thompson
It looks like it was caused by the TaskManager's GC taking too long.

Weihua Hu wrote on Wed, Nov 2, 2022 at 19:47:

> Hi,
> This usually means these two TaskManagers failed and got disconnected. You can check the earlier logs to verify this.
>
> Best,
> Weihua
>
>
> On Wed, Nov 2, 2022 at 9:41 AM casel.chen  wrote:
>
> > Our production Flink 1.13.2 job hit the error below today. What is the cause, and how should it be resolved?
> > The job consumes Canal JSON data from a Kafka topic and writes it to a table in another MySQL database.
> >
> >
> > 2022-09-17 19:40:03,088 ERROR akka.remote.Remoting [] - Association to
> > [akka.tcp://flink-metrics@172.19.193.15:34101] with UID [-633015504]
> > irrecoverably failed. Quarantining address.
> > java.util.concurrent.TimeoutException: Remote system has been silent for
> > too long. (more than 48.0 hours)
> > at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:387) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> >
> > 2022-09-25 17:17:21,581 ERROR akka.remote.Remoting [] - Association to
> > [akka.tcp://flink-metrics@172.19.193.15:38805] with UID [1496738655]
> > irrecoverably failed. Quarantining address.
> > java.util.concurrent.TimeoutException: Remote system has been silent for
> > too long. (more than 48.0 hours)
> > at akka.remote.ReliableDeliverySupervisor$$anonfun$idle$1.applyOrElse(Endpoint.scala:387) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.actor.Actor.aroundReceive(Actor.scala:517) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.actor.Actor.aroundReceive$(Actor.scala:515) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.remote.ReliableDeliverySupervisor.aroundReceive(Endpoint.scala:207) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
> > at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>


Re: support escaping `#` in flink job spec in Flink-operator

2022-11-07 Thread Gyula Fóra
It is also possible that this is a problem in the Flink native Kubernetes
integration; we have to check where exactly it goes wrong before we try to
fix it.

We simply put the args into a Flink config and pass it to the native
deployment logic in the operator.

Gyula

On Tue, 8 Nov 2022 at 07:37, Gyula Fóra  wrote:

> Hi!
>
> How do you submit your yaml?
>
> It’s possible that this is not an operator problem. Did you try submitting
> the deployment in JSON format instead?
>
> If it still doesn't work please open a JIRA ticket with the details to
> reproduce and what you have tried :)
>
> Cheers
> Gyula
>
> On Tue, 8 Nov 2022 at 04:56, liuxiangcao  wrote:
>
>> Hi,
>>
>> We have a job that contains `#` as part of its mainArgs, and it used to
>> work on Ververica. Now we are switching to our own control plane to deploy
>> to the flink-operator, and the job started to fail because the main args
>> string gets truncated at the `#` character when passed to the Flink
>> application. I believe this is due to characters after `#` being
>> interpreted as a comment in the YAML file. To support `#` in the mainArgs,
>> the flink operator needs to escape `#` when generating the k8s YAML file.
>>
>> Assuming the mainArgs contain '\"xyz#abc\"'.
>>
>> Here is the stack trace (from the JSON log):
>>
>> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could not parse value '\"xyz' *(Note: truncated by #)* for key '$internal.application.program-args'."},"@version":1,"source_host":"xx","message":"Could not create application program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>>
>> at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)
>> at org.apache.flink.configuration.Configuration.get(Configuration.java:704)
>> at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)
>> at org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)
>> at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)
>> at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)
>> Caused by: *java.lang.IllegalArgumentException: Could not split string. Quoting was not closed properly*.
>> at org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)
>> at org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)
>> at org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)
>> at org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)
>> at org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)
>> at java.base/java.util.Optional.map(Optional.java:265)
>> at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)
>> ... 5 more
>>
>>
>> Can someone take a look and help fix this issue? Or I can help fix it
>> if someone can point me in the right direction.
>>
>> --
>> Best Wishes & Regards
>> Shawn Xiangcao Liu
>>
>


Re: support escaping `#` in flink job spec in Flink-operator

2022-11-07 Thread Gyula Fóra
Hi!

How do you submit your yaml?

It’s possible that this is not an operator problem. Did you try submitting the
deployment in JSON format instead?

If it still doesn't work please open a JIRA ticket with the details to
reproduce and what you have tried :)

Cheers
Gyula

On Tue, 8 Nov 2022 at 04:56, liuxiangcao  wrote:

> Hi,
>
> We have a job that contains `#` as part of its mainArgs, and it used to work
> on Ververica. Now we are switching to our own control plane to deploy to the
> flink-operator, and the job started to fail because the main args string
> gets truncated at the `#` character when passed to the Flink application. I
> believe this is due to characters after `#` being interpreted as a comment
> in the YAML file. To support `#` in the mainArgs, the flink operator
> needs to escape `#` when generating the k8s YAML file.
>
> Assuming the mainArgs contain '\"xyz#abc\"'.
>
> Here is the stack trace (from the JSON log):
>
> {"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could not parse value '\"xyz' *(Note: truncated by #)* for key '$internal.application.program-args'."},"@version":1,"source_host":"xx","message":"Could not create application program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}
>
> at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)
> at org.apache.flink.configuration.Configuration.get(Configuration.java:704)
> at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)
> at org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)
> at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)
> at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)
> Caused by: *java.lang.IllegalArgumentException: Could not split string. Quoting was not closed properly*.
> at org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)
> at org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)
> at org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)
> at org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)
> at org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)
> at java.base/java.util.Optional.map(Optional.java:265)
> at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)
> ... 5 more
>
>
> Can someone take a look and help fix this issue? Or I can help fix it
> if someone can point me in the right direction.
>
> --
> Best Wishes & Regards
> Shawn Xiangcao Liu
>


Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-07 Thread Yun Gao
Hi Etienne,

Many thanks for the article! Flink is indeed continuously increasing its
ability to do unified batch / stream processing with the same API, and it's a
great pleasure that more and more users are trying out this functionality. But
I also have some questions regarding some of the details.
First, IMO, in the long run Flink as a whole will have two unified APIs, namely
the Table / SQL API and the DataStream API. Users can express their computation
logic with these two APIs for both bounded and unbounded data processing.
Underneath, Flink provides two execution modes: the streaming mode works with
both bounded and unbounded data and executes incrementally based on state; the
batch mode works only with bounded data and executes level by level, similar to
traditional batch processing frameworks. Users can switch the execution mode via
EnvironmentSettings.inBatchMode() or StreamExecutionEnvironment.setRuntimeMode().
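
For illustration, switching the mode looks roughly like this (a sketch; the
two calls above are the only point here):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// DataStream API: select the batch execution mode explicitly.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);

// Table API: create a TableEnvironment in batch mode.
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tableEnv = TableEnvironment.create(settings);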
Specifically for DataStream, as implemented in FLIP-140, all of the existing
DataStream operations currently support the batch execution mode in a unified
way [1]: data is sorted on the keyBy() edges according to the key, so the
following operations, like reduce(), receive all the data belonging to the same
key consecutively; they can then reduce the records of the same key directly,
without maintaining intermediate state. In this way users can write the same
code for both streaming and batch processing.
# Regarding the migration of Join / Reduce

First, I think Reduce has always been supported: users can write
dataStream.keyBy().reduce(xx) directly, and if the batch execution mode is set,
the reduce will not be executed incrementally; instead it acts much like the
sort-based aggregation in a traditional batch processing framework.
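
For example, a keyed reduce under the batch execution mode could look like this
(a sketch; the data and field values are made up):

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // bounded input, sort-based execution

env.fromElements(Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3))
        .keyBy(t -> t.f0)                               // records of a key arrive consecutively
        .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1)) // one aggregate per key, no incremental state
        .print();                                       // e.g. (a,3) and (b,3)

env.execute();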
Regarding Join: the issue in FLINK-22587 does exist (the current join has to be
bound to a window, and GlobalWindow does not work properly), but with a bit
more effort users currently do not need to rewrite the whole join from scratch.
Users can write a dedicated window assigner that assigns all records to the
same window instance and returns EventTimeTrigger.create() as the default
event-time trigger [2]. Then the following works:

source1.join(source2)
    .where(a -> a.f0)
    .equalTo(b -> b.f0)
    .window(new EndOfStreamWindows())
    .apply(...);

It does not require records to have event timestamps attached, since the window
trigger relies only on the time range of the window, and the window assignment
does not need event time either. The behavior of the join is also similar to a
sort-based join if batch mode is enabled. Of course this workaround is not easy
to use, and we'll try to fix the issue in 1.17.
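
A minimal sketch of such an assigner, modeled on the class in [2] (simplified
here, so treat the details as illustrative rather than the exact
implementation):

import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Assigns every record to a single window covering the whole time range, so
// the window only fires at end of input, when the watermark jumps to
// Long.MAX_VALUE.
public class EndOfStreamWindows extends WindowAssigner<Object, TimeWindow> {

    private static final TimeWindow GLOBAL_WINDOW =
            new TimeWindow(Long.MIN_VALUE, Long.MAX_VALUE);

    @Override
    public Collection<TimeWindow> assignWindows(
            Object element, long timestamp, WindowAssignerContext context) {
        return Collections.singletonList(GLOBAL_WINDOW); // same window for all records
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return EventTimeTrigger.create(); // fires once the watermark passes the window end
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return true;
    }
}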
# Regarding support for Sort / Limit

Currently these two operators are indeed not supported in the DataStream API
directly. One initial thought for these two operations is that users may
convert the DataStream to a Table and use the Table API for them:

DataStream xx = ...; // Keeps the customized logic in DataStream
Table tableXX = tableEnv.fromDataStream(xx);
tableXX.orderBy($("a").asc());

What do you think about this option? We are also assessing whether the
combination of the DataStream API and the Table API is sufficient for all batch
users. Any suggestions are warmly welcome.
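
For Limit specifically, the same converted table could combine orderBy with
fetch (a sketch; the column name "a" is made up):

import static org.apache.flink.table.api.Expressions.$;
import org.apache.flink.table.api.Table;

// Sort by column "a" ascending and keep only the first 10 rows (sort + limit).
Table top10 = tableXX.orderBy($("a").asc()).fetch(10);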
Best,
Yun Gao

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution_mode/
[2] https://github.com/apache/flink-ml/blob/master/flink-ml-core/src/main/java/org/apache/flink/ml/common/datastream/EndOfStreamWindows.java

--
From: liu ron
Send Time: 2022 Nov. 8 (Tue.) 10:21
To: dev; Etienne Chauchot; user
Subject: Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Thanks for your post. It looks very good to me, and it may also be helpful for other developers.

Best,
Liudalong

yuxia wrote on Tue, Nov 8, 2022 at 09:11:

Wow, cool! Thanks for your work.
It'll definitely be helpful for users who want to migrate their batch jobs from the DataSet API to the DataStream API.

Best regards,
Yuxia

----- Original Message -----
From: "Etienne Chauchot"
To: "dev", "User"
Sent: Monday, November 7, 2022, 10:29:54 PM
Subject: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

Hi everyone,

In case some of you are interested, I just posted a blog article

FeatHub - an open-source real-time feature engineering platform based on Flink

2022-11-07 Thread Dong Lin
Hi everyone,

We (the Alibaba Cloud Flink team) recently open-sourced FeatHub, a feature
engineering platform (feature store) for doing real-time feature engineering
based on Flink. We hope this project will greatly simplify and support
defining, deploying, and monitoring features etc. on top of Flink.

The project is now open source at https://github.com/alibaba/feathub; the
GitHub page has more information. More code examples are available at
https://github.com/flink-extended/feathub-examples. We will also present the
project at the Flink Forward Asia conference this November.

The project is still in early development. We are adding more features, such
as support for Redis. We hope you will follow the FeatHub project and give us
feedback!

Cheers,
Dong


FeatHub - A feature store that uses Apache Flink for real-time feature ETL

2022-11-07 Thread Dong Lin
Hi everyone,

In case some of you might be interested, we (developers on the Alibaba Flink
team) recently developed and open-sourced FeatHub, a feature store that uses
Apache Flink for feature engineering.

The GitHub page https://github.com/alibaba/feathub provides more details on
the project, and extra quickstart examples can be found at
https://github.com/flink-extended/feathub-examples. We will also introduce
FeatHub at the Flink Forward Asia conference later this month.

The project is still in its early stages of development. Please stay tuned
as we are actively developing it for production deployment!

It would be great if you could try it out and leave feedback in the GitHub
issues or Slack channel.

Cheers,
Dong


support escaping `#` in flink job spec in Flink-operator

2022-11-07 Thread liuxiangcao
Hi,

We have a job that contains `#` as part of its mainArgs, and it used to work on
Ververica. Now we are switching to our own control plane to deploy to the
flink-operator, and the job started to fail because the main args string
gets truncated at the `#` character when passed to the Flink application. I
believe this is due to characters after `#` being interpreted as a comment
in the YAML file. To support `#` in the mainArgs, the flink operator
needs to escape `#` when generating the k8s YAML file.

Assuming the mainArgs contain '\"xyz#abc\"'.

Here is the stack trace (from the JSON log):

{"exception":{"exception_class":"java.lang.IllegalArgumentException","exception_message":"Could not parse value '\"xyz' *(Note: truncated by #)* for key '$internal.application.program-args'."},"@version":1,"source_host":"xx","message":"Could not create application program.","thread_name":"main","@timestamp":"2022-11-07T18:40:03.369+00:00","level":"ERROR","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint"}

at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:720)
at org.apache.flink.configuration.Configuration.get(Configuration.java:704)
at org.apache.flink.configuration.ConfigUtils.decodeListFromConfig(ConfigUtils.java:123)
at org.apache.flink.client.deployment.application.ApplicationConfiguration.fromConfiguration(ApplicationConfiguration.java:80)
at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.getPackagedProgram(KubernetesApplicationClusterEntrypoint.java:93)
at org.apache.flink.kubernetes.entrypoint.KubernetesApplicationClusterEntrypoint.main(KubernetesApplicationClusterEntrypoint.java:70)
Caused by: *java.lang.IllegalArgumentException: Could not split string. Quoting was not closed properly*.
at org.apache.flink.configuration.StructuredOptionsSplitter.consumeInQuotes(StructuredOptionsSplitter.java:163)
at org.apache.flink.configuration.StructuredOptionsSplitter.tokenize(StructuredOptionsSplitter.java:129)
at org.apache.flink.configuration.StructuredOptionsSplitter.splitEscaped(StructuredOptionsSplitter.java:52)
at org.apache.flink.configuration.ConfigurationUtils.convertToList(ConfigurationUtils.java:324)
at org.apache.flink.configuration.Configuration.lambda$getOptional$2(Configuration.java:714)
at java.base/java.util.Optional.map(Optional.java:265)
at org.apache.flink.configuration.Configuration.getOptional(Configuration.java:714)
... 5 more


Can someone take a look and help fix this issue? Or I can help fix it
if someone can point me in the right direction.
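
A possible interim workaround (an untested sketch, not an operator feature) is
to pass the affected argument in an encoding that contains no `#`, e.g.
base64, and decode it inside the job's main():

import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper: base64-encode the argument on the submitting side so
// '$internal.application.program-args' never contains a literal '#', then
// decode it in the job before use.
public final class ArgCodec {

    public static String encode(String raw) {
        return Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    public static String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String wireSafe = encode("\"xyz#abc\"");   // contains only [A-Za-z0-9+/=]
        System.out.println(decode(wireSafe));      // prints "xyz#abc"
    }
}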

-- 
Best Wishes & Regards
Shawn Xiangcao Liu


Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-07 Thread liu ron
Thanks for your post. It looks very good to me, and it may also be helpful for other developers.

Best,
Liudalong

yuxia wrote on Tue, Nov 8, 2022 at 09:11:

> Wow, cool! Thanks for your work.
> It'll definitely be helpful for users who want to migrate their batch
> jobs from the DataSet API to the DataStream API.
>
> Best regards,
> Yuxia
>
> ----- Original Message -----
> From: "Etienne Chauchot"
> To: "dev", "User"
> Sent: Monday, November 7, 2022, 10:29:54 PM
> Subject: [blog article] Howto migrate a real-life batch pipeline from the
> DataSet API to the DataStream API
>
> Hi everyone,
>
> In case some of you are interested, I just posted a blog article about
> migrating a real-life batch pipeline from the DataSet API to the
> DataStream API:
>
>
> https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html
>
> Best
>
> Etienne
>


Kinesis Connector does not work

2022-11-07 Thread Matt Fysh
Hi, I'm following the Kinesis connector instructions as documented here:
https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/connectors/datastream/kinesis/

I'm also running Flink in standalone session mode using docker compose and
the Python images, as described in the Flink docs (Deployment section)

When I try to run a basic datastream.print() / env.execute() example with a
Kinesis source, I get the following error. From my limited understanding of
Java, it seems the Kinesis connector uses a shaded version of the AWS
Java SDK, and that older version of the SDK is trying to load a class that
is no longer present in the 1.16.0 Flink Docker images. Is there a
workaround for this? Thanks

Caused by: java.lang.NoClassDefFoundError: org/apache/commons/logging/LogFactory
at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfiguration.<init>(ClientConfiguration.java:47)
at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getDefaultConfig(ClientConfigurationFactory.java:46)
at org.apache.flink.kinesis.shaded.com.amazonaws.ClientConfigurationFactory.getConfig(ClientConfigurationFactory.java:36)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.createKinesisClient(KinesisProxy.java:268)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.<init>(KinesisProxy.java:152)


Re: [blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-07 Thread yuxia
Wow, cool! Thanks for your work.
It'll definitely be helpful for users who want to migrate their batch jobs
from the DataSet API to the DataStream API.

Best regards,
Yuxia

----- Original Message -----
From: "Etienne Chauchot"
To: "dev", "User"
Sent: Monday, November 7, 2022, 10:29:54 PM
Subject: [blog article] Howto migrate a real-life batch pipeline from the DataSet
API to the DataStream API

Hi everyone,

In case some of you are interested, I just posted a blog article about 
migrating a real-life batch pipeline from the DataSet API to the 
DataStream API:

https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html

Best

Etienne


Question about dynamically setting alerting rules for Flink SQL jobs

2022-11-07 Thread casel.chen
We have configured Prometheus to collect Flink SQL job metrics, and now we want to dynamically set some alerting rules based on these metrics. How can this be implemented?
From what I found, Prometheus alerting requires configuring alert rules and then restarting for them to take effect. Is there a way to avoid the restart? What is the usual approach?

[blog article] Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API

2022-11-07 Thread Etienne Chauchot

Hi everyone,

In case some of you are interested, I just posted a blog article about 
migrating a real-life batch pipeline from the DataSet API to the 
DataStream API:


https://echauchot.blogspot.com/2022/11/flink-howto-migrate-real-life-batch.html

Best

Etienne



Load Distribution in specific Slot of Taskmanager in flink(version 1.15.2)

2022-11-07 Thread harshit.varsh...@iktara.ai
Dear Team,

I need some advice on setting up load distribution of Flink tasks.

I have a Flink task that processes transactions for users. Since the load is
more than what can be handled on a single machine, I want the same task to be
executed on 3 machines.

I am trying to use the parallelism feature of Flink for this. I am able to get
the Flink JobManager to start the same task on 3 machines. I want the task on
each machine to handle 1/3 of the total user transactions; kindly suggest what
mechanism to use so that each task handles only the data for its own 1/3 of
the users.
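
(For reference, the usual mechanism for this in Flink is to key the stream by
the user id, which hash-partitions the records across the parallel subtasks. A
sketch follows; the Transaction POJO and TransactionProcessor are hypothetical
placeholders.)

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3); // one parallel subtask per machine

DataStream<Transaction> transactions = ...; // e.g. a Kafka source of user transactions

transactions
    .keyBy(Transaction::getUserId)        // the same user always goes to the same subtask
    .process(new TransactionProcessor()); // each subtask handles a disjoint subset of users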

 

 

Thanks,

Harshit Varshney



Re: What is the impact of setting the number of slots to several times the number of vcores

2022-11-07 Thread Yanfei Lei
Hi junjie,
A slot can be seen as one thread in the JVM [1], so taskmanager.numberOfTaskSlots can be set higher than the number of CPU cores.

> What would be the impact of setting the number of slots to several times the number of vcores?

Setting the number of slots to several times the vcores can make some resource the bottleneck (e.g. CPU, memory, disk, or network bandwidth). I once ran into a disk-space shortage caused by having too many slots (the subtask on each slot had large state).
[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/flink-architecture/#task-slots-and-resources

Best,
Yanfei

junjie.m...@goupwith.com wrote on Mon, Nov 7, 2022 at 19:24:

> hi,
> A question: suppose I have only one server with only 8 vcores. When starting
> a standalone cluster, can I set taskmanager.numberOfTaskSlots > 8?
> I tried setting it to 32 and other values and it all works; submitted jobs
> can fully occupy all the slots without problems. What would be the impact of
> setting the number of slots to several times the number of vcores?
> Thank you!!!
>


-- 
Best,
Yanfei


Re: Submitting a job with flink on k8s using OSS as the checkpoint location, but OSS cannot be found

2022-11-07 Thread Lijie Wang
The flink-oss-fs-hadoop-1.13.6.jar needs to be placed in Flink's lib directory.

Best,
Lijie

highfei2011 wrote on Tue, Nov 1, 2022 at 16:23:

> It's a jar conflict.
>
>
> On Nov 1, 2022 at 15:39, highfei2011 wrote:
>
>
> Flink version: Apache Flink 1.13.6. Flink operator version: 1.2.0.
> Submit command: kubernetes-jobmanager.sh kubernetes-application
> Exception: Caused by:
> org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not
> find a file system implementation for scheme 'oss'. The scheme is directly
> supported by Flink through the following plugin: flink-oss-fs-hadoop.
> Please ensure that each plugin resides within its own subfolder within the
> plugins directory. See
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/plugins.html for
> more information. If you want to use a Hadoop file system for that scheme,
> please add the scheme to the configuration fs.allowed-fallback-filesystems.
> For a full list of supported file systems, please see
> https://nightlies.apache.org/flink/flink-docs-stable/ops/filesystems/.
> I checked the /opt/flink/opt/ directory and flink-oss-fs-hadoop-1.13.6.jar
> is there. Note: local testing works fine; the exception only occurs when
> submitting via the flink operator.


Re: Consumption stops on some Kafka partitions in job running on Flink 1.15.2

2022-11-07 Thread Samuel Chase
Say we have two partitions, A and B. A is the partition which is not
receiving any events, while B is seeing events and processing normally.
Because A has no events and there is no watermarking strategy, it holds
the watermark back as described in the docs. Does this mean that lag on
B will build up, since the watermark cannot move forward until A starts
seeing some input? This is what I can gather to the best of my
understanding.
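
As a small illustration of that rule (the downstream watermark is the minimum
over the input partitions, so one silent partition pins it):

// Conceptual sketch, not Flink API: the combined watermark is the minimum.
long watermarkA = Long.MIN_VALUE;            // partition A: no events, never advances
long watermarkB = 1_667_800_000_000L;        // partition B: advancing normally
long combined = Math.min(watermarkA, watermarkB); // stuck at Long.MIN_VALUE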

How does this explain the situations where:
1. A single partition (out of 30) had lag increasing.
2. Nine partitions (out of 30) had lag increasing.

It would be great if anyone could help clear this up for me.

On Mon, Nov 7, 2022 at 2:18 PM Salva Alcántara  wrote:
>
> Hi Samuel,
>
> I'm glad to hear that! Let us know how the problem is finally solved. 
> Personally I'd upgrade to 1.15.3.
>
> Salva
>
> On Mon, Nov 7, 2022 at 9:42 AM Samuel Chase  wrote:
>>
>> Hi Salva,
>>
>> Thanks for the pointers. They were helpful in gaining a better
>> understanding of what happened.
>>
>> In both situations, these outages occurred at a time of the lowest
>> traffic in a day. Due to business-logic reasons, we are using a
>> partition key which may not result in even distribution across all
>> partitions. It seems conceivable to me that during times of low
>> traffic some partitions may not see any events for some time.
>>
>> Now, with no watermarking strategy, I believe we are running into the
>> problem described in
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
>> where the watermark cannot move forward. The docs recommend using a
>> watermark strategy, i.e. withIdleness, which detects idle inputs so
>> that they do not hold the watermark back.
>>
>> However, https://issues.apache.org/jira/browse/FLINK-28975 suggests
>> there is an issue with `withIdleness`. Am I right in understanding
>> that we cannot safely use `withIdleness` now in the watermarking
>> strategy? Does this affect me? I am not doing any joins on the
>> streams, I just need the message to be sent to the sink, and no lag
>> building up.
>>
>> Plan of action:
>>
>> A. Use Watermarking strategy `withIdleness` on 1.15.2 (which is
>> affected by FLINK-28975).
>>
>> OR
>>
>> B. Upgrade to 1.15.3 (FLINK-28975 is fixed here) with a Watermarking
>> strategy `withIdleness`.
>>
>> On Mon, Nov 7, 2022 at 1:05 PM Salva Alcántara  
>> wrote:
>> >
>> > Hi Samuel,
>> >
>> > Maybe related to this https://issues.apache.org/jira/browse/FLINK-28975? 
>> > See also:
>> > - 
>> > https://stackoverflow.com/questions/72654182/flink-interval-join-datastream-with-kafkasource-drops-all-records
>> >
>> > I left a similar comment in your SO post.
>> >
>> > Regards,
>> >
>> > Salva
>> >
>> > On Mon, Nov 7, 2022 at 7:27 AM Samuel Chase  wrote:
>> >>
>> >> Hello,
>> >>
>> >> At work we are using Flink to store timers and notify us when they are
>> >> triggered. It's been working great over several versions over the
>> >> years. Flink 1.5 -> Flink 1.9 -> Flink 1.15.2.
>> >>
>> >> A few months ago we upgraded from Flink 1.9 to Flink 1.15.2. In the
>> >> process we had to upgrade all the Flink API code in our job to use the
>> >> new APIs.
>> >>
>> >> Our job code has a Kafka Source and a Kafka Sink. For our Source, we
>> >> are currently using `WatermarkStrategy.noWatermarks()`. It has been
>> >> running fine ever since we upgraded, but in the last few weeks we have
>> >> faced two outages.
>> >>
>> >> Configuration:
>> >>
>> >> 2 JobManager nodes
>> >> 5 TaskManager nodes (4 slots each)
>> >> Parallelism: 16
>> >> Source topic: 30 partitions
>> >> Using `setStartingOffsets(OffsetsInitializer.latest())` while
>> >> initializing the source.
>> >>
>> >> Outage #1
>> >>
>> >> Our monitoring system alerted us that lag is building up on one
>> >> partition (out of 30). We did not know of anything we could do to
>> >> jumpstart consumption on that partition other than by forcing a
>> >> reassignment. When the TaskManager service on the node to which the
>> >> partition was assigned was restarted, the lag reduced soon after.
>> >>
>> >> Outage #2
>> >>
>> >> Something similar happened again, but this time, lag was building up
>> >> on 9 (out of 30) partitions. Once again, we restarted the TaskManager
>> >> services on all the nodes, and it started consuming once again.
>> >>
>> >> We asked a question on SO,
>> >> https://stackoverflow.com/q/74272277/2165719 and was directed to ask
>> >> on the mailing list as well.
>> >>
>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
>> >>
>> >> In another post, https://stackoverflow.com/a/70101290/2165719 there is
>> >> a suggestion to use `WatermarkStrategy.withIdleness(...)`. Could this
>> >> help us?
>> >>
>> >> Any help/guidance here would be much appreciated.
>> >>
>> >> Thanks,


Re: Consumption stops on some Kafka partitions in job running on Flink 1.15.2

2022-11-07 Thread Salva Alcántara
Hi Samuel,

I'm glad to hear that! Let us know how the problem is finally solved.
Personally I'd upgrade to 1.15.3.

Salva

On Mon, Nov 7, 2022 at 9:42 AM Samuel Chase  wrote:

> Hi Salva,
>
> Thanks for the pointers. They were helpful in gaining a better
> understanding of what happened.
>
> In both situations, these outages occurred at a time of the lowest
> traffic in a day. Due to business-logic reasons, we are using a
> partition key which may not result in even distribution across all
> partitions. It seems conceivable to me that during times of low
> traffic some partitions may not see any events for some time.
>
> Now, with no watermarking strategy, I believe we are running into the
> problem described in
>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
> where the watermark cannot move forward. The docs recommend using a
> watermark strategy, i.e. withIdleness, which detects idle inputs so
> that they do not hold the watermark back.
>
> However, https://issues.apache.org/jira/browse/FLINK-28975 suggests
> there is an issue with `withIdleness`. Am I right in understanding
> that we cannot safely use `withIdleness` now in the watermarking
> strategy? Does this affect me? I am not doing any joins on the
> streams, I just need the message to be sent to the sink, and no lag
> building up.
>
> Plan of action:
>
> A. Use Watermarking strategy `withIdleness` on 1.15.2 (which is
> affected by FLINK-28975).
>
> OR
>
> B. Upgrade to 1.15.3 (FLINK-28975 is fixed here) with a Watermarking
> strategy `withIdleness`.
>
> On Mon, Nov 7, 2022 at 1:05 PM Salva Alcántara 
> wrote:
> >
> > Hi Samuel,
> >
> > Maybe related to this https://issues.apache.org/jira/browse/FLINK-28975?
> See also:
> > -
> https://stackoverflow.com/questions/72654182/flink-interval-join-datastream-with-kafkasource-drops-all-records
> >
> > I left a similar comment in your SO post.
> >
> > Regards,
> >
> > Salva
> >
> > On Mon, Nov 7, 2022 at 7:27 AM Samuel Chase 
> wrote:
> >>
> >> Hello,
> >>
> >> At work we are using Flink to store timers and notify us when they are
> >> triggered. It's been working great over several versions over the
> >> years. Flink 1.5 -> Flink 1.9 -> Flink 1.15.2.
> >>
> >> A few months ago we upgraded from Flink 1.9 to Flink 1.15.2. In the
> >> process we had to upgrade all the Flink API code in our job to use the
> >> new APIs.
> >>
> >> Our job code has a Kafka Source and a Kafka Sink. For our Source, we
> >> are currently using `WatermarkStrategy.noWatermarks()`. It has been
> >> running fine ever since we upgraded, but in the last few weeks we have
> >> faced two outages.
> >>
> >> Configuration:
> >>
> >> 2 JobManager nodes
> >> 5 TaskManager nodes (4 slots each)
> >> Parallelism: 16
> >> Source topic: 30 partitions
> >> Using `setStartingOffsets(OffsetsInitializer.latest())` while
> >> initializing the source.
> >>
> >> Outage #1
> >>
> >> Our monitoring system alerted us that lag is building up on one
> >> partition (out of 30). We did not know of anything we could do to
> >> jumpstart consumption on that partition other than by forcing a
> >> reassignment. When the TaskManager service on the node to which the
> >> partition was assigned was restarted, the lag reduced soon after.
> >>
> >> Outage #2
> >>
> >> Something similar happened again, but this time, lag was building up
> >> on 9 (out of 30) partitions. Once again, we restarted the TaskManager
> >> services on all the nodes, and it started consuming once again.
> >>
> >> We asked a question on SO,
> >> https://stackoverflow.com/q/74272277/2165719 and was directed to ask
> >> on the mailing list as well.
> >>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
> >>
> >> In another post, https://stackoverflow.com/a/70101290/2165719 there is
> >> a suggestion to use `WatermarkStrategy.withIdleness(...)`. Could this
> >> help us?
> >>
> >> Any help/guidance here would be much appreciated.
> >>
> >> Thanks,
>


Re: Consumption stops on some Kafka partitions in job running on Flink 1.15.2

2022-11-07 Thread Samuel Chase
Hi Salva,

Thanks for the pointers. They were helpful in gaining a better
understanding of what happened.

In both situations, these outages occurred at a time of the lowest
traffic in a day. Due to business-logic reasons, we are using a
partition key which may not result in even distribution across all
partitions. It seems conceivable to me that during times of low
traffic some partitions may not see any events for some time.

Now, with no watermarking strategy, I believe we are running into the
problem described in
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources
where the watermark cannot move forward. The docs recommend using a
watermark strategy, i.e. withIdleness, which detects idle inputs so that
they do not hold the watermark back.

However, https://issues.apache.org/jira/browse/FLINK-28975 suggests
there is an issue with `withIdleness`. Am I right in understanding
that we cannot safely use `withIdleness` now in the watermarking
strategy? Does this affect me? I am not doing any joins on the
streams, I just need the message to be sent to the sink, and no lag
building up.
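
For concreteness, the strategy under discussion would look roughly like this
(a sketch; MyEvent and the durations are placeholders):

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Bounded-out-of-orderness watermarks; partitions that are silent for more
// than one minute are marked idle so they stop holding the watermark back.
WatermarkStrategy<MyEvent> strategy =
        WatermarkStrategy
                .<MyEvent>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1));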

Plan of action:

A. Use Watermarking strategy `withIdleness` on 1.15.2 (which is
affected by FLINK-28975).

OR

B. Upgrade to 1.15.3 (FLINK-28975 is fixed here) with a Watermarking
strategy `withIdleness`.

On Mon, Nov 7, 2022 at 1:05 PM Salva Alcántara  wrote:
>
> Hi Samuel,
>
> Maybe related to this https://issues.apache.org/jira/browse/FLINK-28975? See 
> also:
> - 
> https://stackoverflow.com/questions/72654182/flink-interval-join-datastream-with-kafkasource-drops-all-records
>
> I left a similar comment in your SO post.
>
> Regards,
>
> Salva
>
> On Mon, Nov 7, 2022 at 7:27 AM Samuel Chase  wrote:
>>
>> Hello,
>>
>> At work we are using Flink to store timers and notify us when they are
>> triggered. It's been working great over several versions over the
>> years. Flink 1.5 -> Flink 1.9 -> Flink 1.15.2.
>>
>> A few months ago we upgraded from Flink 1.9 to Flink 1.15.2. In the
>> process we had to upgrade all the Flink API code in our job to use the
>> new APIs.
>>
>> Our job code has a Kafka Source and a Kafka Sink. For our Source, we
>> are currently using `WatermarkStrategy.noWatermarks()`. It has been
>> running fine ever since we upgraded, but in the last few weeks we have
>> faced two outages.
>>
>> Configuration:
>>
>> 2 JobManager nodes
>> 5 TaskManager nodes (4 slots each)
>> Parallelism: 16
>> Source topic: 30 partitions
>> Using `setStartingOffsets(OffsetsInitializer.latest())` while
>> initializing the source.
>>
>> Outage #1
>>
>> Our monitoring system alerted us that lag is building up on one
>> partition (out of 30). We did not know of anything we could do to
>> jumpstart consumption on that partition other than by forcing a
>> reassignment. When the TaskManager service on the node to which the
>> partition was assigned was restarted, the lag reduced soon after.
>>
>> Outage #2
>>
>> Something similar happened again, but this time, lag was building up
>> on 9 (out of 30) partitions. Once again, we restarted the TaskManager
>> services on all the nodes, and it started consuming once again.
>>
>> We asked a question on SO,
>> https://stackoverflow.com/q/74272277/2165719 and was directed to ask
>> on the mailing list as well.
>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-182%3A+Support+watermark+alignment+of+FLIP-27+Sources
>>
>> In another post, https://stackoverflow.com/a/70101290/2165719 there is
>> a suggestion to use `WatermarkStrategy.withIdleness(...)`. Could this
>> help us?
>>
>> Any help/guidance here would be much appreciated.
>>
>> Thanks,


Re: During the flink cdc full snapshot phase, does it wait until all chunks have been pulled before outputting downstream?

2022-11-07 Thread Guo Thompson
Each chunk is emitted downstream as it is being pulled.

郑致远 wrote on Fri, Nov 4, 2022 at 15:27:

> A question for the experts: during the flink cdc full snapshot phase, does
> it wait until all chunks have been pulled successfully before outputting to
> downstream?
>
> Or is each chunk emitted downstream as it is being pulled?
>