回复:flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?
hi Upsert-kafka 不支持指定消费者位置,默认是从 earliest 位置开始消费的,你可以自己修改代码支持 scan.startup.mode 参数。 Best JasonLee 回复的原邮件 | 发件人 | casel.chen | | 发送日期 | 2022年12月5日 18:24 | | 收件人 | user-zh@flink.apache.org | | 主题 | flink sql消费upsert-kafka源表如何指定从哪个位点开始消费? | flink sql消费upsert-kafka源表如何指定从哪个位点开始消费?仿照kafka source表添加了 scan.startup.mode 参数会报非法参数
回复:如何能正确的获取任务总写入量?
hi 你可以把之前的历史值保存下来,比如保存在状态里面或者第三方存储,任务重启后在加上之前的值。 Best JasonLee 回复的原邮件 | 发件人 | 陈佳豪 | | 发送日期 | 2022年11月24日 18:18 | | 收件人 | | | 主题 | 如何能正确的获取任务总写入量? | 大佬们好 背景: 业务端想基于flink metrics获取到任务总写入量。正常情况下通过指numRecordsIn指标是可以获取到增长的写入量的,如果用户自己误操作删了目标表并且重新建表恢复了,这个时候上报的numRecordsIn指标就会是0这就导致无法准确的获取任务写入总量了。不知道各位大佬有没有碰到这种情况希望能得到一个解决方案! 感谢。
回复:flink作业提交运行后如何监听作业状态发生变化?
Hi 可以通过 Flink 的 Metric 和 Yarn 的 Api 去获取任务的状态(任务提交到 yarn 的话) Best JasonLee 回复的原邮件 | 发件人 | casel.chen | | 发送日期 | 2022年11月23日 08:32 | | 收件人 | user-zh@flink.apache.org | | 主题 | flink作业提交运行后如何监听作业状态发生变化? | 请问flink作业提交运行后如何监听作业状态发生变化以便在控台上实时显示作业状态变更?目前我们的做法是轮询,但效率低,有没有listener可以进行注册的方法呢?
回复:Flink sql从ck恢复,统计数据波动问题
Hi 我理解应该是任务恢复的时候从上一次成功的 checkpoint 或者你指定的 checkpoint 里记录的 offset 开始消费,所以此时的统计值应该是有短暂的下跌,因为数据相当于回复到之前重复计算了一部分。这个应该是符合预期的,可能需要在业务上做一些处理。 Best JasonLee 回复的原邮件 | 发件人 | 天下五帝东 | | 发送日期 | 2022年10月10日 13:34 | | 收件人 | user-zh@flink.apache.org | | 主题 | Flink sql从ck恢复,统计数据波动问题 | Hi: 各位大佬们,flink任务从状态恢复时,统计值有一个明显的下跌,等ck打完后,统计值才会上升,这种情况有什么好的解决方案吗?
回复:flink cdc + kafka场景下增加kafka分区数问题
Hi 跟重启作业没关系哈,你需要自定义写入 kafka 的分区策略。 Best JasonLee 回复的原邮件 | 发件人 | casel.chen | | 发送日期 | 2022年09月26日 23:21 | | 收件人 | user-zh@flink.apache.org | | 主题 | flink cdc + kafka场景下增加kafka分区数问题 | flink cdc 消费mysql写到kafka场景下一开始数据量不大给的分区数可能只有3,后面业务数据量上来了需要添加分区数,例如12。那么问题来了,如何确保同一条记录的数据变更历史发到同一个kafka分区以确保下游消费的顺序性?重启作业好像也不能解决这个问题吧?
回复:如何实现flink作业失败告警功能
Hi 可以通过监控 numRestarts [1] metrics 发送相关的报警 [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/metrics/#availability Best JasonLee 回复的原邮件 | 发件人 | casel.chen | | 发送日期 | 2022年07月18日 22:48 | | 收件人 | user-zh@flink.apache.org | | 主题 | 如何实现flink作业失败告警功能 | 想实现flink作业一旦失败就立马告警功能,请问要如何实现?是否有Listener可以进行注册?
Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030
Hi 仔细看了一下日志,感觉还是 yarn 的配置问题,可以看下 yarn.resourcemanager.scheduler.address 配置的什么吗?在 client 端连接 RM 的时候打印的日志是 2022-07-14 15:10:48,109 INFO org.apache.hadoop.yarn.client.RMProxy[] - Connecting to ResourceManager at /0.0.0.0:8030 这里的地址是不对的,正常应该是 yarn.resourcemanager.scheduler.address:8030 但是日志里面是 0.0.0.0:8030。 Best JasonLee Replied Message | From | lishiyuan0506 | | Date | 07/14/2022 16:52 | | To | user-zh@flink.apache.org | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hi 是的,这个是运行参数 /opt/flink-1.13.3/bin/flink run \ -t yarn-per-job \ -Dyarn.application.name=test_wordcount \ -Dparallelism.default=1 \ -Dtaskmanager.numberOfTaskSlots=1 \ -Djobmanager.memory.process.size=1024mb \ -Dtaskmanager.memory.process.size=2048mb \ /opt/flink-1.13.3/examples/streaming/WordCount.jar | | lishiyuan0506 | | lishiyuan0...@163.com | Replied Message | From | JasonLee<17610775...@163.com> | | Date | 07/14/2022 16:47 | | To | user-zh@flink.apache.org | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | Hi 这个是直接运行的 examples 里面的 demo 程序吗? Best JasonLee Replied Message | From | lishiyuan0506 | | Date | 07/14/2022 16:25 | | To | user-zh@flink.apache.org | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | 感谢感谢,可以看的,我添加到附件给您发过去 | | lishiyuan0506 | | lishiyuan0...@163.com | Replied Message | From | JasonLee<17610775...@163.com> | | Date | 07/14/2022 16:15 | | To | user-zh@flink.apache.org | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | Hi 可以用 yarn logs -applicationId xxx 看下日志吗? Best JasonLee Replied Message | From | lishiyuan0506 | | Date | 07/14/2022 15:43 | | To | user-zh@flink.apache.org | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hello,根据您的思路,我查看了所有的yarn-site.xml,没有发现配置出错的地方,Spark和MR的运行都正常,感觉不是yarn的问题 | | lishiyuan0506 | | lishiyuan0...@163.com | Replied Message | From | Biao Geng | | Date | 07/14/2022 15:37 | | To | | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hi 根据你发的描述,是Flink的job manager在非RM所在机器上启动时,由于尝试连接0.0.0.0:8030 端口去向YARN申请资源时连接不通,导致失败。你可以检查下集群内worker节点的hadoop配置,看看yarn.resourcemanager.hostname等配置是否设置正确。 Best, Biao Geng lishiyuan0506 于2022年7月14日周四 15:17写道: 您好,我之前做了一些尝试: 1. 测试Spark、MR任务正常 2. 将生产环境的Flink和官网干净的Flink安装后测试example,出现同样的问题 3. 尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`DEFAULT_YARN_CONF_DIR="/opt/hadoop-3.1.4/etc/hadoop/"`,测试example出现同样的问题,说明Flink已经成功加载了`yarn-site.xml` 4.尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`export HADOOP_CONF_DIR=/opt/hadoop-3.1.4/etc/hadoop`,`export HADOOP_CLASSPATH=`hadoop classpath`测试example出现同样的问题 目前无论是yarn还是flink都找不出来出现了什么问题,手足无措了 | | lishiyuan0506 | | lishiyuan0...@163.com | Replied Message | From | Biao Geng | | Date | 07/14/2022 14:31 | | To | | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hi, 你提到跑wordcount.jar时,当作业被调度到RM所在的机器上可以正常运行,调度到非RM所在的机器上就失败。flink环境干净的话,那大概率还是hadoop的环境设置有问题。 有几个可以检查的地方: 1. 运行flink run之前或者在flink的bin/config.sh里是否有正确设置hadoop环境变量,例如 export HADOOP_CLASSPATH=`hadoop classpath` 2. 集群内机器(比如非RM所在机器)的HADOOP_CONF_DIR是否设置正确,其指向的目录里的yarn-site.xml的yarn.resourcemanager.hostname等配置是否设置正确 Best, Biao Geng Biao Geng 于2022年7月14日周四 11:32写道: hi, 你有试过提交flink example(比如wordcount作业)吗?如果报了一样的错误的话,可以检查一下你的flink/lib目录下是否有放多余的YARN配置(比如运行grep -irn "0.0.0.0" $FLINK_HOME);如果example可以成功提交,可以看看是不是你的作业jar里打进了错误的YARN配置。 Best, Biao Geng lishiyuan0506 于2022年7月14日周四 11:06写道: 您好,环境变量里面有,而且flink的conf.sh也设置了 lishiyuan0506 lishiyuan0...@163.com < https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=lishiyuan0506=lishiyuan0506%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22lishiyuan0506%40163.com%22%5D Replied Message From Yang Wang Date 07/14/2022 11:00 To user-zh Subject Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 确认一下你是否正确设置了HADOOP_CONF_DIR环境变量 Best, Yang lishiyuan0506 于2022年7月14日周四 09:41写道: 打扰大家一下,请问一下各位在yarn提交flink的时候,有没有遇到过Retrying connect to server: 0.0.0.0/0.0.0.0:8030这个异常 hadoop的classpath没问题,Spark和MR在Yarn上跑也没问题,就flink有这样的问题 | | lishiyuan0506 | | lishiyuan0...@163.com |
Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030
Hi 这个是直接运行的 examples 里面的 demo 程序吗? Best JasonLee Replied Message | From | lishiyuan0506 | | Date | 07/14/2022 16:25 | | To | user-zh@flink.apache.org | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | 感谢感谢,可以看的,我添加到附件给您发过去 | | lishiyuan0506 | | lishiyuan0...@163.com | Replied Message | From | JasonLee<17610775...@163.com> | | Date | 07/14/2022 16:15 | | To | user-zh@flink.apache.org | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | Hi 可以用 yarn logs -applicationId xxx 看下日志吗? Best JasonLee Replied Message | From | lishiyuan0506 | | Date | 07/14/2022 15:43 | | To | user-zh@flink.apache.org | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hello,根据您的思路,我查看了所有的yarn-site.xml,没有发现配置出错的地方,Spark和MR的运行都正常,感觉不是yarn的问题 | | lishiyuan0506 | | lishiyuan0...@163.com | Replied Message | From | Biao Geng | | Date | 07/14/2022 15:37 | | To | | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hi 根据你发的描述,是Flink的job manager在非RM所在机器上启动时,由于尝试连接0.0.0.0:8030 端口去向YARN申请资源时连接不通,导致失败。你可以检查下集群内worker节点的hadoop配置,看看yarn.resourcemanager.hostname等配置是否设置正确。 Best, Biao Geng lishiyuan0506 于2022年7月14日周四 15:17写道: 您好,我之前做了一些尝试: 1. 测试Spark、MR任务正常 2. 将生产环境的Flink和官网干净的Flink安装后测试example,出现同样的问题 3. 尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`DEFAULT_YARN_CONF_DIR="/opt/hadoop-3.1.4/etc/hadoop/"`,测试example出现同样的问题,说明Flink已经成功加载了`yarn-site.xml` 4.尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`export HADOOP_CONF_DIR=/opt/hadoop-3.1.4/etc/hadoop`,`export HADOOP_CLASSPATH=`hadoop classpath`测试example出现同样的问题 目前无论是yarn还是flink都找不出来出现了什么问题,手足无措了 | | lishiyuan0506 | | lishiyuan0...@163.com | Replied Message | From | Biao Geng | | Date | 07/14/2022 14:31 | | To | | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hi, 你提到跑wordcount.jar时,当作业被调度到RM所在的机器上可以正常运行,调度到非RM所在的机器上就失败。flink环境干净的话,那大概率还是hadoop的环境设置有问题。 有几个可以检查的地方: 1. 运行flink run之前或者在flink的bin/config.sh里是否有正确设置hadoop环境变量,例如 export HADOOP_CLASSPATH=`hadoop classpath` 2. 集群内机器(比如非RM所在机器)的HADOOP_CONF_DIR是否设置正确,其指向的目录里的yarn-site.xml的yarn.resourcemanager.hostname等配置是否设置正确 Best, Biao Geng Biao Geng 于2022年7月14日周四 11:32写道: hi, 你有试过提交flink example(比如wordcount作业)吗?如果报了一样的错误的话,可以检查一下你的flink/lib目录下是否有放多余的YARN配置(比如运行grep -irn "0.0.0.0" $FLINK_HOME);如果example可以成功提交,可以看看是不是你的作业jar里打进了错误的YARN配置。 Best, Biao Geng lishiyuan0506 于2022年7月14日周四 11:06写道: 您好,环境变量里面有,而且flink的conf.sh也设置了 lishiyuan0506 lishiyuan0...@163.com < https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=lishiyuan0506=lishiyuan0506%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22lishiyuan0506%40163.com%22%5D Replied Message From Yang Wang Date 07/14/2022 11:00 To user-zh Subject Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 确认一下你是否正确设置了HADOOP_CONF_DIR环境变量 Best, Yang lishiyuan0506 于2022年7月14日周四 09:41写道: 打扰大家一下,请问一下各位在yarn提交flink的时候,有没有遇到过Retrying connect to server: 0.0.0.0/0.0.0.0:8030这个异常 hadoop的classpath没问题,Spark和MR在Yarn上跑也没问题,就flink有这样的问题 | | lishiyuan0506 | | lishiyuan0...@163.com |
Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030
Hi 可以用 yarn logs -applicationId xxx 看下日志吗? Best JasonLee Replied Message | From | lishiyuan0506 | | Date | 07/14/2022 15:43 | | To | user-zh@flink.apache.org | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hello,根据您的思路,我查看了所有的yarn-site.xml,没有发现配置出错的地方,Spark和MR的运行都正常,感觉不是yarn的问题 | | lishiyuan0506 | | lishiyuan0...@163.com | Replied Message | From | Biao Geng | | Date | 07/14/2022 15:37 | | To | | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hi 根据你发的描述,是Flink的job manager在非RM所在机器上启动时,由于尝试连接0.0.0.0:8030 端口去向YARN申请资源时连接不通,导致失败。你可以检查下集群内worker节点的hadoop配置,看看yarn.resourcemanager.hostname等配置是否设置正确。 Best, Biao Geng lishiyuan0506 于2022年7月14日周四 15:17写道: 您好,我之前做了一些尝试: 1. 测试Spark、MR任务正常 2. 将生产环境的Flink和官网干净的Flink安装后测试example,出现同样的问题 3. 尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`DEFAULT_YARN_CONF_DIR="/opt/hadoop-3.1.4/etc/hadoop/"`,测试example出现同样的问题,说明Flink已经成功加载了`yarn-site.xml` 4.尝试在`/opt/flink-1.13.3/bin/config.sh`中配置`export HADOOP_CONF_DIR=/opt/hadoop-3.1.4/etc/hadoop`,`export HADOOP_CLASSPATH=`hadoop classpath`测试example出现同样的问题 目前无论是yarn还是flink都找不出来出现了什么问题,手足无措了 | | lishiyuan0506 | | lishiyuan0...@163.com | Replied Message | From | Biao Geng | | Date | 07/14/2022 14:31 | | To | | | Subject | Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 | hi, 你提到跑wordcount.jar时,当作业被调度到RM所在的机器上可以正常运行,调度到非RM所在的机器上就失败。flink环境干净的话,那大概率还是hadoop的环境设置有问题。 有几个可以检查的地方: 1. 运行flink run之前或者在flink的bin/config.sh里是否有正确设置hadoop环境变量,例如 export HADOOP_CLASSPATH=`hadoop classpath` 2. 集群内机器(比如非RM所在机器)的HADOOP_CONF_DIR是否设置正确,其指向的目录里的yarn-site.xml的yarn.resourcemanager.hostname等配置是否设置正确 Best, Biao Geng Biao Geng 于2022年7月14日周四 11:32写道: hi, 你有试过提交flink example(比如wordcount作业)吗?如果报了一样的错误的话,可以检查一下你的flink/lib目录下是否有放多余的YARN配置(比如运行grep -irn "0.0.0.0" $FLINK_HOME);如果example可以成功提交,可以看看是不是你的作业jar里打进了错误的YARN配置。 Best, Biao Geng lishiyuan0506 于2022年7月14日周四 11:06写道: 您好,环境变量里面有,而且flink的conf.sh也设置了 lishiyuan0506 lishiyuan0...@163.com < https://maas.mail.163.com/dashi-web-extend/html/proSignature.html?ftlId=1=lishiyuan0506=lishiyuan0506%40163.com=https%3A%2F%2Fmail-online.nosdn.127.net%2Fqiyelogo%2FdefaultAvatar.png=%5B%22lishiyuan0506%40163.com%22%5D Replied Message From Yang Wang Date 07/14/2022 11:00 To user-zh Subject Re: Retrying connect to server: 0.0.0.0/0.0.0.0:8030 确认一下你是否正确设置了HADOOP_CONF_DIR环境变量 Best, Yang lishiyuan0506 于2022年7月14日周四 09:41写道: 打扰大家一下,请问一下各位在yarn提交flink的时候,有没有遇到过Retrying connect to server: 0.0.0.0/0.0.0.0:8030这个异常 hadoop的classpath没问题,Spark和MR在Yarn上跑也没问题,就flink有这样的问题 | | lishiyuan0506 | | lishiyuan0...@163.com |
回复:flink sql解析kafka数据
Hi 解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA Best JasonLee 回复的原邮件 | 发件人 | 小昌同学 | | 发送日期 | 2022年06月30日 15:02 | | 收件人 | user-zh@flink.apache.org | | 主题 | flink sql解析kafka数据 | 各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值 我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true' 但是我在客户端执行的时候 发现识别不到这个字段 有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛 CREATE TABLE ccc_test_20220630_2 ( trans_number STRING, end_timestamp STRING, return_flagSTRING, commodity_type STRING ) COMMENT '中台交易流水小票头' WITH ( 'connector' = 'kafka', 'topic' = 'yh_rme_soc_stream_prod-tlog_header', 'properties.bootstrap.servers' = '', 'properties.group.id' = 'ccc_test_20220630_2', 'properties.request.timeout.ms' = '6', 'format' = 'json', 'scan.startup.mode' = 'group-offsets', -- 'scan.startup.mode' = 'timestamp', -- 'scan.startup.timestamp-millis' = '165373920', 'json.fail-on-missing-field' = 'false', 'json.ignore-parse-errors' = 'true' 'json.infer-schema.flatten-nested-columns.enable'='true' ); | | 小昌 | | ccc0606fight...@163.com |
回复:Flink JobManager 节点 JVM Metaspace 过高
Hi Summer 你的图片挂了,可以把图片上传到图床,然后把链接贴在邮件里,另外可以把 JM 的日志也发一下吗? Best JasonLee 回复的原邮件 | 发件人 | Summer | | 发送日期 | 2022年06月10日 18:15 | | 收件人 | user-zh@flink.apache.org | | 主题 | Flink JobManager 节点 JVM Metaspace 过高 | 使用 FinkUI 上传 Flink 任务 Jar 时,任务启动失败。 这时候JVM Metaspace就会异常增加。 这是什么原因?
回复: flink sql 如何提高下游并发度?
hi 是 10 目前 source 还不支持单独设置并发度,但是 sink 是支持的,当然如果没有单独设置的话 sink 也是 10 Best JasonLee 在2022年01月11日 16:52,RS 写道: Hi, 请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10? 如果source是10的话,那还有7个线程就空闲了? 在 2022-01-11 11:10:41,"Caizhi Weng" 写道: Hi! 可以设置 parallelism.default 为需要的并发数。 Jeff 于2022年1月9日周日 19:44写道: 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?
回复: flink固定延迟重启策略没有延迟
Hi 日志里面应该可以看到这样的信息 Calculating tasks to restart to recover the failed task 。 Best JasonLee 在2021年12月22日 11:08,宋品如 写道: Hi: 谢谢回复,我明白了。还有一个问题: 日志里显示:Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2, backoffTimeMS=3) 这说明我的任务已经重启了2次依然失败了吗? 日志里我看不到重启的记录,这个应该怎么确认呢? 这里的报错是第一次重启还是最后一次重启导致的呢? 谢谢! 在 2021-12-22 10:47:24,"Caizhi Weng" 写道: Hi! log 里的这些信息是同一个 job 里不同的并发分别 fail(可以从 2/3 和 3/3 这两个不同的并发号看出来),并不是说这个 job fail 了两次。 宋品如 于2021年12月22日周三 10:14写道: 发件人: Song PinRu 发送时间: 2021年12月21日 15:19 收件人: user-zh@flink.apache.org 主题: flink固定延迟重启策略没有延迟 Hi: 昨天的邮件截图看不了,把日志贴上来重新发送一份 -- 查看日志发现固定延迟重启策略似乎没有生效,我设置的是30s延迟重启2次, 但是日志显示的是在06:26:50这1秒内重启了2次都失败了,并最终导致任务失败, 我设置的延迟时间似乎完全没有生效,Flink版本是1.12.2。 有没有人能告诉我这是为什么? 设置重启策略的代码: ``` val env = StreamExecutionEnvironment.getExecutionEnvironment val backend = new FsStateBackend(CommonConfig.FLINK_STATEBACKEND_CHECKPOINT) env.setStateBackend(backend) // 每 3ms 开始一次 checkpoint env.enableCheckpointing(3) // 设置模式为精确一次 (这是默认值) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 确认 checkpoints 之间的时间会进行 500 ms env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1) // Checkpoint 必须在2分钟内完成,否则就会被抛弃 env.getCheckpointConfig.setCheckpointTimeout(12) // 可容忍checkpoint失败次数 env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3) // 同一时间只允许一个 checkpoint 进行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //设置全局并行度 // env.setParallelism(3) //重启策略 //PS:默认策略会重启int最大值次,导致任务一直处于重启状态,checkpoint出现连续空文件夹,同时导致有效checkpoint无法使用 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2, Time.seconds(30))) ``` 日志: ``` 2021-12-21 06:26:50,850 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: source -> note -> Sink: sink (2/3) (85ad9ee0f52f04c5709430a8c793817a) switched from RUNNING to FAILED on container_e1595_1638345947522_0010_01_03 @ pbj-cdh-20-72.optaim.com (dataPort=35530). org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: This server is not the leader for that topic-partition. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1392) ~[dws_module-1.0.4.jar:?] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:850) ~[dws_module-1.0.4.jar:?] at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99) ~[dws_module-1.0.4.jar:?] at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:39) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426) ~[flink-dist_2.11-1.12.2.jar:1.12.2] at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365) ~[dws_module-1.0.4.jar:?] at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsum
回复:实时读取hive参数不生效
hi 看上面似乎这个参数没有生效,你是在哪里设置的呢? Best JasonLee 在2021年12月21日 20:20,Fei Han 写道: @all: 大家好! 我在实时读取hive的时候动态参数不生效,另外flink是否可以通过流读方式读取hive的普通表呢? 版本如下: Flink版本1.13.3 Hive版本hive2.1.1-CDH6.2.0 设置的参数是set 'table.dynamic-table-options.enabled'='true' 报错如下: INSERT INTO qhc_catalog.qhc_hms.qhc_ods_assassin_dept select * from qhc_catalog.qhc_assassin_ods.assassin_dept /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition-order'='create-time') */ 2021-12-21 19:56:45,198 ERROR com.flink.streaming.core.JobApplication [] - 任务执行失败: org.apache.flink.table.api.ValidationException: The 'OPTIONS' hint is allowed only when the config option 'table.dynamic-table-options.enabled' is set to true. at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createFinalCatalogTable(CatalogSourceTable.java:104) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:79) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585) ~[flink-table_2.12-1.13.3.jar:1.13.3] at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507) ~[flink-table_2.12-1.13.3.jar:1.13.3] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2140) ~[flink-table_2.12-1.13.3.jar:1.13.3] at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050) ~[flink-table_2.12-1.13.3.jar:1.13.3] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663) ~[flink-table_2.12-1.13.3.jar:1.13.3] at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644) ~[flink-table_2.12-1.13.3.jar:1.13.3] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438) ~[flink-table_2.12-1.13.3.jar:1.13.3] at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570) ~[flink-table_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:169) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:161) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:989) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:958) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:283) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:603) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:272) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) ~[flink-table-blink_2.12-1.13.3.jar:1.13.3] at org.apache.flink.table.api.internal.StatementSetImpl.addInsertSql(StatementSetImpl.java:51) ~[flink-table_2.12-1.13.3.jar:1.13.3] at com.flink.streaming.core.execute.ExecuteSql.exeSql(ExecuteSql.java:38) ~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE] at com.flink.streaming.core.JobApplication.main(JobApplication.java:80) ~[flink-streaming-core-1.3.0.RELEASE.jar:1.3.0.RELEASE] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_211] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_211] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_211] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_211] at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-dist_2.12-1.13.3.jar:1.13.3] at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-dist_2.12-1.13.3.jar:1.13.3] at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) ~[flink-dist_2.12-1.13.3.jar:1.13.3] at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) ~[flink-dist_2.12-1.13.3.jar:1.13.3] at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) ~[flink-dist_2.12-1.13.3.jar:1.13.3] at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) ~[flink-dist_2.12-1.13.3.jar:1.13.3] at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) ~[flink-dist_2.12-1.13.3.jar:1.13.3
Re:How to run SQL Client by Per-Job
Hi 设置一下 execution.target: yarn-per-job 就行了 Best JasonLee On 11/29/2021 18:48, wrote: Hi Can SQL Client run a job by Per-Job ? 在网上看到曾庆东同学分享过《Flink SQL CDC 上线!我们总结了 13 条生产实践经验》,提到使用per-job方式提交,如下图,但我在官网上看到只支持Session方式,求解正确的使用方式 Thx wangz...@163.com
回复: Flink工程停止问题
Hi 应该是你的版本比较低,新版本是可以用的 Best JasonLee 在2021年11月29日 11:35,Mabin 写道: 这个参数我试过了,没用的 发自我的iPhone 在 2021年11月18日,上午11:51,zhisheng 写道: web.cancel.enable: false web.cancel.enable 这个参数可以控制是否显示那个取消按钮 Caizhi Weng 于2021年11月16日周二 下午3:53写道: Hi! Flink 本身不自带安全机制,需要通过外部系统完成访问限制。 疾鹰击皓月 <1764232...@qq.com.invalid> 于2021年11月16日周二 下午2:57写道: 您好 Flink WebUI的左上角有一个cancel按钮,通过按钮可以停止Flink工程。但这会导致一定的权限问题。我们希望只有特定人员可以停止Flink工程,请问有没有方法可以让那个停止按钮不生效或者不显示呢?
回复:Flink1.12 Streaming 消费kafka
hi 可以使用 setPartitions 方法 具体参考官网: https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/#topic-partition-subscription Best JasonLee 在2021年11月8日 17:06,guanyq 写道: 请大佬指导下: flink streaming可以指定partition消费kafka么 如有100个partition,但是我只想消费15partiton。
??????flink on yarn ??pre_job????????,????session????????????
hi ?? jar ??Flink ?? Best JasonLee ??2021??11??4?? 18:41<2572805...@qq.com.INVALID> ?? yarn??: org.apache.flink.runtime.entrypoint.ClusterEntrypointException: Failed to initialize the cluster entrypoint YarnJobClusterEntrypoint. at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:200) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runClusterEntrypoint(ClusterEntrypoint.java:569) at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.main(YarnJobClusterEntrypoint.java:91) Caused by: java.lang.VerifyError: class org.apache.flink.yarn.YarnResourceManager overrides final method onStop.()Ljava/util/concurrent/CompletableFuture; at java.lang.ClassLoader.defineClass1(Native Method) at java.lang.ClassLoader.defineClass(ClassLoader.java:763) at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) at java.net.URLClassLoader.defineClass(URLClassLoader.java:467) at java.net.URLClassLoader.access$100(URLClassLoader.java:73) at java.net.URLClassLoader$1.run(URLClassLoader.java:368) at java.net.URLClassLoader$1.run(URLClassLoader.java:362) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:361) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:338) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:54) at org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint.createDispatcherResourceManagerComponentFactory(YarnJobClusterEntrypoint.java:38) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.runCluster(ClusterEntrypoint.java:231) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.lambda$startCluster$0(ClusterEntrypoint.java:178) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.runtime.entrypoint.ClusterEntrypoint.startCluster(ClusterEntrypoint.java:175) ... 2 common frames omitted Diagnostics: Application application_1635998548270_0028 failed 1 times (global limit =2; local limit is =1) due to AM Container for appattempt_1635998548270_0028_01 exited with exitCode: 1 For more detailed output, check the application tracking page: http://ark1.analysys.xyz:8088/cluster/app/application_1635998548270_0028 Then click on links to logs of each attempt. Diagnostics: Exception from container-launch. Container id: container_e391_1635998548270_0028_01_01 Exit code: 1 Stack trace: ExitCodeException exitCode=1: at org.apache.hadoop.util.Shell.runCommand(Shell.java:944) at org.apache.hadoop.util.Shell.run(Shell.java:848) at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1142) at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:237) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:317) at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:83) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Container exited with a non-zero exit code 1 Failing this attempt. Failing the application. ??
??????????
Hi ?? user-zh-unsubscr...@flink.apache.org Best JasonLee ??2021??10??25?? 10:52??zdj<1361776...@qq.com.INVALID> ??
??????flink-1.14 ???? kafkasource ????watermark????
Hi , wm > window.end_time ,?? wm , Best JasonLee ??2021??10??12?? 11:26??kcz<573693...@qq.com.INVALID> ?? times??+20??StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); KafkaSource
?????? flink sql????????????????sink table?
Hi , SQL SQL ??,?? Best JasonLee ??2021??09??23?? 09:28 ?? sql??sql?? iPhone -- -- ??: 2572805166 <2572805...@qq.com.INVALID : 2021??9??23?? 09:23 ??: user-zh https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like casel.chen ??2021??9??18?? 8:27?? kafka topic??topic) ??flink sqlsink table
回复:退订
Hi 退订应该发送到 user-zh-unsubscr...@flink.apache.org Best JasonLee 在2021年09月23日 15:20,wangjingen<13033709...@163.com> 写道: 退订
??????????
Hi ?? user-zh-unsubscr...@flink.apache.org Best JasonLee ??2021??09??23?? 21:22??Night_xing<1029681...@qq.com.INVALID> ??
回复:退订
Hi 退订应该发送到 user-zh-unsubscr...@flink.apache.org Best JasonLee 在2021年09月23日 15:26,金圣哲 写道: 退订
回复: flink sql是否支持动态创建sink table?
hi 这个我理解在 SQL 任务里面目前是没办法做到的 在 datastream api 任务是可以的 Best JasonLee 在2021年9月22日 11:35,酷酷的浑蛋 写道: 我也有这个需求,意思就是topic里实时新增了一种日志,然后想动态创建对应新的日志的topic表,并写入到新的topic表,在一个任务中完成 | | apache22 | | apach...@163.com | 签名由网易邮箱大师定制 在2021年09月22日 11:23,Caizhi Weng 写道: Hi! 不太明白这个需求,但如果希望发送给不同的 topic,需要给每个 topic 都定义 DDL。 如果是因为各 topic 之间的 schema 重复度比较高,只有些许字段以及 topic 名称等不同,可以看一下 DDL LIKE 语法: https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/create/#like casel.chen 于2021年9月18日周六 上午8:27写道: 上游kafka topic消息带有一个用户类型字段,现在想根据不同用户类型将数据发到不同topic(为了扩展不想写死有哪些类型) ,请问flink sql支持动态创建sink table吗?
回复:flink消费kafka分区消息不均衡问题
hi 图片看不到 我猜大概有两种情况 第一种是你的 source 本身就存在数据倾斜 某几个分区的数据量比其他分区的多 需要修改数据写入 kafka 分区策略让数据尽量均匀 第二种是你的下游计算的时候出现数据倾斜(或其他原因)导致任务反压到 source 端 这种情况需要根据实际的情况采用不同的解决方案 单纯的增加并发和改变 slot 数量没有什么效果 Best JasonLee 在2021年9月22日 09:22,casel.chen 写道: kafka topic有32个分区,实时作业开了32个并行度消费kafka topic,现在监控发现部分分区消息积压严重(如下图所示),请问会有哪些原因造成的?有什么解决办法吗?扩大分区数是不是也不能根治这种情况? PS: 每个分区消息数的确有所不均,但是同样消息数的几个分区也会出现积压不同情况(如15,16,17,18)。会是因为节点带宽受限造成的吗?当前numberOfSlots=8,改成numberOfSlots=1会有效果么? | 分区 ID | 客户端 | 最大位点 | 消费位点 | 堆积量 | | 0 | n/a | 14,131,397 | 14,130,923 | 474 | | 1 | n/a | 14,191,455 | 14,189,396 | 2,059 | | 2 | n/a | 14,611,826 | 14,610,262 | 1,564 | | 3 | n/a | 15,340,150 | 15,335,944 | 4,206 | | 4 | n/a | 16,379,487 | 16,372,237 | 7,250 | | 5 | n/a | 17,696,565 | 17,639,308 | 57,257 | | 6 | n/a | 19,200,829 | 19,129,856 | 70,973 | | 7 | n/a | 20,889,954 | 20,888,652 | 1,302 | | 8 | n/a | 22,643,539 | 22,536,468 | 107,071 | | 9 | n/a | 24,440,881 | 24,439,357 | 1,524 | | 10 | n/a | 26,178,250 | 26,073,197 | 105,053 | | 11 | n/a | 27,828,497 | 27,670,732 | 157,765 | | 12 | n/a | 29,284,463 | 29,283,105 | 1,358 | | 13 | n/a | 30,526,020 | 29,781,704 | 744,316 | | 14 | n/a | 31,468,482 | 31,467,243 | 1,239 | | 15 | n/a | 32,084,198 | 31,467,610 | 616,588 | | 16 | n/a | 32,393,752 | 32,019,836 | 373,916 | | 17 | n/a | 32,302,065 | 32,141,999 | 160,066 | | 18 | n/a | 31,875,063 | 31,874,452 | 611 | | 19 | n/a | 31,137,894 | 31,002,867 | 135,027 | | 20 | n/a | 30,098,926 | 29,930,855 | 168,071 | | 21 | n/a | 28,739,235 | 28,603,509 | 135,726 | | 22 | n/a | 27,221,026 | 27,220,821 | 205 | | 23 | n/a | 25,514,265 | 25,382,536 | 131,729 | | 24 | n/a | 23,779,714 | 23,689,296 | 90,418 | | 25 | n/a | 21,981,307 | 21,981,267 | 40 | | 26 | n/a | 20,237,925 | 20,223,880 | 14,045 | | 27 | n/a | 18,606,490 | 18,606,387 | 103 | | 28 | n/a | 17,178,098 | 17,177,971 | 127 | | 29 | n/a | 15,972,292 | 15,972,105 | 187 | | 30 | n/a | 15,032,355 | 15,032,138 | 217 | | 31 | n/a | 14,426,366 | 14,425,462 | 904 |
回复:退订
Hi 退订应该发送到 user-zh-unsubscr...@flink.apache.org Best JasonLee 在2021年09月14日 11:46,abel0130 写道: 退订
回复:flink on native k8s 无法查看火焰图问题
Hi 这个配置默认是关闭的,因为对性能有一定影响,具体的配置可以参考官网 https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/deployment/config/#advanced-options-for-the-rest-endpoint-and-client Best JasonLee 在2021年09月14日 11:20,赵旭晨 写道: Unable to load requested file /jobs/d2fcac59f4a42ad17ceba8c5371862bb/。。 请问这是什么原因啊,恳请大佬解惑 镜像版本flink:1.13.2-scala_2.12-java8
回复:Flink on yarn的日志监控和checkpoint的监控生产是如何处理的?
Hi 可以参考这两篇文章: https://mp.weixin.qq.com/s/2S4M8p-rBRinIRxmZrZq5Q https://mp.weixin.qq.com/s/44SXmCAUOqSWhQrNiZftoQ Best JasonLee 在2021年08月31日 13:23,guanyq 写道: flink on yarn 在集群中启动很多的task,生产应用中是如何监控task的日志,和checkpoint的呢? 求大佬指导。
??????flink keyby????
hi KeyGroupStreamPartitioner#selectChannel . Best JasonLee ??2021??8??30?? 22:34??cs<58683...@qq.com.INVALID> ?? flink??keybykey??tasktask??id
回复: Flink任务假死;无限100%反压;但下游节点无压力。
Hi 可以发一下任务的 DAG 吗 Best JasonLee 在2021年08月26日 13:09,yidan zhao 写道: 补充了个附录(https://www.yuque.com/sixhours-gid0m/ls9vqu/rramvh )正常任务和异常任务的window算子的FlameGraph,不清楚是否有参考价值。 yidan zhao 于2021年8月26日周四 下午1:01写道: 目前来看,我运行6小时,window总计才收到200MB数据,这个数据量级相比我很多小到没有一样。所以很难想象反压的原因是啥究竟。 目前来看反压节点的outPoolUsage是1,看起来合理,因为处于100%反压。 下游节点的inPoolUsage却是0,这个也很奇怪,同时下游buzz和backpress都是0%. Shengkai Fang 于2021年8月26日周四 下午12:33写道: - 得看一下具体的卡死的节点的栈,分析下具体的工作任务才知道。 - 日志中有包含错误的信息吗? Best, Shengkai yidan zhao 于2021年8月26日周四 下午12:03写道: 可能存在机器压力倾斜,但是我是不太清楚这种现象的原因,直接停滞了任务? 东东 于2021年8月26日周四 上午11:06写道: 建议检查一下是否有数据倾斜 在 2021-08-26 10:22:54,"yidan zhao" 写道: 问题期间的确ckpt时间较长。 但是,这个任务正常ckpt时间才不到1s,ckpt大小也就21MB,所以也很难说ckpt为啥会超时,我超时设置的2min。 Caizhi Weng 于2021年8月26日周四 上午10:20写道: Hi! 从图中情况来看很可能是因为下游 checkpoint 时间过长导致反压上游。是否观察过 checkpoint 的情况? yidan zhao 于2021年8月26日周四 上午10:09写道: 如题,这个问题以前遇到过,后来发生频率低了,近期又多了几次,下面是具体的话题讨论,email不方便贴图。 语雀:https://www.yuque.com/sixhours-gid0m/ls9vqu/rramvh
回复:退订
Hi 退订应该发送到 user-zh-unsubscr...@flink.apache.org Best JasonLee 在2021年08月21日 09:43,牛成 写道: 退订
??????kafka appender????
Hi error ? ?? CPU ?? IO ? Best JasonLee ??2021??08??4?? 12:25??datafollower<609326...@qq.com.INVALID> ?? hi allflink??5000+??error??kafka?? log4j2 ??kafkaappender ??error??kafka?? 1.kafka appender??error log10 2.error??kafka
回复: 如何从复杂的kafka消息体定义 table
Hi 事实上,你的 data 是一个 jsonarray 只需要列转行取 columnName 字段就可以了. Best JasonLee 在2021年07月9日 10:06,Chenzhiyuan(HR) 写道: 消息体里的data是实际每行数据,columnName是字段名,rawData是值,因此我想能定义table 如下: CREATE TABLE MyUserTable( APPLY_PERSON_ID VARCHAR, UPDATE_SALARY DECIMAL, UP_AMOUNT DECIMAL, CURRENCY VARCHAR, EXCHANGE_RATE DECIMAL ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic_name', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.group.id' = 'testGroup', 'format.type' = '?' 接下来直接查询每个字段的值: Table result = tEnv.sqlQuery("select APPLY_PERSON_ID, UPDATE_SALARY, UP_AMOUNT, CURRENCY, EXCHANGE_RATE from MyUserTable "); 请教下这个该如何定义DDL. 发件人: 17610775726 [mailto:17610775...@163.com] 发送时间: 2021年7月9日 9:26 收件人: Chenzhiyuan(HR) 主题: 回复:如何从复杂的kafka消息体定义 table hi 用 json 就行 可以参考这篇文章:https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA Best JasonLee 回复的原邮件 发件人 Chenzhiyuan(HR)<mailto:zhiyuan.c...@huawei.com> 发送日期 2021年07月09日 08:59 收件人 user-zh@flink.apache.org<mailto:user-zh@flink.apache.org> 主题 如何从复杂的kafka消息体定义 table 大家好: 我定义了一个table, 从kafka读取数据,不知道要怎么解析,用哪个format.type. 如果json, avro不能满足的话,是不是得自己自定义一个。 自定义的话不知道如何写,请各位帮忙指教下。 定义的表如下: CREATE TABLE MyUserTable( uuid VARCHAR, orgId VARCHAR ) with ( 'connector.type' = 'kafka', 'connector.version' = '0.11', 'connector.topic' = 'topic_name', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.group.id' = 'testGroup', 'format.type' = '?' ) Kafka的消息体如下, 好像不符合avro之类的标准格式: { "beforeData": [], "byteSize": 272, "columnNumber": 32, "data": [{ "byteSize": 8, "columnName": "APPLY_PERSON_ID", "rawData": 10017, "type": "LONG" }, { "byteSize": 12, "columnName": "UPDATE_SALARY", "rawData": "11000.00", "type": "DOUBLE" }, { "byteSize": 11, "columnName": "UP_AMOUNT", "rawData": "1000.00", "type": "DOUBLE" }, { "byteSize": 3, "columnName": "CURRENCY", "rawData": "CNY", "type": "STRING" }, { "byteSize": 32, "columnName": "EXCHANGE_RATE", "rawData": "1.00", "type": "DOUBLE" }, { "byteSize": 11, "columnName": "DEDUCTED_ACCOUNT", "rawData": "1000.00", "type": "DOUBLE" }, { "byteSize": 1, "columnName": "ENTER_AT_PROCESS", "rawData": "Y", "type": "STRING" }], "dataCount": 0, "dataMetaData": { "connector": "mysql", "pos": 1000368076, "row": 0, "ts_ms": 1625565737000, "snapshot": "false", "db": "testdb", "table": "flow_person_t" }, "key": "APPLY_PERSON_ID", "memorySize": 1120, "operation": "insert", "rowIndex": -1, "timestamp": "1970-01-01 00:00:00" }
回复:如何将canal json格式数据按操作类型过滤
hi 最后一个字段 type 就是操作的类型, 过滤掉 DELETE 就行了. Best JasonLee 在2021年7月7日 22:43,casel.chen 写道: 使用场景:我们使用canal将mysql binlog输出到kafka,然后想通过flink消费kafka数据过滤掉 delete 操作的数据插入到文件系统,因为要做历史数据存档用。 查了下官网 https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/connectors/table/formats/canal/#available-metadata {"data":[{"id":"111","name":"scooter","description":"Big 2-wheel scooter","weight":"5.18"}],"database":"inventory","es":158937356,"id":9,"isDdl":false,"mysqlType":{"id":"INTEGER","name":"VARCHAR(255)","description":"VARCHAR(512)","weight":"FLOAT"},"old":[{"weight":"5.15"}],"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"description":12,"weight":7},"table":"products","ts":1589373560798,"type":"UPDATE"} CREATETABLEKafkaTable(origin_databaseSTRINGMETADATAFROM'value.database'VIRTUAL,origin_tableSTRINGMETADATAFROM'value.table'VIRTUAL,origin_sql_typeMAPMETADATAFROM'value.sql-type'VIRTUAL,origin_pk_namesARRAYMETADATAFROM'value.pk-names'VIRTUAL,origin_tsTIMESTAMP(3)METADATAFROM'value.ingestion-timestamp'VIRTUAL,user_idBIGINT,item_idBIGINT,behaviorSTRING)WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','value.format'='canal-json'); 只能获取到原始 database, table, sql-type, pk-names, ingestion-timestamp 字段,而拿不到代表操作类型的 type 字段。请问有什么别的办法么?
??????flink1.13.1 webUI??????????????
Hi ??,??, https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/ Best JasonLee ??2021??07??5?? 12:08??lonely Wanderer<609326...@qq.com.INVALID> ?? hi all ??flink1.13.1webUI ??{"errors":["Unable to load requested file /jobs/b207b23893328639ee15e76f6320fe29/vertices/c27dcf7b54ef6bfd6cff02ca8870b681/flamegraph."]} state code : 404
Re: Re:Re: Re: Re:Re: flink sql job 提交到yarn上报错
hi 先执行一下 export HADOOP_CLASSPATH=`hadoop classpath` 就可以了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 退订
hi 退订发邮件到 user-zh-unsubscr...@flink.apache.org 就可以了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:flinksql消费kafka写入doris中文乱码
hi 你可以先用 print 的 connector 把消费到的数据打印一下看是否乱码? 还是写入到 doris 后出现的乱码? Best JasonLee 在2021年6月17日 21:31,maker_d...@foxmail.com 写道: 我使用flinksql消费kafka并将数据写入doris,但出现中文乱码。 SQL如下: CREATE TABLE `datacollect_business_kafka` ( `id` varchar(36), `chain_id` varchar(36), `app_id` varchar(32) , ... CHARACTER SET `UTF-8` ) WITH ( 'connector' = 'kafka', 'topic' = 'datacollect_business_stage', 'properties.bootstrap.servers' = 'XXX', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); CREATE TABLE `datacollect_business_doris` ( `id` varchar(36), `chain_id` varchar(36), `app_id` varchar(32) , ... CHARACTER SET `UTF-8` ) WITH ( 'connector' = 'doris', 'fenodes' = 'XXX', 'table.identifier' = 'stage_datacollect.datacollect_business', 'username' = 'XXX', 'password' = 'XXX', 'sink.batch.size' = '1' ); insert into datacollect_business_doris select * from datacollect_business_kafka; 在网上查找信息,flink-conf.yaml文件中添加:env.java.opts: "-Dfile.encoding=UTF-8” flink版本:1.12.4 部署模式:on yarn 希望各位大佬帮助。 谢谢! maker_d...@foxmail.com
Re: Re:Re:flink sql cdc数据同步至mysql
hi sink 端可以通过 sink.parallelism 进行设置. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Checkpoint时内存不够TaskManager被Kill掉
hi 增大一下 taskmanager.memory.jvm-overhead 的内存试试 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于flink sql的kafka source的开始消费offset相关问题。
hi 那你只需要设置从 latest-offset 开始消费,并且禁用 checkpoint 就行了,至于重启的次数,可以通过 metrics 中的 numRestarts 去获取. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于flink sql的kafka source的开始消费offset相关问题。
hi sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink checkpoint 速度很慢 问题排查
hi source 端的并发保持和 partition 的个数一样就行了,不要大于 partition 个数,因为这会导致 subtask 空跑,浪费资源,你只需要把 map 的并行度调大即可. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink checkpoint 速度很慢 问题排查
hi 你理解的可能有点偏差,应该是因为任务出现了反压或者数据倾斜的问题导致了cp时间长,01消息堆积说明已经反压到source端了,需要先定位反压的位置,看是具体什么原因导致的,然后再根据情况解决. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.12 on yarn WebUI不显示logs
hi 有改过默认的日志配置文件吗? - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.13 通过sql cli执行hdfs上面的SQL文件
hi 目前还不支持HDFS路径,只支持本地的文件,未来应该会支持. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 流与流 left join
Hi 可以看下 interval join 是否能满足你的需求 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: ./sql-client.sh embedded 这种方式提交的flink任务怎么设置state以及checkpoint?
hi 直接在 flink-conf.yaml 文件里面配置就行了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 回复:解析kafka 非标准JSON问题
hi SQL 可以定义一个字段然后分隔再去获取 JSON 数据 或者可以自定义 UDF 去处理 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL Metrics中Kafka Offset请教
hi currentOffsets: 表示的是每个分区的使用者当前读偏移量 , committedOffsets: 表示的是最后一次成功提交到Kafka的偏移量 ,因为 kafka 的 partition 的 offset 是从 0开始的 所以 committedOffsets 会比 currentOffsets 大 1 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink如何从流中取出自定义的数据结构并赋值给变量
hi 你可以用 filter 过滤出多个流或者用测流输出的方式分流处理 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 提交FlinkSQLKafka表报异常cannt load user class
hi 从报错上看是缺少了 Flink-sql-kafka 那个包,把这个包添加到 Flink/lib 下面再跑一下 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink run命令是否支持读取远程文件系统中的jar文件?
hi session ,per-job 模式是不支持的 application 模式是支持的 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.12.2 StreamingFileSink 问题
hi 可以参考这篇文章: https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql 写hdfs问题
hi 你需要添加下面两个参数: 'csv.line-delimiter'='', 'csv.disable-quote-character'='true' - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 使用了流应用中使用了mysql jdbc的source,Execution处于FINISHED状态无法生成检查点
hi 你需要使用 Temporal Table Join 的语法,具体操作可以参考官网 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/joins.html - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink -conf.yaml修改
hi 如果是 session 模式需要重启集群,如果是 per-job 模式直接提交任务即可. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 1.12.2 sql api 使用parquet格式报错
hi 可以参考下这篇文章: https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-sql 客户端采用execution.target: yarn-per-job 模式,如何指定提交的队列??
hi 可以通过在 flink-conf.yaml 配置文件中添加 yarn.application.queue 参数来设置 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: interval join 如何用 process time
hi proctime as PROCTIME() 可以这样设置 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于kafka中csv格式数据字段分隔符请教
hi 1,11 也是支持的 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Catalog(Kafka Connectors 的ddl)持久化到hive metastore,groupid一样的问题
hi 社区以及提供了动态修改表属性的功能,具体使用可以参考 https://mp.weixin.qq.com/s/nWKVGmAtENlQ80mdETZzDw - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于端到端的延迟监控
hi 可以在消费 kafka 的时候获取到数据进入 kafka 的时间戳 然后在最终的 sink 的时候再获取一个时间戳 自己定义一个 metric 上报最终的耗时 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: kafka 所有分区无数据的情况下,导致watermark无法前进
hi watermark 是要根据数据里面的时间戳生成的 所有分区都没有数据的情况下 为什么还要 watermark 推进呢? - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL csv格式分隔符设置失败
hi 改成下面这样: \n => U&'\000A' \t => U&'\0009' - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 回复: flink sql读kafka元数据问题
hi 通过 headers 这种方式是可以获取到的 jark 大佬说的那种方式我还没有测试 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql读kafka元数据问题
hi 你写入数据的时候设置 headers 了吗 没设置的话当然是空的了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 关于flink-sql 元数据问题
hi Flink SQL 建的表支持用 hive 的 catalog 来管理元数据,是否可以满足你的需求 ? - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL 1.11支持将数据写入到Hive吗?
hi 可以参考下这篇文章 https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink SQL使用Tumble窗口函数报NoSuchMethodError functions/AggregateFunction 异常
hi 从报错信息看应该jar包冲突了,可以贴一下相关的依赖包吗 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink-1.11 使用 application 模式时 jobid 问题
hi 可以参考一下这篇文章: https://mp.weixin.qq.com/s/S_Spm88eDtbza1QoLKiWlg - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: Flink与Yarn的状态一致性问题
hi 把 -d 参加加上用分离方式启动 应该就可以了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: Flink与Yarn的状态一致性问题
hi 从你的描述看确实起的是per-job模式,per-job模式目前应该是没有这个问题的.可以再看下你的UI上execution.attached 的值是什么吗? 再有启动任务的时候是否加了 -d 参数 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink与Yarn的状态一致性问题
hi 1,首先确定你提交的是per-job模式吗? 2,你说的任务状态是说jm还在任务在failover,还是任务确实是挂了,jm已经退出了? - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink tm cpu cores设置
hi 设置yarn.containers.vcores这个参数就可以了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 使用BroadcastStream后checkpoint失效
hi checkpoint目前只能用在流数据上,你读取的mongo数据是一个有界的数据源,所以是不支持做checkpoint就会导致整个任务的checkpoint失败,你可以把读取mongo做一个定时读取,这样应该就可以完成checkpoint. - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink获取latencymarker有什么好的方法
hi LatencyMarker 是一个全链路的延迟 不是非常的准确 不过也能大致反应端到端的延迟情况 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FlinkKafkaConsumer on Yarn 模式下 设置并行度无法提高kafka的消费速度,但是提交两个应用却可以
HI 我理解你的 kafka 生产数据的速度比较慢 你并发设置的再大都是没有用的 正常 source 的并行度设置和 kafka 的 partition 个数相等就可以了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink Plan Visualizer
hi Flink plan visualizer 应该是只能画stream graph stream graph 是一个逻辑上的 DAG 图 你把任务提交到集群上 在 Flink WEB UI 上面就可以看到 job graph 了 stream graph 和 job graph 的区别是 job graph 优化了 operator chain job graph 是在调用 env.execute 方法之后才生成的 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink checkpoint导致反压严重
hi 我理解应该是数据倾斜的问题导致的 可以看下采用加随机数的方式key是否分布的均匀. -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re:Re: Flink SQL 问题;
hi 把Flink-jdbc的包放到flink/lib下面再跑下 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink1.11读取kafka问题
hi 首先是确认一下kafka是否有数据写入,其次把所有的operator都看下是否有反压的情况 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 如何设置FlinkSQL并行度
hi checkpoint savepoint的问题可以看下这个 https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: kafka分区数扩容对flink任务的影响
hi Flink是可以感知到partition的增加的 消费kafka的时候设置一下KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS这个参数 大于0就可以了 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql 如何指定之前的checkpoint执行
hi 可以参考这篇文章https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q 在cancel的时候触发一个savepoint 修改完SQL从savepoint恢复任务 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 请教大佬一个在flink调用kafka数据源时'scan.startup.mode'参数的使用问题
hi 参数是这么写的没错 'scan.startup.mode' = 'earliest-offset' 你确定你是用的新的groupid吗 我这里测试是可以的从头开始消费的 不知道是不是你测试的方法不对 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 用hive streaming写 orc文件的问题
hi 我这边测试了ORC的,只需要把stored as pauquet 改成stored as orc即可,success文件能生成,hive里面也能查看数据,但是有一个问题是,Flink Web UI上面显示的数据量是不对的 UI 上面的records send 一直在增大 即使我已经停止向kafka写入数据了 但是hive 里面的数据是对的 我写了30条 hive里面查出来的确实是30条 但UI上面已经显示480条了 且还在增加 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink 1.11 使用sql将流式数据写入hive
Hi 可以看下這個demo https://mp.weixin.qq.com/s/HqXaREr_NZbZ8lgu_yi7yA -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:flink-sql-connector-elasticsearch6_2.11_1.10.0 与 flink-connector-elasticsearch6_2.11_1.10.0 并存问题
Hi 只引入-sql那个包就行了 代码也应该是可以直接用这个包的 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年08月07日 09:26,费文杰 写道: HI: 使用的flink1.10环境,因为使用sql-client,我在flink/lib目录下放置了flink-sql-connector-elasticsearch6_2.11_1.10.0.jar,因为同事有直接在代码里写的sink到elasticsearch,所以他引入了flink-connector-elasticsearch6_2.11_1.10.0.jar, 这样就会导致他的任务报错。为了重现问题,我在本机同时引入了这两个依赖包: org.apache.httpcomponents httpclient 4.5.2 org.elasticsearch.client elasticsearch-rest-high-level-client ${elasticsearch.version} org.apache.flink flink-connector-elasticsearch6_2.11 ${flink.version} org.apache.flink flink-sql-connector-elasticsearch6_2.11 ${flink.version} provided 本地运行,有以下报错信息: Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: bb3bf961144b75bfc71c9dd3efaf59f9) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874) at akka.dispatch.OnComplete.internal(Future.scala:264) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436) at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435) at scala.concurrent.impl.CallbackRunnable.run$$$capture(Promise.scala:36) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110) ... 33 more Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62
Re: flink1.9.1任务已经fail掉了,但在yarn上这个application还是在running
hi 我记得我用1.6.0版本的时候就有这个问题 好像是没有对应的jira 不过我用新版本已经没有遇到这个问题了 应该是偶尔会出现 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:Flink 通过sql client 启动的任务,kill掉之后,是否可以指定checkpoint恢复?
HI 目前sql-client的方式应该还不支持从指定的checkpoint恢复任务 不过Flink on zeppelin目前已经支持了 有兴趣可以用下 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年08月04日 16:28,mispower 写道: 通过sql_client 启动的streaming的任务,在维护或者异常之后,如何像flink straming 一样通过指定checkpoint 恢复到上一次的消费节点。 在邮件列表里搜索了相关的问题,好像都没有明确回答。
回复:flink1.9.1任务已经fail掉了,但在yarn上这个application还是在running
hi 这本身就是一个bug 应该是还没有修复 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年08月04日 15:41,bradyMk 写道: 您好 我这边是用perJob的方式提交的,而且这种现象还是偶发性的,这次错误日志是这样的: 2020-08-04 10:30:14,475 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Job flink2Ots (e11a22af324049217fdff28aca9f73a5) switched from state FAILING to FAILED. java.lang.Exception: Container released on a *lost* node at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2020-08-04 10:30:14,476 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- Could not restart the job flink2Ots (e11a22af324049217fdff28aca9f73a5) because the restart strategy prevented it. java.lang.Exception: Container released on a *lost* node at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:370) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:397) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:190) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 2020-08-04 10:30:14,476 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job e11a22af324049217fdff28aca9f73a5. 2020-08-04 10:30:14,476 INFO org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore - Shutting down 但是我之前也遇到过这个错误时,yarn上的application是可以退出的。 - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:flink1.9.1 在WebUI中查看committedOffsets指标为负值
hi 提交offset到Kafka是在ck成功之后 如果没有开启ck的话 需要设置自动提交提交offset | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月30日 19:37,bradyMk 写道: 谢谢解答~ 这个确实是个不变的值,应该是没有成功提交;而且我发现了,只要是没有设置ck的任务,该指标都会显示这个值,如果设置了ck,就会正常;但是我不懂为什么会这样,请问您知道详细的原因么? -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Could not find any factory for identifier 'kafka'
hi 只需要-sql和-json两个包就可以了 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master On 07/24/2020 17:02, RS wrote: hi, Flink-1.11.1 尝试运行SQL DDL 读取kafka的数据,执行create 语句的时候报错了 编译的jar包是jar-with-dependencies的 代码片段: public String ddlSql = String.format("CREATE TABLE %s (\n" + " number BIGINT,\n" + " msg STRING,\n" + " username STRING,\n" + " update_time TIMESTAMP(3)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'topic' = '%s',\n" + " 'properties.bootstrap.servers' = '%s',\n" + " 'properties.group.id' = '%s',\n" + " 'format' = 'json',\n" + " 'json.fail-on-missing-field' = 'false',\n" + " 'json.ignore-parse-errors' = 'true'\n" + ")\n", tableName, topic, servers, group); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.executeSql(ddlSql); 报错信息: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath. Available factory identifiers are: datagen at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240) at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326) ... 33 more 参考了这个 http://apache-flink.147419.n8.nabble.com/flink-1-11-executeSql-DDL-td4890.html#a4893 补充了flink-connector-kafka_2.12,flink-sql-connector-kafka_2.12, 还是会报一样的错 附上pom依赖: org.apache.flink flink-java ${flink.version} org.apache.flink flink-table-api-java-bridge_2.12 ${flink.version} org.apache.flink flink-table-api-java ${flink.version} org.apache.flink flink-connector-kafka_2.12 ${flink.version} org.apache.flink flink-sql-connector-kafka_2.12 ${flink.version} org.apache.flink flink-json ${flink.version} 感谢各位~
回复:flink1.11启动问题
Hi 报错显示的是资源不足了 你确定yarn上的资源是够的吗 看下是不是节点挂了 1.11我这边提交任务都是正常的 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月21日 16:36,酷酷的浑蛋 写道: 服了啊,这个flink1.11启动怎么净是问题啊 我1.7,1.8,1.9 都没有问题,到11就不行 ./bin/flink run -m yarn-cluster -yqu root.rt_constant -ys 2 -yjm 1024 -yjm 1024 -ynm sql_test ./examples/batch/WordCount.jar --input hdfs://xxx/data/wangty/LICENSE-2.0.txt --output hdfs://xxx/data/wangty/a 报错: Caused by: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not allocate the required slot within slot request timeout. Please make sure that the cluster has enough resources. at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeWrapWithNoResourceAvailableException(DefaultScheduler.java:441) ... 45 more Caused by: java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577) ... 25 more 我资源是足的啊,就flink1.11起不来,一直卡在那里,卡好久然后报这个错,大神们帮看看吧,昨天的jar包冲突问题已经解决(只有flink1.11存在的问题),
回复:flink1.11启动问题
HI 你使用的什么模式?启动任务的命令发出来看一下吧 | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月22日 12:44,酷酷的浑蛋 写道: 现在是为什么啊?启动任务自动占用所有core?core数量一直在增加,直到达到队列最大值,然后就卡住了,这flink1.11啥情况啊?
??????flink-1.11 ddl kafka-to-hive????
hi hive?? | | JasonLee | | ??17610775...@163.com | Signature is customized by Netease Mail Master ??2020??07??21?? 19:09??kcz ?? hive-1.2.1 chk ??chkchk??kafkahive?? String hiveSql = "CREATE TABLE stream_tmp.fs_table (\n" + " host STRING,\n" + " url STRING," + " public_date STRING" + ") partitioned by (public_date string) " + "stored as PARQUET " + "TBLPROPERTIES (\n" + " 'sink.partition-commit.delay'='0 s',\n" + " 'sink.partition-commit.trigger'='partition-time',\n" + " 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")"; tableEnv.executeSql(hiveSql); tableEnv.executeSql("INSERT INTO stream_tmp.fs_table SELECT host, url, DATE_FORMAT(public_date, '-MM-dd') FROM stream_tmp.source_table");
??????state??????checkpoint??????
hi UI??checkpoint??checkpoint?? ?? | | JasonLee | | ??17610775...@163.com | Signature is customized by Netease Mail Master ??2020??07??17?? 17:21??sun ?? ??counts ?? Listhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html Best, Congxian sun <1392427...@qq.com ??2020??7??16?? 6:16?? env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // env.setRestartStrategy(RestartStrategies.noRestart()); env.getCheckpointConfig().setCheckpointTimeout(500); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setStateBackend(new RocksDBStateBackend("file:///opt/flink/flink-1.7.2/checkpoints")); ??private transient ListState
回复:Flink 1.11 Hive Streaming Write的问题
hi 需要开启checkpoint | | JasonLee | | 邮箱:17610775...@163.com | Signature is customized by Netease Mail Master 在2020年07月16日 18:03,李佳宸 写道: 想请教下大家 hive streaming write需要有哪些配置,不知道为什么我的作业能够跑起来,但是没有数据写入hive。 批量的hive写入,流环境的读取是正常的。 附代码,很简短: public class KafkaToHiveStreaming { public static void main(String[] arg) throws Exception{ StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings); String name= "myhive"; String defaultDatabase = "default"; String hiveConfDir = "/Users/uzi/Downloads/Hadoop/apache-hive-3.1.2-bin/conf/"; // a local path String version = "3.1.2"; HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); bsTableEnv.registerCatalog("myhive", hive); bsTableEnv.useCatalog("myhive"); bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); bsTableEnv.executeSql("CREATE TABLE topic_products (" + " id BIGINT ," + " order_id STRING," + " amount DECIMAL(10, 2)," + " create_time TIMESTAMP " + ") WITH (" + " 'connector' = 'kafka'," + " 'topic' = 'order.test'," + " 'properties.bootstrap.servers' = 'localhost:9092'," + " 'properties.group.id' = 'testGroup'," + " 'scan.startup.mode' = 'earliest-offset', " + " 'format' = 'json' " + ")"); bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); bsTableEnv.executeSql("CREATE TABLE hive_sink_table_streaming (" + " id BIGINT ," + " order_id STRING," + " amount DECIMAL(10, 2)" + " )"); bsTableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT); bsTableEnv.executeSql("CREATE TABLE print_table WITH ('connector' = 'print')" + "LIKE INSERT INTO hive_sink_table_streaming (EXCLUDING ALL)"); bsTableEnv.getConfig().setSqlDialect(SqlDialect.HIVE); bsTableEnv.executeSql("INSERT INTO hive_sink_table_streaming SELECT " + "id, " + "order_id, " + "amount " + "FROM topic_products"); Table table1 = bsTableEnv.from("hive_sink_table_streaming"); table1.executeInsert("print_table"); } }