回复:flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?

2022-12-05 文章 JasonLee
hi


Upsert-kafka 不支持指定消费者位置,默认是从 earliest 位置开始消费的,你可以自己修改代码支持 scan.startup.mode 参数。


Best
JasonLee


 回复的原邮件 
| 发件人 | casel.chen |
| 发送日期 | 2022年12月5日 18:24 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | flink sql消费upsert-kafka源表如何指定从哪个位点开始消费? |
flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?仿照kafka source表添加了 scan.startup.mode 
参数会报非法参数

回复:如何能正确的获取任务总写入量?

2022-11-24 文章 JasonLee
hi


你可以把之前的历史值保存下来,比如保存在状态里面或者第三方存储,任务重启后在加上之前的值。


Best
JasonLee


 回复的原邮件 
| 发件人 | 陈佳豪 |
| 发送日期 | 2022年11月24日 18:18 |
| 收件人 |  |
| 主题 | 如何能正确的获取任务总写入量? |
大佬们好
背景:
业务端想基于flink 
metrics获取到任务总写入量。正常情况下通过指numRecordsIn指标是可以获取到增长的写入量的,如果用户自己误操作删了目标表并且重新建表恢复了,这个时候上报的numRecordsIn指标就会是0这就导致无法准确的获取任务写入总量了。不知道各位大佬有没有碰到这种情况希望能得到一个解决方案!
 感谢。


回复:flink作业提交运行后如何监听作业状态发生变化?

2022-11-23 文章 JasonLee
Hi


可以通过 Flink 的 Metric 和 Yarn 的 Api 去获取任务的状态(任务提交到 yarn 的话)


Best
JasonLee


 回复的原邮件 
| 发件人 | casel.chen |
| 发送日期 | 2022年11月23日 08:32 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | flink作业提交运行后如何监听作业状态发生变化? |
请问flink作业提交运行后如何监听作业状态发生变化以便在控台上实时显示作业状态变更?目前我们的做法是轮询,但效率低,有没有listener可以进行注册的方法呢?

回复:Flink sql从ck恢复,统计数据波动问题

2022-10-10 文章 JasonLee
Hi



我理解应该是任务恢复的时候从上一次成功的 checkpoint 或者你指定的 checkpoint 里记录的 offset 
开始消费,所以此时的统计值应该是有短暂的下跌,因为数据相当于回复到之前重复计算了一部分。这个应该是符合预期的,可能需要在业务上做一些处理。


Best
JasonLee


 回复的原邮件 
| 发件人 | 天下五帝东 |
| 发送日期 | 2022年10月10日 13:34 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | Flink sql从ck恢复,统计数据波动问题 |
Hi:
各位大佬们,flink任务从状态恢复时,统计值有一个明显的下跌,等ck打完后,统计值才会上升,这种情况有什么好的解决方案吗?

回复:flink cdc + kafka场景下增加kafka分区数问题

2022-09-26 文章 JasonLee
Hi
跟重启作业没关系哈,你需要自定义写入 kafka 的分区策略。


Best
JasonLee


 回复的原邮件 
| 发件人 | casel.chen |
| 发送日期 | 2022年09月26日 23:21 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | flink cdc + kafka场景下增加kafka分区数问题 |
flink cdc 
消费mysql写到kafka场景下一开始数据量不大给的分区数可能只有3,后面业务数据量上来了需要添加分区数,例如12。那么问题来了,如何确保同一条记录的数据变更历史发到同一个kafka分区以确保下游消费的顺序性?重启作业好像也不能解决这个问题吧?

回复:如何实现flink作业失败告警功能

2022-07-18 文章 JasonLee
Hi



可以通过监控 numRestarts [1] metrics 发送相关的报警 


[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#availability
 


Best
JasonLee


 回复的原邮件 
| 发件人 | casel.chen |
| 发送日期 | 2022年07月18日 22:48 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | 如何实现flink作业失败告警功能 |
想实现flink作业一旦失败就立马告警功能,请问要如何实现?是否有Listener可以进行注册?

Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030

2022-07-14 文章 JasonLee
Hi



仔细看了一下日志,感觉还是 yarn 的配置问题,可以看下 yarn.resourcemanager.scheduler.address 配置的什么吗?在 
client 端连接 RM 的时候打印的日志是 2022-07-14 15:10:48,109 INFO  
org.apache.hadoop.yarn.client.RMProxy[] - Connecting to 
ResourceManager at /0.0.0.0:8030 这里的地址是不对的,正常应该是 
yarn.resourcemanager.scheduler.address:8030 但是日志里面是 0.0.0.0:8030。


Best
JasonLee


 Replied Message 
| From | lishiyuan0506 |
| Date | 07/14/2022 16:52 |
| To | user-zh@flink.apache.org |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hi


是的,这个是运行参数


/opt/flink-1.13.3/bin/flink run \
-t yarn-per-job \
-Dyarn.application.name=test_wordcount \
-Dparallelism.default=1 \
-Dtaskmanager.numberOfTaskSlots=1 \
-Djobmanager.memory.process.size=1024mb \
-Dtaskmanager.memory.process.size=2048mb \
/opt/flink-1.13.3/examples/streaming/WordCount.jar


| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|


 Replied Message 
| From | JasonLee<17610775...@163.com> |
| Date | 07/14/2022 16:47 |
| To | user-zh@flink.apache.org |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
Hi



这个是直接运行的 examples 里面的 demo 程序吗?


Best
JasonLee


 Replied Message 
| From | lishiyuan0506 |
| Date | 07/14/2022 16:25 |
| To | user-zh@flink.apache.org |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
感谢感谢,可以看的,我添加到附件给您发过去


| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|


 Replied Message 
| From | JasonLee<17610775...@163.com> |
| Date | 07/14/2022 16:15 |
| To | user-zh@flink.apache.org |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
Hi


可以用 yarn logs -applicationId xxx 看下日志吗?


Best
JasonLee


 Replied Message 
| From | lishiyuan0506 |
| Date | 07/14/2022 15:43 |
| To | user-zh@flink.apache.org |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hello,根据您的思路,我查看了所有的yarn-site.xml,没有发现配置出错的地方,Spark和MR的运行都正常,感觉不是yarn的问题






| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|


 Replied Message 
| From | Biao Geng |
| Date | 07/14/2022 15:37 |
| To |  |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hi
根据你发的描述,是Flink的job manager在非RM所在机器上启动时,由于尝试连接0.0.0.0:8030
端口去向YARN申请资源时连接不通,导致失败。你可以检查下集群内worker节点的hadoop配置,看看yarn.resourcemanager.hostname等配置是否设置正确。

Best,
Biao Geng

lishiyuan0506  于2022年7月14日周四 15:17写道:

您好,我之前做了一些尝试:
1. 测试Spark、MR任务正常
2. 将生产环境的Flink和官网干净的Flink安装后测试example,出现同样的问题
3.
尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`DEFAULT_YARN_CONF_DIR="/opt/hadoop-3.1.4/etc/hadoop/"`,测试example出现同样的问题,说明Flink已经成功加载了`yarn-site.xml`
4.尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`export
HADOOP_CONF_DIR=/opt/hadoop-3.1.4/etc/hadoop`,`export
HADOOP_CLASSPATH=`hadoop classpath`测试example出现同样的问题


目前无论是yarn还是flink都找不出来出现了什么问题,手足无措了
| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|


 Replied Message 
| From | Biao Geng |
| Date | 07/14/2022 14:31 |
| To |  |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hi,

你提到跑wordcount.jar时,当作业被调度到RM所在的机器上可以正常运行,调度到非RM所在的机器上就失败。flink环境干净的话,那大概率还是hadoop的环境设置有问题。
有几个可以检查的地方:
1. 运行flink run之前或者在flink的bin/config.sh里是否有正确设置hadoop环境变量,例如 export
HADOOP_CLASSPATH=`hadoop classpath`
2.

集群内机器(比如非RM所在机器)的HADOOP_CONF_DIR是否设置正确,其指向的目录里的yarn-site.xml的yarn.resourcemanager.hostname等配置是否设置正确

Best,
Biao Geng

Biao Geng  于2022年7月14日周四 11:32写道:

hi,
你有试过提交flink

example(比如wordcount作业)吗?如果报了一样的错误的话,可以检查一下你的flink/lib目录下是否有放多余的YARN配置(比如运行grep
-irn "0.0.0.0" $FLINK_HOME);如果example可以成功提交,可以看看是不是你的作业jar里打进了错误的YARN配置。

Best,
Biao Geng

lishiyuan0506  于2022年7月14日周四 11:06写道:

您好,环境变量里面有,而且flink的conf.sh也设置了


lishiyuan0506
lishiyuan0...@163.com

<
https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=lishiyuan0506=lishiyuan0506%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22lishiyuan0506%40163.com%22%5D


 Replied Message 
From Yang Wang 
Date 07/14/2022 11:00
To user-zh 
Subject Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030
确认一下你是否正确设置了HADOOP_CONF_DIR环境变量

Best,
Yang

lishiyuan0506  于2022年7月14日周四 09:41写道:

打扰大家一下,请问一下各位在yarn提交flink的时候,有没有遇到过Retrying connect to server:
0.0.0.0/0.0.0.0:8030这个异常


hadoop的classpath没问题,Spark和MR在Yarn上跑也没问题,就flink有这样的问题


| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|






Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030

2022-07-14 文章 JasonLee
Hi



这个是直接运行的 examples 里面的 demo 程序吗?


Best
JasonLee


 Replied Message 
| From | lishiyuan0506 |
| Date | 07/14/2022 16:25 |
| To | user-zh@flink.apache.org |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
感谢感谢,可以看的,我添加到附件给您发过去


| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|


 Replied Message 
| From | JasonLee<17610775...@163.com> |
| Date | 07/14/2022 16:15 |
| To | user-zh@flink.apache.org |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
Hi


可以用 yarn logs -applicationId xxx 看下日志吗?


Best
JasonLee


 Replied Message 
| From | lishiyuan0506 |
| Date | 07/14/2022 15:43 |
| To | user-zh@flink.apache.org |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hello,根据您的思路,我查看了所有的yarn-site.xml,没有发现配置出错的地方,Spark和MR的运行都正常,感觉不是yarn的问题






| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|


 Replied Message 
| From | Biao Geng |
| Date | 07/14/2022 15:37 |
| To |  |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hi
根据你发的描述,是Flink的job manager在非RM所在机器上启动时,由于尝试连接0.0.0.0:8030
端口去向YARN申请资源时连接不通,导致失败。你可以检查下集群内worker节点的hadoop配置,看看yarn.resourcemanager.hostname等配置是否设置正确。

Best,
Biao Geng

lishiyuan0506  于2022年7月14日周四 15:17写道:

您好,我之前做了一些尝试:
1. 测试Spark、MR任务正常
2. 将生产环境的Flink和官网干净的Flink安装后测试example,出现同样的问题
3.
尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`DEFAULT_YARN_CONF_DIR="/opt/hadoop-3.1.4/etc/hadoop/"`,测试example出现同样的问题,说明Flink已经成功加载了`yarn-site.xml`
4.尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`export
HADOOP_CONF_DIR=/opt/hadoop-3.1.4/etc/hadoop`,`export
HADOOP_CLASSPATH=`hadoop classpath`测试example出现同样的问题


目前无论是yarn还是flink都找不出来出现了什么问题,手足无措了
| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|


 Replied Message 
| From | Biao Geng |
| Date | 07/14/2022 14:31 |
| To |  |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hi,

你提到跑wordcount.jar时,当作业被调度到RM所在的机器上可以正常运行,调度到非RM所在的机器上就失败。flink环境干净的话,那大概率还是hadoop的环境设置有问题。
有几个可以检查的地方:
1. 运行flink run之前或者在flink的bin/config.sh里是否有正确设置hadoop环境变量,例如 export
HADOOP_CLASSPATH=`hadoop classpath`
2.

集群内机器(比如非RM所在机器)的HADOOP_CONF_DIR是否设置正确,其指向的目录里的yarn-site.xml的yarn.resourcemanager.hostname等配置是否设置正确

Best,
Biao Geng

Biao Geng  于2022年7月14日周四 11:32写道:

hi,
你有试过提交flink

example(比如wordcount作业)吗?如果报了一样的错误的话,可以检查一下你的flink/lib目录下是否有放多余的YARN配置(比如运行grep
-irn "0.0.0.0" $FLINK_HOME);如果example可以成功提交,可以看看是不是你的作业jar里打进了错误的YARN配置。

Best,
Biao Geng

lishiyuan0506  于2022年7月14日周四 11:06写道:

您好,环境变量里面有,而且flink的conf.sh也设置了


lishiyuan0506
lishiyuan0...@163.com

<
https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=lishiyuan0506=lishiyuan0506%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22lishiyuan0506%40163.com%22%5D


 Replied Message 
From Yang Wang 
Date 07/14/2022 11:00
To user-zh 
Subject Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030
确认一下你是否正确设置了HADOOP_CONF_DIR环境变量

Best,
Yang

lishiyuan0506  于2022年7月14日周四 09:41写道:

打扰大家一下,请问一下各位在yarn提交flink的时候,有没有遇到过Retrying connect to server:
0.0.0.0/0.0.0.0:8030这个异常


hadoop的classpath没问题,Spark和MR在Yarn上跑也没问题,就flink有这样的问题


| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|






Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030

2022-07-14 文章 JasonLee
Hi


可以用 yarn logs -applicationId xxx 看下日志吗?


Best
JasonLee


 Replied Message 
| From | lishiyuan0506 |
| Date | 07/14/2022 15:43 |
| To | user-zh@flink.apache.org |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hello,根据您的思路,我查看了所有的yarn-site.xml,没有发现配置出错的地方,Spark和MR的运行都正常,感觉不是yarn的问题






| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|


 Replied Message 
| From | Biao Geng |
| Date | 07/14/2022 15:37 |
| To |  |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hi
根据你发的描述,是Flink的job manager在非RM所在机器上启动时,由于尝试连接0.0.0.0:8030
端口去向YARN申请资源时连接不通,导致失败。你可以检查下集群内worker节点的hadoop配置,看看yarn.resourcemanager.hostname等配置是否设置正确。

Best,
Biao Geng

lishiyuan0506  于2022年7月14日周四 15:17写道:

您好,我之前做了一些尝试:
1. 测试Spark、MR任务正常
2. 将生产环境的Flink和官网干净的Flink安装后测试example,出现同样的问题
3.
尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`DEFAULT_YARN_CONF_DIR="/opt/hadoop-3.1.4/etc/hadoop/"`,测试example出现同样的问题,说明Flink已经成功加载了`yarn-site.xml`
4.尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`export
HADOOP_CONF_DIR=/opt/hadoop-3.1.4/etc/hadoop`,`export
HADOOP_CLASSPATH=`hadoop classpath`测试example出现同样的问题


目前无论是yarn还是flink都找不出来出现了什么问题,手足无措了
| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|


 Replied Message 
| From | Biao Geng |
| Date | 07/14/2022 14:31 |
| To |  |
| Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 |
hi,

你提到跑wordcount.jar时,当作业被调度到RM所在的机器上可以正常运行,调度到非RM所在的机器上就失败。flink环境干净的话,那大概率还是hadoop的环境设置有问题。
有几个可以检查的地方:
1. 运行flink run之前或者在flink的bin/config.sh里是否有正确设置hadoop环境变量,例如 export
HADOOP_CLASSPATH=`hadoop classpath`
2.

集群内机器(比如非RM所在机器)的HADOOP_CONF_DIR是否设置正确,其指向的目录里的yarn-site.xml的yarn.resourcemanager.hostname等配置是否设置正确

Best,
Biao Geng

Biao Geng  于2022年7月14日周四 11:32写道:

hi,
你有试过提交flink

example(比如wordcount作业)吗?如果报了一样的错误的话,可以检查一下你的flink/lib目录下是否有放多余的YARN配置(比如运行grep
-irn "0.0.0.0" $FLINK_HOME);如果example可以成功提交,可以看看是不是你的作业jar里打进了错误的YARN配置。

Best,
Biao Geng

lishiyuan0506  于2022年7月14日周四 11:06写道:

您好,环境变量里面有,而且flink的conf.sh也设置了


lishiyuan0506
lishiyuan0...@163.com

<
https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=lishiyuan0506=lishiyuan0506%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22lishiyuan0506%40163.com%22%5D


 Replied Message 
From Yang Wang 
Date 07/14/2022 11:00
To user-zh 
Subject Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030
确认一下你是否正确设置了HADOOP_CONF_DIR环境变量

Best,
Yang

lishiyuan0506  于2022年7月14日周四 09:41写道:

打扰大家一下,请问一下各位在yarn提交flink的时候,有没有遇到过Retrying connect to server:
0.0.0.0/0.0.0.0:8030这个异常


hadoop的classpath没问题,Spark和MR在Yarn上跑也没问题,就flink有这样的问题


| |
lishiyuan0506
|
|
lishiyuan0...@163.com
|






回复:flink sql解析kafka数据

2022-07-04 文章 JasonLee
Hi
解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA


Best
JasonLee


 回复的原邮件 
| 发件人 | 小昌同学 |
| 发送日期 | 2022年06月30日 15:02 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | flink sql解析kafka数据 |
各位大佬  请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型  我这边想直接通过flink sql建表语句拿到最里面的字段的值
我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true'  但是我在客户端执行的时候  
发现识别不到这个字段  
有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛
CREATE TABLE ccc_test_20220630_2
(  
   trans_number   STRING,
   end_timestamp  STRING,
   return_flagSTRING,
   commodity_type STRING

   
)
   COMMENT '中台交易流水小票头' WITH (
   'connector' = 'kafka',
   'topic' = 'yh_rme_soc_stream_prod-tlog_header',
   'properties.bootstrap.servers' = '',
   'properties.group.id' = 'ccc_test_20220630_2',
   'properties.request.timeout.ms' = '6',
   'format' = 'json',
   'scan.startup.mode' = 'group-offsets',
   -- 'scan.startup.mode' = 'timestamp',
   -- 'scan.startup.timestamp-millis' = '165373920',
   'json.fail-on-missing-field' = 'false',
   'json.ignore-parse-errors' = 'true'
'json.infer-schema.flatten-nested-columns.enable'='true'
);


| |
小昌
|
|
ccc0606fight...@163.com
|

回复:Flink JobManager 节点 JVM Metaspace 过高

2022-06-10 文章 JasonLee
Hi Summer


你的图片挂了,可以把图片上传到图床,然后把链接贴在邮件里,另外可以把 JM 的日志也发一下吗?




Best
JasonLee


 回复的原邮件 
| 发件人 | Summer |
| 发送日期 | 2022年06月10日 18:15 |
| 收件人 | user-zh@flink.apache.org |
| 主题 | Flink JobManager 节点 JVM Metaspace 过高 |
使用 FinkUI 上传 Flink 任务 Jar 时,任务启动失败。 这时候JVM Metaspace就会异常增加。 这是什么原因?



















回复: flink sql 如何提高下游并发度?

2022-01-11 文章 JasonLee
hi


是 10 目前 source 还不支持单独设置并发度,但是 sink 是支持的,当然如果没有单独设置的话 sink 也是 10


Best
JasonLee


在2022年01月11日 16:52,RS 写道:
Hi,
请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?
如果source是10的话,那还有7个线程就空闲了?



在 2022-01-11 11:10:41,"Caizhi Weng"  写道:
Hi!

可以设置 parallelism.default 为需要的并发数。

Jeff  于2022年1月9日周日 19:44写道:

当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?


回复: flink固定延迟重启策略没有延迟

2021-12-22 文章 JasonLee
Hi
日志里面应该可以看到这样的信息 Calculating tasks to restart to recover the failed task 。


Best
JasonLee


在2021年12月22日 11:08,宋品如 写道:
Hi:


谢谢回复,我明白了。还有一个问题:
日志里显示:Recovery is suppressed by 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, 
backoffTimeMS=3)
这说明我的任务已经重启了2次依然失败了吗?
日志里我看不到重启的记录,这个应该怎么确认呢?
这里的报错是第一次重启还是最后一次重启导致的呢?


谢谢!














在 2021-12-22 10:47:24,"Caizhi Weng"  写道:
Hi!

log 里的这些信息是同一个 job 里不同的并发分别 fail(可以从 2/3 和 3/3 这两个不同的并发号看出来),并不是说这个 job
fail 了两次。

宋品如  于2021年12月22日周三 10:14写道:

发件人: Song PinRu
发送时间: 2021年12月21日 15:19
收件人: user-zh@flink.apache.org
主题: flink固定延迟重启策略没有延迟

Hi:
昨天的邮件截图看不了,把日志贴上来重新发送一份
--

查看日志发现固定延迟重启策略似乎没有生效,我设置的是30s延迟重启2次,

但是日志显示的是在06:26:50这1秒内重启了2次都失败了,并最终导致任务失败,

我设置的延迟时间似乎完全没有生效,Flink版本是1.12.2。

有没有人能告诉我这是为什么?



设置重启策略的代码:
```

val env = StreamExecutionEnvironment.getExecutionEnvironment
val backend = new
FsStateBackend(CommonConfig.FLINK_STATEBACKEND_CHECKPOINT)
env.setStateBackend(backend)
// 每 3ms 开始一次 checkpoint
env.enableCheckpointing(3)
// 设置模式为精确一次 (这是默认值)

env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
// 确认 checkpoints 之间的时间会进行 500 ms
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1)
// Checkpoint 必须在2分钟内完成,否则就会被抛弃
env.getCheckpointConfig.setCheckpointTimeout(12)
// 可容忍checkpoint失败次数
env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3)
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
//设置全局并行度
//  env.setParallelism(3)
//重启策略
//PS:默认策略会重启int最大值次,导致任务一直处于重启状态,checkpoint出现连续空文件夹,同时导致有效checkpoint无法使用


env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,
Time.seconds(30)))
```






日志:
```

2021-12-21 06:26:50,850 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] - Source:
source -> note -> Sink: sink (2/3) (85ad9ee0f52f04c5709430a8c793817a)
switched from RUNNING to FAILED on
container_e1595_1638345947522_0010_01_03 @ pbj-cdh-20-72.optaim.com
(dataPort=35530).

org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
send data to Kafka: This server is not the leader for that topic-partition.

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392)
~[dws_module-1.0.4.jar:?]

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850)
~[dws_module-1.0.4.jar:?]

at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
~[dws_module-1.0.4.jar:?]

at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
~[flink-dist_2.11-1.12.2.jar:1.12.2]

at
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
~[dws_module-1.0.4.jar:?]

at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsum

回复:实时读取hive参数不生效

2021-12-21 文章 JasonLee
hi


看上面似乎这个参数没有生效,你是在哪里设置的呢?


Best
JasonLee


在2021年12月21日 20:20,Fei Han 写道:
@all:
大家好!
我在实时读取hive的时候动态参数不生效,另外flink是否可以通过流读方式读取hive的普通表呢?
版本如下:
Flink版本1.13.3
Hive版本hive2.1.1-CDH6.2.0
设置的参数是set 'table.dynamic-table-options.enabled'='true'
报错如下:
INSERT INTO qhc_catalog.qhc_hms.qhc_ods_assassin_dept
select * from qhc_catalog.qhc_assassin_ods.assassin_dept /*+ 
OPTIONS('streaming-source.enable'='true', 
'streaming-source.partition-order'='create-time') */
2021-12-21 19:56:45,198 ERROR com.flink.streaming.core.JobApplication   
   [] - 任务执行失败:
org.apache.flink.table.api.ValidationException: The 'OPTIONS' hint is allowed 
only when the config option 'table.dynamic-table-options.enabled' is set to 
true.
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createFinalCatalogTable(CatalogSourceTable.java:104)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:79)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) 
~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2140)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272)
 ~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) 
~[flink-table-blink_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51)
 ~[flink-table_2.12-1.13.3.jar:1.13.3]
at com.flink.streaming.core.execute.ExecuteSql.exeSql(ExecuteSql.java:38) 
~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
at com.flink.streaming.core.JobApplication.main(JobApplication.java:80) 
~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_211]
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
~[?:1.8.0_211]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_211]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_211]
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
 ~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) 
~[flink-dist_2.12-1.13.3.jar:1.13.3]
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) 
~[flink-dist_2.12-1.13.3.jar:1.13.3

Re:How to run SQL Client by Per-Job

2021-11-29 文章 JasonLee
Hi


设置一下 execution.target: yarn-per-job 就行了 


Best
JasonLee


On 11/29/2021 18:48, wrote:

 

Hi

   Can SQL Client run a job by Per-Job ?

   在网上看到曾庆东同学分享过《Flink SQL CDC 上线!我们总结了 13 
条生产实践经验》,提到使用per-job方式提交,如下图,但我在官网上看到只支持Session方式,求解正确的使用方式

Thx

wangz...@163.com

回复: Flink工程停止问题

2021-11-28 文章 JasonLee
Hi


应该是你的版本比较低,新版本是可以用的


Best
JasonLee


在2021年11月29日 11:35,Mabin 写道:
这个参数我试过了,没用的

发自我的iPhone

在 2021年11月18日,上午11:51,zhisheng  写道:

web.cancel.enable: false

web.cancel.enable 这个参数可以控制是否显示那个取消按钮

Caizhi Weng  于2021年11月16日周二 下午3:53写道:

Hi!

Flink 本身不自带安全机制,需要通过外部系统完成访问限制。

疾鹰击皓月 <1764232...@qq.com.invalid> 于2021年11月16日周二 下午2:57写道:

您好

Flink

WebUI的左上角有一个cancel按钮,通过按钮可以停止Flink工程。但这会导致一定的权限问题。我们希望只有特定人员可以停止Flink工程,请问有没有方法可以让那个停止按钮不生效或者不显示呢?



回复:Flink1.12 Streaming 消费kafka

2021-11-08 文章 JasonLee
hi


可以使用 setPartitions 方法 
具体参考官网: 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#topic-partition-subscription


Best
JasonLee
在2021年11月8日 17:06,guanyq 写道:
请大佬指导下:
flink streaming可以指定partition消费kafka么
如有100个partition,但是我只想消费15partiton。











??????flink on yarn ??pre_job????????,????session????????????

2021-11-04 文章 JasonLee
hi


?? jar ??Flink ??


Best
JasonLee
??2021??11??4?? 18:41<2572805...@qq.com.INVALID> ??
yarn??:
org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to 
initialize the cluster entrypoint YarnJobClusterEntrypoint.   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569)
   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91)
 Caused by: java.lang.VerifyError: class 
org.apache.flink.yarn.YarnResourceManager overrides final method 
onStop.()Ljava/util/concurrent/CompletableFuture;   at 
java.lang.ClassLoader.defineClass1(Native Method)   at 
java.lang.ClassLoader.defineClass(ClassLoader.java:763)   at 
java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)   at 
java.net.URLClassLoader.defineClass(URLClassLoader.java:467)   at 
java.net.URLClassLoader.access$100(URLClassLoader.java:73)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:368)   at 
java.net.URLClassLoader$1.run(URLClassLoader.java:362)   at 
java.security.AccessController.doPrivileged(Native Method)   at 
java.net.URLClassLoader.findClass(URLClassLoader.java:361)   at 
java.lang.ClassLoader.loadClass(ClassLoader.java:424)   at 
sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338)   at 
java.lang.ClassLoader.loadClass(ClassLoader.java:357)   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54)
   at 
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178)
   at java.security.AccessController.doPrivileged(Native Method)   at 
javax.security.auth.Subject.doAs(Subject.java:422)   at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
   at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
   at 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175)
   ... 2 common frames omitted

Diagnostics:
  Application application_1635998548270_0028 failed 1  times (global limit 
=2; local limit is =1) due to AM Container for  
appattempt_1635998548270_0028_01 exited with  exitCode: 1   
For more detailed output, check the application  tracking page:  
http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028  Then 
click on links to logs of each attempt.   
Diagnostics: Exception from container-launch.   
Container id: container_e391_1635998548270_0028_01_01   
Exit code: 1   
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:944)   
at org.apache.hadoop.util.Shell.run(Shell.java:848)   
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142)   

at  
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317)
   
at  
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83)
   
at java.util.concurrent.FutureTask.run(FutureTask.java:266)   
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) 
  
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) 
  
at java.lang.Thread.run(Thread.java:748)   


Container exited with a non-zero exit code 1   
Failing this attempt. Failing the application.   

??




??????????

2021-10-24 文章 JasonLee
Hi


?? user-zh-unsubscr...@flink.apache.org 


Best
JasonLee


??2021??10??25?? 10:52??zdj<1361776...@qq.com.INVALID> ??


??????flink-1.14 ???? kafkasource ????watermark????

2021-10-11 文章 JasonLee
Hi


, wm > window.end_time 
,?? wm 
,


Best
JasonLee


??2021??10??12?? 11:26??kcz<573693...@qq.com.INVALID> ??
 
times??+20??StreamExecutionEnvironment
 env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource

?????? flink sql????????????????sink table?

2021-09-23 文章 JasonLee
Hi


,  SQL  SQL 
??,??


Best
JasonLee


??2021??09??23?? 09:28 ??
sql??sql??

iPhone


--  --
??: 2572805166 <2572805...@qq.com.INVALID
: 2021??9??23?? 09:23
??: user-zh https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like
 casel.chen ??2021??9??18?? 8:27?? kafka 
topic??topic)
 ??flink sqlsink table

回复:退订

2021-09-23 文章 JasonLee
Hi


退订应该发送到 user-zh-unsubscr...@flink.apache.org 



Best
JasonLee


在2021年09月23日 15:20,wangjingen<13033709...@163.com> 写道:
退订

??????????

2021-09-23 文章 JasonLee
Hi


?? user-zh-unsubscr...@flink.apache.org 



Best
JasonLee


??2021??09??23?? 21:22??Night_xing<1029681...@qq.com.INVALID> ??


回复:退订

2021-09-23 文章 JasonLee
Hi


退订应该发送到 user-zh-unsubscr...@flink.apache.org 



Best
JasonLee


在2021年09月23日 15:26,金圣哲 写道:
退订

回复: flink sql是否支持动态创建sink table?

2021-09-22 文章 JasonLee
hi


这个我理解在 SQL 任务里面目前是没办法做到的 在 datastream api 任务是可以的


Best
JasonLee
在2021年9月22日 11:35,酷酷的浑蛋 写道:
我也有这个需求,意思就是topic里实时新增了一种日志,然后想动态创建对应新的日志的topic表,并写入到新的topic表,在一个任务中完成


| |
apache22
|
|
apach...@163.com
|
签名由网易邮箱大师定制


在2021年09月22日 11:23,Caizhi Weng 写道:
Hi!

不太明白这个需求,但如果希望发送给不同的 topic,需要给每个 topic 都定义 DDL。

如果是因为各 topic 之间的 schema 重复度比较高,只有些许字段以及 topic 名称等不同,可以看一下 DDL LIKE 语法:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like

casel.chen  于2021年9月18日周六 上午8:27写道:

上游kafka topic消息带有一个用户类型字段,现在想根据不同用户类型将数据发到不同topic(为了扩展不想写死有哪些类型) ,请问flink
sql支持动态创建sink table吗?


回复:flink消费kafka分区消息不均衡问题

2021-09-22 文章 JasonLee
hi 


图片看不到 我猜大概有两种情况 第一种是你的 source 本身就存在数据倾斜 某几个分区的数据量比其他分区的多 需要修改数据写入 kafka 
分区策略让数据尽量均匀 第二种是你的下游计算的时候出现数据倾斜(或其他原因)导致任务反压到 source 端 这种情况需要根据实际的情况采用不同的解决方案 
单纯的增加并发和改变 slot 数量没有什么效果


Best
JasonLee
在2021年9月22日 09:22,casel.chen 写道:
kafka topic有32个分区,实时作业开了32个并行度消费kafka 
topic,现在监控发现部分分区消息积压严重(如下图所示),请问会有哪些原因造成的?有什么解决办法吗?扩大分区数是不是也不能根治这种情况?
PS: 
每个分区消息数的确有所不均,但是同样消息数的几个分区也会出现积压不同情况(如15,16,17,18)。会是因为节点带宽受限造成的吗?当前numberOfSlots=8,改成numberOfSlots=1会有效果么?


|
分区 ID
|
客户端
|
最大位点
|
消费位点
|
堆积量
|
|
0
|
n/a
|
14,131,397
|
14,130,923
|
474
|
|
1
|
n/a
|
14,191,455
|
14,189,396
|
2,059
|
|
2
|
n/a
|
14,611,826
|
14,610,262
|
1,564
|
|
3
|
n/a
|
15,340,150
|
15,335,944
|
4,206
|
|
4
|
n/a
|
16,379,487
|
16,372,237
|
7,250
|
|
5
|
n/a
|
17,696,565
|
17,639,308
|
57,257
|
|
6
|
n/a
|
19,200,829
|
19,129,856
|
70,973
|
|
7
|
n/a
|
20,889,954
|
20,888,652
|
1,302
|
|
8
|
n/a
|
22,643,539
|
22,536,468
|
107,071
|
|
9
|
n/a
|
24,440,881
|
24,439,357
|
1,524
|
|
10
|
n/a
|
26,178,250
|
26,073,197
|
105,053
|
|
11
|
n/a
|
27,828,497
|
27,670,732
|
157,765
|
|
12
|
n/a
|
29,284,463
|
29,283,105
|
1,358
|
|
13
|
n/a
|
30,526,020
|
29,781,704
|
744,316
|
|
14
|
n/a
|
31,468,482
|
31,467,243
|
1,239
|
|
15
|
n/a
|
32,084,198
|
31,467,610
|
616,588
|
|
16
|
n/a
|
32,393,752
|
32,019,836
|
373,916
|
|
17
|
n/a
|
32,302,065
|
32,141,999
|
160,066
|
|
18
|
n/a
|
31,875,063
|
31,874,452
|
611
|
|
19
|
n/a
|
31,137,894
|
31,002,867
|
135,027
|
|
20
|
n/a
|
30,098,926
|
29,930,855
|
168,071
|
|
21
|
n/a
|
28,739,235
|
28,603,509
|
135,726
|
|
22
|
n/a
|
27,221,026
|
27,220,821
|
205
|
|
23
|
n/a
|
25,514,265
|
25,382,536
|
131,729
|
|
24
|
n/a
|
23,779,714
|
23,689,296
|
90,418
|
|
25
|
n/a
|
21,981,307
|
21,981,267
|
40
|
|
26
|
n/a
|
20,237,925
|
20,223,880
|
14,045
|
|
27
|
n/a
|
18,606,490
|
18,606,387
|
103
|
|
28
|
n/a
|
17,178,098
|
17,177,971
|
127
|
|
29
|
n/a
|
15,972,292
|
15,972,105
|
187
|
|
30
|
n/a
|
15,032,355
|
15,032,138
|
217
|
|
31
|
n/a
|
14,426,366
|
14,425,462
|
904
|

回复:退订

2021-09-13 文章 JasonLee
Hi


退订应该发送到 user-zh-unsubscr...@flink.apache.org 


Best
JasonLee


在2021年09月14日 11:46,abel0130 写道:
退订

回复:flink on native k8s 无法查看火焰图问题

2021-09-13 文章 JasonLee
Hi


这个配置默认是关闭的,因为对性能有一定影响,具体的配置可以参考官网 
https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#advanced-options-for-the-rest-endpoint-and-client


Best
JasonLee


在2021年09月14日 11:20,赵旭晨 写道:
Unable to load requested file /jobs/d2fcac59f4a42ad17ceba8c5371862bb/。。


请问这是什么原因啊,恳请大佬解惑
镜像版本flink:1.13.2-scala_2.12-java8




 

回复:Flink on yarn的日志监控和checkpoint的监控生产是如何处理的?

2021-08-31 文章 JasonLee
Hi


可以参考这两篇文章:
https://mp.weixin.qq.com/s/2S4M8p-rBRinIRxmZrZq5Q 
https://mp.weixin.qq.com/s/44SXmCAUOqSWhQrNiZftoQ


Best
JasonLee


在2021年08月31日 13:23,guanyq 写道:
flink on yarn 在集群中启动很多的task,生产应用中是如何监控task的日志,和checkpoint的呢?


求大佬指导。

??????flink keyby????

2021-08-30 文章 JasonLee
hi


 KeyGroupStreamPartitioner#selectChannel .


Best
JasonLee
??2021??8??30?? 22:34??cs<58683...@qq.com.INVALID> ??
flink??keybykey??tasktask??id

回复: Flink任务假死;无限100%反压;但下游节点无压力。

2021-08-25 文章 JasonLee
Hi


可以发一下任务的 DAG 吗 


Best
JasonLee


在2021年08月26日 13:09,yidan zhao 写道:
补充了个附录(https://www.yuque.com/sixhours-gid0m/ls9vqu/rramvh
)正常任务和异常任务的window算子的FlameGraph,不清楚是否有参考价值。

yidan zhao  于2021年8月26日周四 下午1:01写道:

目前来看,我运行6小时,window总计才收到200MB数据,这个数据量级相比我很多小到没有一样。所以很难想象反压的原因是啥究竟。

目前来看反压节点的outPoolUsage是1,看起来合理,因为处于100%反压。
下游节点的inPoolUsage却是0,这个也很奇怪,同时下游buzz和backpress都是0%.



Shengkai Fang  于2021年8月26日周四 下午12:33写道:

- 得看一下具体的卡死的节点的栈,分析下具体的工作任务才知道。
- 日志中有包含错误的信息吗?

Best,
Shengkai

yidan zhao  于2021年8月26日周四 下午12:03写道:

可能存在机器压力倾斜,但是我是不太清楚这种现象的原因,直接停滞了任务?

东东  于2021年8月26日周四 上午11:06写道:

建议检查一下是否有数据倾斜


在 2021-08-26 10:22:54,"yidan zhao"  写道:
问题期间的确ckpt时间较长。
但是,这个任务正常ckpt时间才不到1s,ckpt大小也就21MB,所以也很难说ckpt为啥会超时,我超时设置的2min。

Caizhi Weng  于2021年8月26日周四 上午10:20写道:

Hi!

从图中情况来看很可能是因为下游 checkpoint 时间过长导致反压上游。是否观察过 checkpoint 的情况?

yidan zhao  于2021年8月26日周四 上午10:09写道:

如题,这个问题以前遇到过,后来发生频率低了,近期又多了几次,下面是具体的话题讨论,email不方便贴图。

语雀:https://www.yuque.com/sixhours-gid0m/ls9vqu/rramvh








回复:退订

2021-08-20 文章 JasonLee
Hi


退订应该发送到 user-zh-unsubscr...@flink.apache.org 


Best
JasonLee


在2021年08月21日 09:43,牛成 写道:
退订

??????kafka appender????

2021-08-03 文章 JasonLee
Hi


 error ? ?? CPU ??  IO 
?


Best
JasonLee


??2021??08??4?? 12:25??datafollower<609326...@qq.com.INVALID> ??
hi allflink??5000+??error??kafka?? 
log4j2 ??kafkaappender
??error??kafka??

1.kafka appender??error log10
2.error??kafka

回复: 如何从复杂的kafka消息体定义 table

2021-07-08 文章 JasonLee
Hi


事实上,你的 data 是一个 jsonarray 只需要列转行取 columnName 字段就可以了.


Best
JasonLee


在2021年07月9日 10:06,Chenzhiyuan(HR) 写道:
消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下:

CREATE TABLE MyUserTable(
APPLY_PERSON_ID VARCHAR,
UPDATE_SALARY DECIMAL,
UP_AMOUNT DECIMAL,
CURRENCY VARCHAR,
EXCHANGE_RATE DECIMAL
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'

接下来直接查询每个字段的值:
Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, 
CURRENCY, EXCHANGE_RATE from MyUserTable ");

请教下这个该如何定义DDL.



发件人: 17610775726 [mailto:17610775...@163.com]
发送时间: 2021年7月9日 9:26
收件人: Chenzhiyuan(HR) 
主题: 回复:如何从复杂的kafka消息体定义 table

hi

用 json 就行 可以参考这篇文章:https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA

Best
JasonLee
 回复的原邮件 
发件人

Chenzhiyuan(HR)<mailto:zhiyuan.c...@huawei.com>

发送日期

2021年07月09日 08:59

收件人

user-zh@flink.apache.org<mailto:user-zh@flink.apache.org>

主题

如何从复杂的kafka消息体定义 table

大家好:
我定义了一个table, 从kafka读取数据,不知道要怎么解析,用哪个format.type.
如果json, avro不能满足的话,是不是得自己自定义一个。
自定义的话不知道如何写,请各位帮忙指教下。

定义的表如下:
CREATE TABLE MyUserTable(
uuid VARCHAR,
orgId VARCHAR
) with (
'connector.type' = 'kafka',
'connector.version' = '0.11',
'connector.topic' = 'topic_name',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = '?'
)


Kafka的消息体如下, 好像不符合avro之类的标准格式:

{
"beforeData": [],
"byteSize": 272,
"columnNumber": 32,
"data": [{
"byteSize": 8,
"columnName": "APPLY_PERSON_ID",
"rawData": 10017,
"type": "LONG"
}, {
"byteSize": 12,
"columnName": "UPDATE_SALARY",
"rawData": "11000.00",
"type": "DOUBLE"
}, {
"byteSize": 11,
"columnName": "UP_AMOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 3,
"columnName": "CURRENCY",
"rawData": "CNY",
"type": "STRING"
}, {
"byteSize": 32,
"columnName": "EXCHANGE_RATE",
"rawData": "1.00",
"type": "DOUBLE"
},  {
"byteSize": 11,
"columnName": "DEDUCTED_ACCOUNT",
"rawData": "1000.00",
"type": "DOUBLE"
}, {
"byteSize": 1,
"columnName": "ENTER_AT_PROCESS",
"rawData": "Y",
"type": "STRING"
}],
"dataCount": 0,
"dataMetaData": {
"connector": "mysql",
"pos": 1000368076,
"row": 0,
"ts_ms": 1625565737000,
"snapshot": "false",
"db": "testdb",
"table": "flow_person_t"
},
"key": "APPLY_PERSON_ID",
"memorySize": 1120,
"operation": "insert",
"rowIndex": -1,
"timestamp": "1970-01-01 00:00:00"
}


回复:如何将canal json格式数据按操作类型过滤

2021-07-07 文章 JasonLee
hi


最后一个字段 type 就是操作的类型, 过滤掉 DELETE 就行了.


Best
JasonLee
在2021年7月7日 22:43,casel.chen 写道:
使用场景:我们使用canal将mysql binlog输出到kafka,然后想通过flink消费kafka数据过滤掉 delete 
操作的数据插入到文件系统,因为要做历史数据存档用。
查了下官网 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/#available-metadata
{"data":[{"id":"111","name":"scooter","description":"Big 2-wheel 
scooter","weight":"5.18"}],"database":"inventory","es":158937356,"id":9,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.15"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products","ts":1589373560798,"type":"UPDATE"}
CREATETABLEKafkaTable(origin_databaseSTRINGMETADATAFROM'value.database'VIRTUAL,origin_tableSTRINGMETADATAFROM'value.table'VIRTUAL,origin_sql_typeMAPMETADATAFROM'value.sql-type'VIRTUAL,origin_pk_namesARRAYMETADATAFROM'value.pk-names'VIRTUAL,origin_tsTIMESTAMP(3)METADATAFROM'value.ingestion-timestamp'VIRTUAL,user_idBIGINT,item_idBIGINT,behaviorSTRING)WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','value.format'='canal-json');
只能获取到原始 database, table, sql-type, pk-names, ingestion-timestamp 字段,而拿不到代表操作类型的 
type 字段。请问有什么别的办法么?






??????flink1.13.1 webUI??????????????

2021-07-04 文章 JasonLee
Hi


??,??, 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/


Best
JasonLee


??2021??07??5?? 12:08??lonely Wanderer<609326...@qq.com.INVALID> ??
hi all
??flink1.13.1webUI
??{"errors":["Unable to load requested file 
/jobs/b207b23893328639ee15e76f6320fe29/vertices/c27dcf7b54ef6bfd6cff02ca8870b681/flamegraph."]}
state code : 404


Re: Re:Re: Re: Re:Re: flink sql job 提交到yarn上报错

2021-06-19 文章 JasonLee
hi

先执行一下 export HADOOP_CLASSPATH=`hadoop classpath` 就可以了



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 退订

2021-06-19 文章 JasonLee
hi

退订发邮件到 user-zh-unsubscr...@flink.apache.org 就可以了




-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flinksql消费kafka写入doris中文乱码

2021-06-17 文章 JasonLee
hi


你可以先用 print 的 connector 把消费到的数据打印一下看是否乱码? 还是写入到 doris 后出现的乱码?


Best
JasonLee
在2021年6月17日 21:31,maker_d...@foxmail.com 写道:
我使用flinksql消费kafka并将数据写入doris,但出现中文乱码。

SQL如下:

CREATE TABLE `datacollect_business_kafka` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32) ,
...
CHARACTER SET `UTF-8`
) WITH (
'connector' = 'kafka',
'topic' = 'datacollect_business_stage',
'properties.bootstrap.servers' = 'XXX',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);

CREATE TABLE `datacollect_business_doris` (
`id` varchar(36),
`chain_id` varchar(36),
`app_id` varchar(32) ,
...
CHARACTER SET `UTF-8`
) WITH (
'connector' = 'doris',
'fenodes' = 'XXX',
'table.identifier' = 'stage_datacollect.datacollect_business',
'username' = 'XXX',
'password' = 'XXX',
'sink.batch.size' = '1'
);

insert into datacollect_business_doris select * from datacollect_business_kafka;


在网上查找信息,flink-conf.yaml文件中添加:env.java.opts: "-Dfile.encoding=UTF-8”

flink版本:1.12.4
部署模式:on yarn

希望各位大佬帮助。
谢谢!


maker_d...@foxmail.com


Re: Re:Re:flink sql cdc数据同步至mysql

2021-06-11 文章 JasonLee
hi

sink 端可以通过 sink.parallelism 进行设置.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Checkpoint时内存不够TaskManager被Kill掉

2021-06-10 文章 JasonLee
hi

增大一下 taskmanager.memory.jvm-overhead 的内存试试



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-06 文章 JasonLee
hi

那你只需要设置从 latest-offset 开始消费,并且禁用 checkpoint 就行了,至于重启的次数,可以通过 metrics 中的
numRestarts 去获取.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-03 文章 JasonLee
hi

sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 JasonLee
hi

source 端的并发保持和 partition 的个数一样就行了,不要大于 partition 个数,因为这会导致 subtask
空跑,浪费资源,你只需要把 map 的并行度调大即可.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink checkpoint 速度很慢 问题排查

2021-06-03 文章 JasonLee
hi

你理解的可能有点偏差,应该是因为任务出现了反压或者数据倾斜的问题导致了cp时间长,01消息堆积说明已经反压到source端了,需要先定位反压的位置,看是具体什么原因导致的,然后再根据情况解决.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.12 on yarn WebUI不显示logs

2021-06-02 文章 JasonLee
hi

有改过默认的日志配置文件吗?



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.13 通过sql cli执行hdfs上面的SQL文件

2021-05-30 文章 JasonLee
hi

目前还不支持HDFS路径,只支持本地的文件,未来应该会支持.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 流与流 left join

2021-05-27 文章 JasonLee
Hi
可以看下 interval join 是否能满足你的需求



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: ./sql-client.sh embedded 这种方式提交的flink任务怎么设置state以及checkpoint?

2021-05-11 文章 JasonLee
hi

直接在 flink-conf.yaml 文件里面配置就行了



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复:解析kafka 非标准JSON问题

2021-04-27 文章 JasonLee
hi

SQL 可以定义一个字段然后分隔再去获取 JSON 数据 或者可以自定义 UDF 去处理



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL Metrics中Kafka Offset请教

2021-04-25 文章 JasonLee
hi

currentOffsets: 表示的是每个分区的使用者当前读偏移量 , committedOffsets:
表示的是最后一次成功提交到Kafka的偏移量 ,因为 kafka 的 partition 的 offset 是从 0开始的 所以
committedOffsets 会比 currentOffsets 大 1



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink如何从流中取出自定义的数据结构并赋值给变量

2021-04-25 文章 JasonLee
hi

你可以用 filter 过滤出多个流或者用测流输出的方式分流处理



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 提交FlinkSQLKafka表报异常cannt load user class

2021-04-25 文章 JasonLee
hi

从报错上看是缺少了 Flink-sql-kafka 那个包,把这个包添加到 Flink/lib 下面再跑一下



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink run命令是否支持读取远程文件系统中的jar文件?

2021-04-22 文章 JasonLee
hi 

session ,per-job 模式是不支持的 application 模式是支持的 



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.12.2 StreamingFileSink 问题

2021-04-18 文章 JasonLee
hi

可以参考这篇文章: https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql 写hdfs问题

2021-04-15 文章 JasonLee
hi

你需要添加下面两个参数:
'csv.line-delimiter'='',
'csv.disable-quote-character'='true'



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 使用了流应用中使用了mysql jdbc的source,Execution处于FINISHED状态无法生成检查点

2021-04-14 文章 JasonLee
hi

你需要使用 Temporal Table Join 的语法,具体操作可以参考官网
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink -conf.yaml修改

2021-04-12 文章 JasonLee
hi

如果是 session 模式需要重启集群,如果是 per-job 模式直接提交任务即可.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink 1.12.2 sql api 使用parquet格式报错

2021-04-04 文章 JasonLee
hi

可以参考下这篇文章: https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-sql 客户端采用execution.target: yarn-per-job 模式,如何指定提交的队列??

2021-04-02 文章 JasonLee
hi

可以通过在 flink-conf.yaml 配置文件中添加 yarn.application.queue 参数来设置 




-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: interval join 如何用 process time

2021-03-17 文章 JasonLee
hi

proctime as PROCTIME() 可以这样设置



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于kafka中csv格式数据字段分隔符请教

2021-02-02 文章 JasonLee
hi

1,11 也是支持的



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Catalog(Kafka Connectors 的ddl)持久化到hive metastore,groupid一样的问题

2021-01-30 文章 JasonLee
hi

社区以及提供了动态修改表属性的功能,具体使用可以参考 https://mp.weixin.qq.com/s/nWKVGmAtENlQ80mdETZzDw



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于端到端的延迟监控

2021-01-30 文章 JasonLee
hi

可以在消费 kafka 的时候获取到数据进入 kafka 的时间戳 然后在最终的 sink 的时候再获取一个时间戳 自己定义一个 metric
上报最终的耗时



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: kafka 所有分区无数据的情况下,导致watermark无法前进

2021-01-30 文章 JasonLee
hi 

watermark 是要根据数据里面的时间戳生成的 所有分区都没有数据的情况下 为什么还要 watermark 推进呢?



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL csv格式分隔符设置失败

2021-01-27 文章 JasonLee
hi

改成下面这样:

\n => U&'\000A'   

\t => U&'\0009'



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: flink sql读kafka元数据问题

2021-01-15 文章 JasonLee
hi

通过 headers 这种方式是可以获取到的 jark 大佬说的那种方式我还没有测试



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql读kafka元数据问题

2021-01-12 文章 JasonLee
hi

你写入数据的时候设置 headers 了吗 没设置的话当然是空的了



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink-sql 元数据问题

2020-12-11 文章 JasonLee
hi
Flink SQL 建的表支持用 hive 的 catalog 来管理元数据,是否可以满足你的需求 ?



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL 1.11支持将数据写入到Hive吗?

2020-12-11 文章 JasonLee
hi
可以参考下这篇文章 
https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink SQL使用Tumble窗口函数报NoSuchMethodError functions/AggregateFunction 异常

2020-12-03 文章 JasonLee
hi

从报错信息看应该jar包冲突了,可以贴一下相关的依赖包吗



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-1.11 使用 application 模式时 jobid 问题

2020-11-13 文章 JasonLee
hi
可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 JasonLee
hi

把 -d 参加加上用分离方式启动 应该就可以了 



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 JasonLee
hi

从你的描述看确实起的是per-job模式,per-job模式目前应该是没有这个问题的.可以再看下你的UI上execution.attached
的值是什么吗? 再有启动任务的时候是否加了 -d 参数



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink与Yarn的状态一致性问题

2020-11-12 文章 JasonLee
hi
1,首先确定你提交的是per-job模式吗?
2,你说的任务状态是说jm还在任务在failover,还是任务确实是挂了,jm已经退出了?




-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink tm cpu cores设置

2020-11-04 文章 JasonLee
hi 设置yarn.containers.vcores这个参数就可以了



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 使用BroadcastStream后checkpoint失效

2020-11-03 文章 JasonLee
hi
checkpoint目前只能用在流数据上,你读取的mongo数据是一个有界的数据源,所以是不支持做checkpoint就会导致整个任务的checkpoint失败,你可以把读取mongo做一个定时读取,这样应该就可以完成checkpoint.



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink获取latencymarker有什么好的方法

2020-09-26 文章 JasonLee
hi

LatencyMarker 是一个全链路的延迟 不是非常的准确 不过也能大致反应端到端的延迟情况



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以

2020-09-18 文章 JasonLee
HI

我理解你的 kafka 生产数据的速度比较慢 你并发设置的再大都是没有用的 正常 source 的并行度设置和 kafka 的 partition
个数相等就可以了 



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink Plan Visualizer

2020-09-09 文章 JasonLee
hi

Flink plan visualizer 应该是只能画stream graph stream graph 是一个逻辑上的 DAG 图
你把任务提交到集群上 在 Flink WEB UI 上面就可以看到 job graph 了 stream graph 和 job graph 的区别是
job graph 优化了 operator chain 
job graph 是在调用 env.execute 方法之后才生成的



-
Best Wishes
JasonLee
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink checkpoint导致反压严重

2020-08-31 文章 JasonLee
hi

我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀.



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re:Re: Flink SQL 问题;

2020-08-28 文章 JasonLee
hi

把Flink-jdbc的包放到flink/lib下面再跑下



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.11读取kafka问题

2020-08-27 文章 JasonLee
hi

首先是确认一下kafka是否有数据写入,其次把所有的operator都看下是否有反压的情况



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 如何设置FlinkSQL并行度

2020-08-23 文章 JasonLee
hi
checkpoint savepoint的问题可以看下这个
https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: kafka分区数扩容对flink任务的影响

2020-08-19 文章 JasonLee
hi
Flink是可以感知到partition的增加的
消费kafka的时候设置一下KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS这个参数 大于0就可以了



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql 如何指定之前的checkpoint执行

2020-08-18 文章 JasonLee
hi
可以参考这篇文章https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q
在cancel的时候触发一个savepoint 修改完SQL从savepoint恢复任务



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 请教大佬一个在flink调用kafka数据源时'scan.startup.mode'参数的使用问题

2020-08-14 文章 JasonLee
hi

参数是这么写的没错 'scan.startup.mode' = 'earliest-offset' 你确定你是用的新的groupid吗
我这里测试是可以的从头开始消费的 不知道是不是你测试的方法不对



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 用hive streaming写 orc文件的问题

2020-08-14 文章 JasonLee
hi

我这边测试了ORC的,只需要把stored as pauquet 改成stored as
orc即可,success文件能生成,hive里面也能查看数据,但是有一个问题是,Flink Web UI上面显示的数据量是不对的 UI
上面的records send 一直在增大 即使我已经停止向kafka写入数据了 但是hive 里面的数据是对的 我写了30条
hive里面查出来的确实是30条 但UI上面已经显示480条了 且还在增加



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink 1.11 使用sql将流式数据写入hive

2020-08-11 文章 JasonLee
Hi

可以看下這個demo 
https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flink-sql-connector-elasticsearch6_2.11_1.10.0 与 flink-connector-elasticsearch6_2.11_1.10.0 并存问题

2020-08-06 文章 JasonLee
Hi


只引入-sql那个包就行了 代码也应该是可以直接用这个包的


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年08月07日 09:26,费文杰 写道:

HI:
  
使用的flink1.10环境,因为使用sql-client,我在flink/lib目录下放置了flink-sql-connector-elasticsearch6_2.11_1.10.0.jar,因为同事有直接在代码里写的sink到elasticsearch,所以他引入了flink-connector-elasticsearch6_2.11_1.10.0.jar,
这样就会导致他的任务报错。为了重现问题,我在本机同时引入了这两个依赖包:

 org.apache.httpcomponents
 httpclient
 4.5.2
   

   
 org.elasticsearch.client
 elasticsearch-rest-high-level-client
 ${elasticsearch.version}
   

   
 org.apache.flink
 flink-connector-elasticsearch6_2.11
 ${flink.version}
 
   

   
 org.apache.flink
 flink-sql-connector-elasticsearch6_2.11
 ${flink.version}
 provided
   

本地运行,有以下报错信息:
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job 
failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at 
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at 
org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at 
org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at 
akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala)
at 
akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at 
akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution 
failed.
at 
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at 
org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 33 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by 
NoRestartBackoffTimeStrategy
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at 
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at 
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at 
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
at 
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62

Re: flink1.9.1任务已经fail掉了,但在yarn上这个application还是在running

2020-08-04 文章 JasonLee
hi
我记得我用1.6.0版本的时候就有这个问题 好像是没有对应的jira 不过我用新版本已经没有遇到这个问题了 应该是偶尔会出现



--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复:Flink 通过sql client 启动的任务,kill掉之后,是否可以指定checkpoint恢复?

2020-08-04 文章 JasonLee
HI
目前sql-client的方式应该还不支持从指定的checkpoint恢复任务 不过Flink on zeppelin目前已经支持了 有兴趣可以用下


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年08月04日 16:28,mispower 写道:
通过sql_client 启动的streaming的任务,在维护或者异常之后,如何像flink straming 一样通过指定checkpoint 
恢复到上一次的消费节点。
在邮件列表里搜索了相关的问题,好像都没有明确回答。

回复:flink1.9.1任务已经fail掉了,但在yarn上这个application还是在running

2020-08-04 文章 JasonLee
hi
这本身就是一个bug 应该是还没有修复


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年08月04日 15:41,bradyMk 写道:
您好
我这边是用perJob的方式提交的,而且这种现象还是偶发性的,这次错误日志是这样的:

2020-08-04 10:30:14,475 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Job
flink2Ots (e11a22af324049217fdff28aca9f73a5) switched from state FAILING to
FAILED.
java.lang.Exception: Container released on a *lost* node
   at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
   at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-08-04 10:30:14,476 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not
restart the job flink2Ots (e11a22af324049217fdff28aca9f73a5) because the
restart strategy prevented it.
java.lang.Exception: Container released on a *lost* node
   at
org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190)
   at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
   at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
   at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
   at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
   at akka.actor.ActorCell.invoke(ActorCell.scala:561)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
   at akka.dispatch.Mailbox.run(Mailbox.scala:225)
   at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
   at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
2020-08-04 10:30:14,476 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping
checkpoint coordinator for job e11a22af324049217fdff28aca9f73a5.
2020-08-04 10:30:14,476 INFO
org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  -
Shutting down

但是我之前也遇到过这个错误时,yarn上的application是可以退出的。



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复:flink1.9.1 在WebUI中查看committedOffsets指标为负值

2020-07-30 文章 JasonLee
hi
提交offset到Kafka是在ck成功之后 如果没有开启ck的话 需要设置自动提交提交offset


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月30日 19:37,bradyMk 写道:
谢谢解答~
这个确实是个不变的值,应该是没有成功提交;而且我发现了,只要是没有设置ck的任务,该指标都会显示这个值,如果设置了ck,就会正常;但是我不懂为什么会这样,请问您知道详细的原因么?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Could not find any factory for identifier 'kafka'

2020-07-24 文章 JasonLee
hi
只需要-sql和-json两个包就可以了


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

On 07/24/2020 17:02, RS wrote:
hi,
Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了
编译的jar包是jar-with-dependencies的


代码片段:
   public String ddlSql = String.format("CREATE TABLE %s (\n" +
   "  number BIGINT,\n" +
   "  msg STRING,\n" +
   "  username STRING,\n" +
   "  update_time TIMESTAMP(3)\n" +
   ") WITH (\n" +
   " 'connector' = 'kafka',\n" +
   " 'topic' = '%s',\n" +
   " 'properties.bootstrap.servers' = '%s',\n" +
   " 'properties.group.id' = '%s',\n" +
   " 'format' = 'json',\n" +
   " 'json.fail-on-missing-field' = 'false',\n" +
   " 'json.ignore-parse-errors' = 'true'\n" +
   ")\n", tableName, topic, servers, group);


   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
   tableEnv.executeSql(ddlSql);


报错信息:
Caused by: org.apache.flink.table.api.ValidationException: Could not find any 
factory for identifier 'kafka' that implements 
'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
Available factory identifiers are:
datagen
at 
org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
at 
org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
... 33 more


参考了这个 
http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893
补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错


附上pom依赖:

   
   org.apache.flink
   flink-java
   ${flink.version}
   
   
   org.apache.flink
   flink-table-api-java-bridge_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-table-api-java
   ${flink.version}
   
   
   org.apache.flink
   flink-connector-kafka_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-sql-connector-kafka_2.12
   ${flink.version}
   
   
   org.apache.flink
   flink-json
   ${flink.version}
   
   


感谢各位~

回复:flink1.11启动问题

2020-07-21 文章 JasonLee
Hi
报错显示的是资源不足了 你确定yarn上的资源是够的吗 看下是不是节点挂了 1.11我这边提交任务都是正常的


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月21日 16:36,酷酷的浑蛋 写道:


服了啊,这个flink1.11启动怎么净是问题啊


我1.7,1.8,1.9 都没有问题,到11就不行
./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm 1024 
-ynm sql_test ./examples/batch/WordCount.jar --input 
hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a


报错:
Caused by: 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: 
Could not allocate the required slot within slot request timeout. Please make 
sure that the cluster has enough resources. at 
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441)
 ... 45 more Caused by: java.util.concurrent.CompletionException: 
java.util.concurrent.TimeoutException at 
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
 at 
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
 at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) 
at 
java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 ... 25 more


我资源是足的啊,就flink1.11起不来,一直卡在那里,卡好久然后报这个错,大神们帮看看吧,昨天的jar包冲突问题已经解决(只有flink1.11存在的问题),



回复:flink1.11启动问题

2020-07-21 文章 JasonLee
HI
你使用的什么模式?启动任务的命令发出来看一下吧


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月22日 12:44,酷酷的浑蛋 写道:
现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?  





??????flink-1.11 ddl kafka-to-hive????

2020-07-21 文章 JasonLee
hi
hive??


| |
JasonLee
|
|
??17610775...@163.com
|

Signature is customized by Netease Mail Master

??2020??07??21?? 19:09??kcz ??
hive-1.2.1
chk 
??chkchk??kafkahive??
String hiveSql = "CREATE  TABLE  stream_tmp.fs_table (\n" +
   "  host STRING,\n" +
   "  url STRING," +
   "  public_date STRING" +
   ") partitioned by (public_date string) " +
   "stored as PARQUET " +
   "TBLPROPERTIES (\n" +
   "  'sink.partition-commit.delay'='0 s',\n" +
   "  'sink.partition-commit.trigger'='partition-time',\n" +
   "  'sink.partition-commit.policy.kind'='metastore,success-file'" +
   ")";
tableEnv.executeSql(hiveSql);


tableEnv.executeSql("INSERT INTO  stream_tmp.fs_table SELECT host, url, 
DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");

??????state??????checkpoint??????

2020-07-17 文章 JasonLee
hi
UI??checkpoint??checkpoint?? 
??


| |
JasonLee
|
|
??17610775...@163.com
|

Signature is customized by Netease Mail Master

??2020??07??17?? 17:21??sun ??
??counts ??  Listhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian


sun <1392427...@qq.com ??2020??7??16?? 6:16??


 
env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 //
 env.setRestartStrategy(RestartStrategies.noRestart());
 env.getCheckpointConfig().setCheckpointTimeout(500);

 
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

 
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 env.setStateBackend(new
 RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints"));
 ??private transient ListState

回复:Flink 1.11 Hive Streaming Write的问题

2020-07-16 文章 JasonLee
hi
需要开启checkpoint


| |
JasonLee
|
|
邮箱:17610775...@163.com
|

Signature is customized by Netease Mail Master

在2020年07月16日 18:03,李佳宸 写道:
想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。
批量的hive写入,流环境的读取是正常的。

附代码,很简短:

public class KafkaToHiveStreaming {
   public static void main(String[] arg) throws Exception{
   StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
   EnvironmentSettings bsSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
   StreamTableEnvironment bsTableEnv =
StreamTableEnvironment.create(bsEnv, bsSettings);
   String name= "myhive";
   String defaultDatabase = "default";
   String hiveConfDir =
"/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local
path
   String version = "3.1.2";

   HiveCatalog hive = new HiveCatalog(name, defaultDatabase,
hiveConfDir, version);
   bsTableEnv.registerCatalog("myhive", hive);
   bsTableEnv.useCatalog("myhive");
   bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
   bsTableEnv.executeSql("CREATE TABLE topic_products (" +
   "  id BIGINT ," +
   "  order_id STRING," +
   "  amount DECIMAL(10, 2)," +
   "  create_time TIMESTAMP " +
   ") WITH (" +
   " 'connector' = 'kafka'," +
   " 'topic' = 'order.test'," +
   " 'properties.bootstrap.servers' = 'localhost:9092'," +
   " 'properties.group.id' = 'testGroup'," +
   " 'scan.startup.mode' = 'earliest-offset', " +
   " 'format' = 'json'  " +
   ")");
   bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);

   bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" +
   "  id BIGINT ," +
   "  order_id STRING," +
   "  amount DECIMAL(10, 2)" +
   "  )");
   bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
   bsTableEnv.executeSql("CREATE TABLE print_table WITH
('connector' = 'print')" +
   "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)");

   bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
   bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " +
   "id, " +
   "order_id, " +
   "amount " +
   "FROM topic_products");

   Table table1 = bsTableEnv.from("hive_sink_table_streaming");
   table1.executeInsert("print_table");
   }
}


  1   2   >