flink1.12.4 写入hdfs报错 java.lang.OutOfMemoryError: Direct buffer memory
通过flink 1.12.4 streaming file sink 写入hdfs,运行过程中抛出以下异常: 2021-11-08 20:39:05 java.io.IOException: java.lang.OutOfMemoryError: Direct buffer memory at org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.set(DataStreamer.java:299) at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:820) Caused by: java.lang.OutOfMemoryError: Direct buffer memory at java.nio.Bits.reserveMemory(Bits.java:694) at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123) at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) at sun.nio.ch.IOUtil.write(IOUtil.java:58) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468) at org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63) at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142) at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159) at org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117) at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122) at java.io.DataOutputStream.write(DataOutputStream.java:107) at org.apache.hadoop.hdfs.DFSPacket.writeTo(DFSPacket.java:180) at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:765)
Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点
Hi 可以考虑使用yarn的node label特性,将flink的任务提交到特定的node上 Original Message Sender: r pp Recipient: user-zh Date: Monday, Dec 21, 2020 21:25 Subject: Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点 嗯,指定几台机子 用于使用flink 运行,为什么 不在yarn 为flink 专门制定 一个队列呢?需要 网络隔离 。。。内网速度多大? 于2020年12月21日周一 下午5:48写道: > 通过yarn label可以实现 > > -邮件原件- > 发件人: user-zh-return-10095-afweijian=163@flink.apache.org > 代表 yujianbo > 发送时间: 2020年12月21日 16:44 > 收件人: user-zh@flink.apache.org > 主题: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点 > > 各位大佬好: > 请问Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: FlinkSQL如何定义JsonObject数据的字段类型
Hi Jark sorry,是1.12.0, 我打错了 Original Message Sender: Jark Wu Recipient: user-zh Date: Wednesday, Dec 9, 2020 14:40 Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai, 你有什么 issue 是需要1.12.1的? 1.12.0 这两天即将发布。 Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai wrote: > 好的,计划下周升级测试下,另:1.12.1计划何时发布呢 > > > Original Message > Sender: Jark Wu > Recipient: user-zh > Date: Tuesday, Dec 8, 2020 13:41 > Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > hailong 说的定义成 STRING 是在1.12 版本上支持的, > https://issues.apache.org/jira/browse/FLINK-18002 1.12 > 这两天就会发布,如果能升级的话,可以尝试一下。 Best, Jark On Tue, 8 Dec 2020 at 11:56, wxpcc < > wxp4...@outlook.com> wrote: > 可以使用字符串的方式,或者自定义 > String类型format,内部结构再通过udf去做后续的实现 > > > > -- > Sent from: > http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL如何定义JsonObject数据的字段类型
Hi Jark Original Message Sender: Jark Wu Recipient: user-zh Date: Wednesday, Dec 9, 2020 14:40 Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai, 你有什么 issue 是需要1.12.1的? 1.12.0 这两天即将发布。 Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai wrote: > 好的,计划下周升级测试下,另:1.12.1计划何时发布呢 > > > Original Message > Sender: Jark Wu > Recipient: user-zh > Date: Tuesday, Dec 8, 2020 13:41 > Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > hailong 说的定义成 STRING 是在1.12 版本上支持的, > https://issues.apache.org/jira/browse/FLINK-18002 1.12 > 这两天就会发布,如果能升级的话,可以尝试一下。 Best, Jark On Tue, 8 Dec 2020 at 11:56, wxpcc < > wxp4...@outlook.com> wrote: > 可以使用字符串的方式,或者自定义 > String类型format,内部结构再通过udf去做后续的实现 > > > > -- > Sent from: > http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL如何定义JsonObject数据的字段类型
好的,计划下周升级测试下,另:1.12.1计划何时发布呢 Original Message Sender: Jark Wu Recipient: user-zh Date: Tuesday, Dec 8, 2020 13:41 Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 hailong 说的定义成 STRING 是在1.12 版本上支持的, https://issues.apache.org/jira/browse/FLINK-18002 1.12 这两天就会发布,如果能升级的话,可以尝试一下。 Best, Jark On Tue, 8 Dec 2020 at 11:56, wxpcc wrote: > 可以使用字符串的方式,或者自定义 String类型format,内部结构再通过udf去做后续的实现 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Re: FlinkSQL如何定义JsonObject数据的字段类型
如果我只是想要返回jsonObject.toString的内容呢?不需要解析嵌套结构 Original Message Sender: 赵一旦 Recipient: user-zh Date: Monday, Dec 7, 2020 21:13 Subject: Re: Re: FlinkSQL如何定义JsonObject数据的字段类型 flink sql 支持不了这个需要。最多支持到Map,Map内部继续嵌套是不支持的。 hailongwang <18868816...@163.com> 于2020年12月7日周一 下午8:03写道: > > > Schema 不太确定的话,那么下游怎么用这个数据呢? > > > Best, > Hailong > > 在 2020-12-07 15:21:16,"xiao cai" 写道: > >ROW需要写明具体的字段类型,比如: > >ROW<`id` string, …>,但是我并没有办法知道jsonObject中具体的schema > > > > > > Original Message > >Sender: 李轲 > >Recipient: user-zh > >Date: Monday, Dec 7, 2020 16:14 > >Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > > > >可以试试ROW 发自我的iPhone > 在 2020年12月7日,15:43,xiao cai 写道: > > > String不行,取出来的值是null > > > Original Message > Sender: silence< > slle...@aliyun.com.INVALID> > Recipient: user-zh > > Date: Monday, Dec 7, 2020 14:26 > Subject: Re: > FlinkSQL如何定义JsonObject数据的字段类型 > > > 可以用string -- Sent from: > http://apache-flink.147419.n8.nabble.com/ >
Re:Re: FlinkSQL如何定义JsonObject数据的字段类型
Schema不确定,下游可以当做JsonString来处理,不同的业务下游处理各自上游的业务,这是由业务两端自己沟通确定的。但是通用的平台这边转的时候是不知道这个规则的,所以需要有一个字段,来统一提供一个字段给外部业务方填充、传递。 Original Message Sender: hailongwang<18868816...@163.com> Recipient: user-zh Date: Monday, Dec 7, 2020 19:52 Subject: Re:Re: FlinkSQL如何定义JsonObject数据的字段类型 Schema 不太确定的话,那么下游怎么用这个数据呢? Best, Hailong 在 2020-12-07 15:21:16,"xiao cai" 写道: >ROW需要写明具体的字段类型,比如: >ROW<`id` string, …>,但是我并没有办法知道jsonObject中具体的schema > > > Original Message >Sender: 李轲 >Recipient: user-zh >Date: Monday, Dec 7, 2020 16:14 >Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > >可以试试ROW 发自我的iPhone > 在 2020年12月7日,15:43,xiao cai 写道: > > String不行,取出来的值是null > > > Original Message > Sender: silence > Recipient: user-zh > Date: Monday, Dec 7, 2020 14:26 > Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > 可以用string -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL如何定义JsonObject数据的字段类型
ROW需要写明具体的字段类型,比如: ROW<`id` string, …>,但是我并没有办法知道jsonObject中具体的schema Original Message Sender: 李轲 Recipient: user-zh Date: Monday, Dec 7, 2020 16:14 Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 可以试试ROW 发自我的iPhone > 在 2020年12月7日,15:43,xiao cai 写道: > > String不行,取出来的值是null > > > Original Message > Sender: silence > Recipient: user-zh > Date: Monday, Dec 7, 2020 14:26 > Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > 可以用string -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: FlinkSQL如何定义JsonObject数据的字段类型
String不行,取出来的值是null Original Message Sender: silence Recipient: user-zh Date: Monday, Dec 7, 2020 14:26 Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 可以用string -- Sent from: http://apache-flink.147419.n8.nabble.com/
FlinkSQL如何定义JsonObject数据的字段类型
Hi, flink version: 1.11.2 api: flink-sql 场景:使用flink sql定义了一张kafka的source表,kafka中数据为json格式的字符串。 其中context是json的一个键,其值为jsonObject,数据示例如下: { “id”: 1, "context”: { … (这里的数据为jsonObject,具体schema不确定, 由各个业务方自行确定,可能嵌套,也可能不嵌套,完全不可控) } } 建表语句为: CREATE TABLE json_source ( id bigint, context ) WITH ( 'connector' = 'kafka’, 'format' = 'json’ ); 问题: 该使用什么数据类型来指定类型呢?从目前的flink sql 的 data type 里感觉没有很合适的匹配项,不管是ROW,或者MAP都不太合适。 请求指教,万分感谢!
Re:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题
Hi wang: 非常感谢解答,我先顺着你的思路去详细了解下这个过程。 Good luck. Best, xiao 原始邮件 发件人: hailongwang<18868816...@163.com> 收件人: user-zh 发送时间: 2020年11月3日(周二) 21:42 主题: Re:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题 Hi xiao, 从报错来看,这个 SQL 应该是 match 了 `StreamExecJoinRule`,而 regular join 不能有 rowtime 属性。 应该是因为你的 kafkaSouce1 table 的 rowtime 经过 group by 后使用了 last_value 导致不是时间属性类型->`TimeIndicatorRelDataType`,而在 rule 进行判断后没有 windowBounds,所以就报了现在这个错误了。 Best, Hailong Wang 在 2020-11-03 18:27:51,"xiao cai" 写道: >Hi : >flink 版本 1.11.2 >问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. > > >代码: >// stream 1 >create table kafkaSource1 ( >id int, >field_1 int, >field_2 varchar, >ts1 timestamp(3), >watermark for `ts1` >) with ( >connector = kafka >) >// stream 2 >create table kafkaSource2 ( >id int, >field_3 >ts2 timestamp(3), >watermark for `ts2` >) with ( >connector = kafka >) > > >//create view >create view kafkaSource1_view as >select >field_1 as field_1, >last_value(field_2) as field_2, >last_value(ts1) as ts1 >from kafkaSouce1 >group by field_1 > > >// query >insert into sinkTable >select >a.field_1, >b.field_3 >from kafkaSource2 a join kafkaSource1_view b >on a.id = b.id >and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY
FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题
Hi : flink 版本 1.11.2 问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. 代码: // stream 1 create table kafkaSource1 ( id int, field_1 int, field_2 varchar, ts1 timestamp(3), watermark for `ts1` ) with ( connector = kafka ) // stream 2 create table kafkaSource2 ( id int, field_3 ts2 timestamp(3), watermark for `ts2` ) with ( connector = kafka ) //create view create view kafkaSource1_view as select field_1 as field_1, last_value(field_2) as field_2, last_value(ts1) as ts1 from kafkaSouce1 group by field_1 // query insert into sinkTable select a.field_1, b.field_3 from kafkaSource2 a join kafkaSource1_view b on a.id = b.id and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY
FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题
Hi : flink 版本 1.11.2 问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before. 代码: // stream 1 create table kafkaSource1 ( id int, field_1 int, field_2 varchar, ts1 timestamp(3), watermark for `ts1` ) with ( connector = kafka ) // stream 2 create table kafkaSource2 ( id int, field_3 ts2 timestamp(3), watermark for `ts2` ) with ( connector = kafka ) //create view create view kafkaSource1_view as select field_1 as field_1, last_value(field_2) as field_2, last_value(ts1) as ts1 from kafkaSouce1 group by field_1 // query insert into sinkTable select a.field_1, b.field_3 from kafkaSource2 a join kafkaSource1_view b on a.id = b.id and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY
如何为每个flink任务分别设置metrics的reporter
Hi: 已知的设置metrics reporter的方式是在conf/flink-conf.yaml中,如果想要为每个任务分别设置不同的metrics reporter或者设置不同的参数,比如设置prometheus pushgateway的多个自定义的k=v,该如何设置呢? Best xiao.
Re: 怎么样在Flink中使用java代码提交job到yarn
这个我们有尝试,遇到了classpath的问题,导致包冲突,无法启动进程,你们有遇到过相关的情况吗? 原始邮件 发件人: todd 收件人: user-zh 发送时间: 2020年9月29日(周二) 17:36 主题: Re: 怎么样在Flink中使用java代码提交job到yarn https://github.com/todd5167/flink-spark-submiter 可以参考这个案例,用ClusterCLient提交。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复: Re: 怎么样在Flink中使用java代码提交job到yarn
非常感谢建议,有zeeplin api的相关文档吗 原始邮件 发件人: chengyanan1...@foxmail.com 收件人: user-zh 发送时间: 2020年9月29日(周二) 09:54 主题: 回复: Re: 怎么样在Flink中使用java代码提交job到yarn 我们项目中也是用到了这个,我也是暂时采用的捕获日志来解析得到yarn application id 和 flink job id的 后期重点研究一下zeeplin,或许可以修改一下源码来镶嵌到我们自己的系统中或者直接调用zeeplin的api 发件人: xushanshan 发送时间: 2020-09-25 16:42 收件人: user-zh 主题: Re: 怎么样在Flink中使用java代码提交job到yarn 可以捕获控制台打印出来的日志,flink相关日志的格式很固定,字符串截取就能获得 yarn application id 和 flink job id > 在 2020年9月25日,下午4:23,xiao cai 写道: > > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > 想要在自己的项目中(springboot)提交flink job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包 > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > best, > xiao
Re:Re:HistoryServer完成任务丢失的问题
貌似是个bug 原始邮件 发件人: xiao cai 收件人: user-zh 发送时间: 2020年9月27日(周日) 18:31 主题: Re:Re:HistoryServer完成任务丢失的问题 是在history server中没有,但是yarn logs还是可以看到的,我理解是任务结束前,jobManager没有upload文件到指定目录,所以history server没有拉取到。但是为何没有upload,我通过jobManager的日志也没看出来,没有任何报错。 原始邮件 发件人: Michael Ran 收件人: user-zh 发送时间: 2020年9月27日(周日) 17:06 主题: Re:Re:HistoryServer完成任务丢失的问题 你的意思是,日志彻底消失了?完全找不到? 不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的 在 2020-09-27 17:03:45,"xiao cai" 写道: >是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。 >问题是cancel的那次job,并没有上传日志信息到归档目录里。 > > > 原始邮件 >发件人: Michael Ran >收件人: user-zh >发送时间: 2020年9月27日(周日) 16:45 >主题: Re:HistoryServer完成任务丢失的问题 > > >history 记得是定时拉取的,有延迟过去 在 2020-09-27 16:40:27,"xiao cai" 写道: >Hi: >flink 1.11.0 >我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history server中却找不到这个任务。同时我尝试了再yarn中kill application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.
Re:Re:HistoryServer完成任务丢失的问题
貌似是个bug,我的版本是1.11.0 https://issues.apache.org/jira/browse/FLINK-18959?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Bug%20AND%20text%20~%20%22history%20server%22 原始邮件 发件人: xiao cai 收件人: user-zh 发送时间: 2020年9月27日(周日) 18:41 主题: Re:Re:HistoryServer完成任务丢失的问题 貌似是个bug 原始邮件 发件人: xiao cai 收件人: user-zh 发送时间: 2020年9月27日(周日) 18:31 主题: Re:Re:HistoryServer完成任务丢失的问题 是在history server中没有,但是yarn logs还是可以看到的,我理解是任务结束前,jobManager没有upload文件到指定目录,所以history server没有拉取到。但是为何没有upload,我通过jobManager的日志也没看出来,没有任何报错。 原始邮件 发件人: Michael Ran 收件人: user-zh 发送时间: 2020年9月27日(周日) 17:06 主题: Re:Re:HistoryServer完成任务丢失的问题 你的意思是,日志彻底消失了?完全找不到? 不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的 在 2020-09-27 17:03:45,"xiao cai" 写道: >是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。 >问题是cancel的那次job,并没有上传日志信息到归档目录里。 > > > 原始邮件 >发件人: Michael Ran >收件人: user-zh >发送时间: 2020年9月27日(周日) 16:45 >主题: Re:HistoryServer完成任务丢失的问题 > > >history 记得是定时拉取的,有延迟过去 在 2020-09-27 16:40:27,"xiao cai" 写道: >Hi: >flink 1.11.0 >我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history server中却找不到这个任务。同时我尝试了再yarn中kill application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.
Re:Re:HistoryServer完成任务丢失的问题
是在history server中没有,但是yarn logs还是可以看到的,我理解是任务结束前,jobManager没有upload文件到指定目录,所以history server没有拉取到。但是为何没有upload,我通过jobManager的日志也没看出来,没有任何报错。 原始邮件 发件人: Michael Ran 收件人: user-zh 发送时间: 2020年9月27日(周日) 17:06 主题: Re:Re:HistoryServer完成任务丢失的问题 你的意思是,日志彻底消失了?完全找不到? 不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的 在 2020-09-27 17:03:45,"xiao cai" 写道: >是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。 >问题是cancel的那次job,并没有上传日志信息到归档目录里。 > > > 原始邮件 >发件人: Michael Ran >收件人: user-zh >发送时间: 2020年9月27日(周日) 16:45 >主题: Re:HistoryServer完成任务丢失的问题 > > >history 记得是定时拉取的,有延迟过去 在 2020-09-27 16:40:27,"xiao cai" 写道: >Hi: >flink 1.11.0 >我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history server中却找不到这个任务。同时我尝试了再yarn中kill application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.
Re:HistoryServer完成任务丢失的问题
是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。 问题是cancel的那次job,并没有上传日志信息到归档目录里。 原始邮件 发件人: Michael Ran 收件人: user-zh 发送时间: 2020年9月27日(周日) 16:45 主题: Re:HistoryServer完成任务丢失的问题 history 记得是定时拉取的,有延迟过去 在 2020-09-27 16:40:27,"xiao cai" 写道: >Hi: >flink 1.11.0 >我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history server中却找不到这个任务。同时我尝试了再yarn中kill application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.
HistoryServer完成任务丢失的问题
Hi: flink 1.11.0 我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history server中却找不到这个任务。同时我尝试了再yarn中kill application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history server又能看到。希望了解history serve相关原理的同学给予帮助。 非常感谢。 best, xiao.
Flink SQL如何设置checkpoint的TTL
Hi: 目前想了解下载Flink SQL下该如何设置checkpoint的TTL。 非常感谢指教 Best, xiao.
Re:Re:Re: Re: Flink SQL撤回流问题
Hi Michael Ran: 是的,我其实也不需要id,但是dba建表要求必须有自增id,所以才发现这个问题。我去查了mysql的文档,是innodb对auto_increment做了设置,默认就会对所有insert执行auto_increment + 1操作,可以通过修改innodb的配置来避免这个情况,但是会引擎写入性能的下降(有锁)。这个问题确实很隐含,很难发现,非常感谢解答。 但是,对于insert into on dumplicate key的方式还是有质疑,感觉如果可以将insert和update明确的区分开,这样会更加好。再次感谢。 原始邮件 发件人: Michael Ran 收件人: user-zh 发送时间: 2020年9月27日(周日) 15:03 主题: Re:Re:Re: Re: Flink SQL撤回流问题 感觉这不是flink的问题,我们也有类似场景,dt 按天其实并不多,直接就没要id了,如果你非要id,而且数量变化巨大,那么用integer ,当然还是有可能超。 auto 一般适用数据量不大的单表场景。分布式大数据量场景,都是自己设计id,或者不要id 在 2020-09-27 14:56:06,"xiao cai" 写道: >Hi Ran: >非常感谢,我试了insert into ON DUPLICATE KEY UPDATE dt=“dt"的方式,确实是会出现update的始终是id=1,但是auto_increment 却一直增加的情况。感觉这样不是很合理,因为随着数据量的增加,迟早会出现数值越界的情形。 > > > 原始邮件 >发件人: Michael Ran >收件人: user-zh >发送时间: 2020年9月27日(周日) 14:37 >主题: Re:Re: Re: Flink SQL撤回流问题 > > >没有传入id,始终是1 ? 那就是第一次insert update 之后,生成的1.后面都是insert into table(dt,num) values(dt,新数量) ON DUPLICATE KEY UPDATE dt=values(dt)你模拟下这个语句呢,看看id成为1 之后,是不是就不变了 在 2020-09-27 14:32:57,"xiao cai" 写道: >Hi lec ssmi: > insert的时候没有指定id,只指定了dt和cnt,因为id在mysql的sink表里是自增的主键,所以flink sql里并没有指定。 > 原始邮件 >发件人: lec ssmi >收件人: flink-user-cn >发送时间: 2020年9月27日(周日) 14:25 >主题: Re: Re: Flink SQL撤回流问题 > > >你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。 kandy.wang 于2020年9月27日周日 下午2:01写道: > > > > > > > hi > 你建mysql要指定主键,另外创建flink表时也要指定一下主键 > > PRIMARY KEY (id) NOT ENFORCED,这样就会根据主键upsert了 > > > > > > > > > 在 2020-09-27 13:36:25,"xiao cai" 写道: > > >如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 > >这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > >收件人: flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > >主题: Re: Flink SQL撤回流问题 > > > > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai < > flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > > select dt,count(distinct id) from source group by dt; > > > > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show > create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > > 发件人: Michael Ran > 收件人: user-zh< > user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink > SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai" > 写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > > 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。 >
Re:Re: Re: Flink SQL撤回流问题
Hi Ran: 非常感谢,我试了insert into ON DUPLICATE KEY UPDATE dt=“dt"的方式,确实是会出现update的始终是id=1,但是auto_increment 却一直增加的情况。感觉这样不是很合理,因为随着数据量的增加,迟早会出现数值越界的情形。 原始邮件 发件人: Michael Ran 收件人: user-zh 发送时间: 2020年9月27日(周日) 14:37 主题: Re:Re: Re: Flink SQL撤回流问题 没有传入id,始终是1 ? 那就是第一次insert update 之后,生成的1.后面都是insert into table(dt,num) values(dt,新数量) ON DUPLICATE KEY UPDATE dt=values(dt)你模拟下这个语句呢,看看id成为1 之后,是不是就不变了 在 2020-09-27 14:32:57,"xiao cai" 写道: >Hi lec ssmi: > insert的时候没有指定id,只指定了dt和cnt,因为id在mysql的sink表里是自增的主键,所以flink sql里并没有指定。 > 原始邮件 >发件人: lec ssmi >收件人: flink-user-cn >发送时间: 2020年9月27日(周日) 14:25 >主题: Re: Re: Flink SQL撤回流问题 > > >你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。 kandy.wang 于2020年9月27日周日 下午2:01写道: > > > > > > > hi > 你建mysql要指定主键,另外创建flink表时也要指定一下主键 > > PRIMARY KEY (id) NOT ENFORCED,这样就会根据主键upsert了 > > > > > > > > > 在 2020-09-27 13:36:25,"xiao cai" 写道: > > >如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 > >这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > >收件人: flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > >主题: Re: Flink SQL撤回流问题 > > > > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai < > flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > > select dt,count(distinct id) from source group by dt; > > > > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show > create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > > 发件人: Michael Ran > 收件人: user-zh< > user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink > SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai" > 写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > > 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。 >
Re: Re: Flink SQL撤回流问题
Hi lec ssmi: insert的时候没有指定id,只指定了dt和cnt,因为id在mysql的sink表里是自增的主键,所以flink sql里并没有指定。 原始邮件 发件人: lec ssmi 收件人: flink-user-cn 发送时间: 2020年9月27日(周日) 14:25 主题: Re: Re: Flink SQL撤回流问题 你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。 kandy.wang 于2020年9月27日周日 下午2:01写道: > > > > > > > hi > 你建mysql要指定主键,另外创建flink表时也要指定一下主键 > > PRIMARY KEY (id) NOT ENFORCED,这样就会根据主键upsert了 > > > > > > > > > 在 2020-09-27 13:36:25,"xiao cai" 写道: > > >如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 > >这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > >收件人: flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > >主题: Re: Flink SQL撤回流问题 > > > > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai < > flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > > select dt,count(distinct id) from source group by dt; > > > > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show > create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > > 发件人: Michael Ran > 收件人: user-zh< > user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink > SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai" > 写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > > 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。 >
Re:Re: Flink SQL撤回流问题
Hi kandy.wang: 忘记说明,我指定了dt为primary key,按理说会按照dt做update,但是为何auto_increment会不断的变大呢,而id也没有变化,id字段值始终为1。还望解惑。 原始邮件 发件人: kandy.wang 收件人: user-zh 发送时间: 2020年9月27日(周日) 14:01 主题: Re:Re: Flink SQL撤回流问题 hi 你建mysql要指定主键,另外创建flink表时也要指定一下主键 PRIMARY KEY (id) NOT ENFORCED,这样就会根据主键upsert了 在 2020-09-27 13:36:25,"xiao cai" 写道: >如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 >这是我很困惑的地方。 > > > 原始邮件 >发件人: lec ssmi >收件人: flink-user-cn >发送时间: 2020年9月27日(周日) 13:06 >主题: Re: Flink SQL撤回流问题 > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > select dt,count(distinct id) from source group by dt; > > > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > 发件人: Michael Ran > 收件人: user-zh > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai" 写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。
Re: Flink SQL撤回流问题
如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 这是我很困惑的地方。 原始邮件 发件人: lec ssmi 收件人: flink-user-cn 发送时间: 2020年9月27日(周日) 13:06 主题: Re: Flink SQL撤回流问题 是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > select dt,count(distinct id) from source group by dt; > > > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > 发件人: Michael Ran > 收件人: user-zh > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai" 写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。
Re:Flink SQL撤回流问题
场景如下: source table: kafka sink table: mysql schem(id, dt, cnt) insert : insert into sink select dt,count(distinct id) from source group by dt; 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 show create table sink可以发现auto_increment在不断的变大。 当超过id的取值范围,就会报错了。 原始邮件 发件人: Michael Ran 收件人: user-zh 发送时间: 2020年9月27日(周日) 11:51 主题: Re:Flink SQL撤回流问题 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai" 写道: >Hi: >使用Flink SQL撤回流写入MySQL,表的auto_increment 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。
Flink SQL撤回流问题
Hi: 使用Flink SQL撤回流写入MySQL,表的auto_increment 越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。
Re: 怎么样在Flink中使用java代码提交job到yarn
Hi zilong: 这种方式我考虑过,个人认为平台层面如果有业务逻辑的侵入,会影响后续的升级。所以我们是在标注输出中正则匹配出jobId和applicationId。你了解YarnClusterDescripto吗?之前社区看到有人用这个提交的。 原始邮件 发件人: zilong xiao 收件人: user-zh 发送时间: 2020年9月25日(周五) 17:12 主题: Re: 怎么样在Flink中使用java代码提交job到yarn 你提交的任务是可以指定job name的呀,你的job name和你的业务主键绑定就可以做到唯一了,然后根据这个关系查询即可,没记错-ynm 是指定job name的 xiao cai 于2020年9月25日周五 下午5:01写道: > hi zilong: > 通过process提交任务以后,通过rest > api,如何知道哪一个是我提交的呢?如果这时有多个请求过来同时都提交了任务,怎么知道rest返回的application应该与哪一次提交对应呢? > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh > 发送时间: 2020年9月25日(周五) 16:55 > 主题: Re: 怎么样在Flink中使用java代码提交job到yarn > > > 我知道呀,你不是从输出里抠的嘛?你想要更优雅的方式可以用rest api啊,这些信息都是可以拿到的 xiao cai < > flin...@163.com> 于2020年9月25日周五 下午4:53写道: > hi zilong: > > 你说的这些信息我目前都能拿到,已经实现了。只是实现的方式很不优雅,而且提交的速度很慢。 > > > 原始邮件 > 发件人: zilong xiao< > acidzz...@gmail.com> > 收件人: user-zh > 发送时间: > 2020年9月25日(周五) 16:48 > 主题: Re: 怎么样在Flink中使用java代码提交job到yarn > > > > JobId是说Flink的JobId还是yarn上面的application ID呢?containerID可以通过yarn rest api拿到呀 > > xiao cai 于2020年9月25日周五 下午4:43写道: > > > > 使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。 > > > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh< > > user-zh@flink.apache.org> > 发送时间: 2020年9月25日(周五) 16:32 > 主题: Re: > > 怎么样在Flink中使用java代码提交job到yarn > > > > > Java程序用process调用脚本提交任务没啥问题吧,获取jobId&containerId的问题,我理解可以用yarn rest api & > > > flink rest api来完成,希望对你有帮助,祝好~ xiao cai 于2020年9月25日周五 > > > 下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > > > > 想要在自己的项目中(springboot)提交flink > > > > job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包 > > > > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > > > > best, > xiao
Re: 怎么样在Flink中使用java代码提交job到yarn
hi zilong: 通过process提交任务以后,通过rest api,如何知道哪一个是我提交的呢?如果这时有多个请求过来同时都提交了任务,怎么知道rest返回的application应该与哪一次提交对应呢? 原始邮件 发件人: zilong xiao 收件人: user-zh 发送时间: 2020年9月25日(周五) 16:55 主题: Re: 怎么样在Flink中使用java代码提交job到yarn 我知道呀,你不是从输出里抠的嘛?你想要更优雅的方式可以用rest api啊,这些信息都是可以拿到的 xiao cai 于2020年9月25日周五 下午4:53写道: > hi zilong: > 你说的这些信息我目前都能拿到,已经实现了。只是实现的方式很不优雅,而且提交的速度很慢。 > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh > 发送时间: 2020年9月25日(周五) 16:48 > 主题: Re: 怎么样在Flink中使用java代码提交job到yarn > > > JobId是说Flink的JobId还是yarn上面的application ID呢?containerID可以通过yarn rest api拿到呀 > xiao cai 于2020年9月25日周五 下午4:43写道: > > > 使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。 > > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh< > user-zh@flink.apache.org> > 发送时间: 2020年9月25日(周五) 16:32 > 主题: Re: > 怎么样在Flink中使用java代码提交job到yarn > > > > Java程序用process调用脚本提交任务没啥问题吧,获取jobId&containerId的问题,我理解可以用yarn rest api & > > flink rest api来完成,希望对你有帮助,祝好~ xiao cai 于2020年9月25日周五 > > 下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > > > 想要在自己的项目中(springboot)提交flink > > > job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包 > > > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > > > best, > xiao
Re: 怎么样在Flink中使用java代码提交job到yarn
hi zilong: 你说的这些信息我目前都能拿到,已经实现了。只是实现的方式很不优雅,而且提交的速度很慢。 原始邮件 发件人: zilong xiao 收件人: user-zh 发送时间: 2020年9月25日(周五) 16:48 主题: Re: 怎么样在Flink中使用java代码提交job到yarn JobId是说Flink的JobId还是yarn上面的application ID呢?containerID可以通过yarn rest api拿到呀 xiao cai 于2020年9月25日周五 下午4:43写道: > > 使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。 > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh > 发送时间: 2020年9月25日(周五) 16:32 > 主题: Re: 怎么样在Flink中使用java代码提交job到yarn > > > Java程序用process调用脚本提交任务没啥问题吧,获取jobId&containerId的问题,我理解可以用yarn rest api & > flink rest api来完成,希望对你有帮助,祝好~ xiao cai 于2020年9月25日周五 > 下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > > 想要在自己的项目中(springboot)提交flink > > job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包 > > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > > best, > xiao
Re: 怎么样在Flink中使用java代码提交job到yarn
使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。 原始邮件 发件人: zilong xiao 收件人: user-zh 发送时间: 2020年9月25日(周五) 16:32 主题: Re: 怎么样在Flink中使用java代码提交job到yarn Java程序用process调用脚本提交任务没啥问题吧,获取jobId&containerId的问题,我理解可以用yarn rest api & flink rest api来完成,希望对你有帮助,祝好~ xiao cai 于2020年9月25日周五 下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > 想要在自己的项目中(springboot)提交flink > job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包 > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > best, > xiao
怎么样在Flink中使用java代码提交job到yarn
Hi all: 大家好,我目前遇到一个flink 任务提交方面的困扰: 想要在自己的项目中(springboot)提交flink job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 非常感谢 best, xiao
Re: 如何在启动taskmanager时传入自定义的java参数
学习了,非常感谢~ 原始邮件 发件人: wch...@163.com 收件人: user-zh@flink.apache.org 发送时间: 2020年9月15日(周二) 19:18 主题: Re: 如何在启动taskmanager时传入自定义的java参数 官网有相关配置 https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jvm-and-logging-options taskmanager自定义参数应该是使用 env.java.opts.taskmanager 下面是我的部分启动启动参数: /data1/flink-1.10.0/bin/flink run -m yarn-cluster -ynm smartStareJob -yjm 2048 -ytm 4096 -ys 2 -p 6 \ -yD env.java.opts="-Dxdiamond.server.host=daily.inzwc.com -Dxdiamond.project.profile=daily" \ -yD zookeeper.sasl.disable=true \ -yD taskmanager.exit-on-fatal-akka-erro=ture \ -yD taskmanager.network.netty.client.numThreads=2 \ -yD taskmanager.network.netty.server.numThreads=2 \ -c com.hstong.fintech.cep.main.SmartStareJob /data0/www/quant-cep.jar \ --profile daily --channalName smartStareChannal wch...@163.com 发件人: xiao cai 发送时间: 2020-09-15 17:46 收件人: user-zh 主题: 如何在启动taskmanager时传入自定义的java参数 Hi: 我修改了flink的一些源码,需要通过外部-Dkey=value的形式动态将值传入,试了下无法直接通过bin/flink run 后加-D的方式来添加,有什么好的办法吗?
Re: 如何在启动taskmanager时传入自定义的java参数
解决了我的问题,非常感谢。 原始邮件 发件人: zilong xiao 收件人: user-zh 发送时间: 2020年9月15日(周二) 18:23 主题: Re: 如何在启动taskmanager时传入自定义的java参数 可以在flink-conf.yaml里设置,例如: env.java.opts: -Djob.name={{job_name}} xiao cai 于2020年9月15日周二 下午5:46写道: > Hi: > 我修改了flink的一些源码,需要通过外部-Dkey=value的形式动态将值传入,试了下无法直接通过bin/flink run > 后加-D的方式来添加,有什么好的办法吗?
如何在启动taskmanager时传入自定义的java参数
Hi: 我修改了flink的一些源码,需要通过外部-Dkey=value的形式动态将值传入,试了下无法直接通过bin/flink run 后加-D的方式来添加,有什么好的办法吗?
FlinkSQL如何处理上游的表结构变更
Hi all: flink version : 1.11.0 场景:上游的数据来自binlog,当发生表结构变更时,希望能够实时的变动flink内部表的schema,但是目前来看,表的schema都是create table时写死的,有什么办法可以处理这种场景呢
Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据
Hi shizk233: 我这边也复现了你说的情况,一模一样。 可以尝试使用定时调度任务检查flink任务的执行情况,当不再处于运行状态时,主动调用pushgateway的delete方法来删除pushgetway的metrics。 原始邮件 发件人: shizk233 收件人: user-zh@flink.apache.org 发送时间: 2020年9月1日(周二) 19:10 主题: Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据 Hi Xiao, 我这边实践过程中发现,该参数只能删除jobmanager对应的metrics group,不能删除tm的。 我们开启了randomJobNameSuffix,该参数会让JM和TM的metrics信息分属不同metrics group。 感觉这可能是一个bug? xiao cai 于2020年9月1日周二 下午4:57写道: > Hi: > 可以试试在flink-conf.yaml中添加: > metrics.reporter.promgateway.deleteOnShutdown: true > > > Best, > Xiao > 原始邮件 > 发件人: bradyMk > 收件人: user-zh > 发送时间: 2020年9月1日(周二) 16:50 > 主题: Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据 > > > 您好,我不是很懂您的意思,例如我这边有一个这样的指标:flink_jobmanager_job_uptime 监控一个任务的运行时长; > 如果该任务被kill掉,那么这个指标的数值会变成一个不变的量,一直显示在grafana中。我不太会promeQL,我尝试这样: > flink_jobmanager_job_uptime[1m],这样是个非法查询命令,按照您的意思,应该怎么改呢? - Best Wishes > -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复:关于FlinkSQL的窗口和触发
Hi: 可以试试增加如下配置: table.exec.emit.early-fire.enabled = true table.exec.emit.early-fire.delay = 1 Best, Xiao 原始邮件 发件人: Cayden chen<1193216...@qq.com> 收件人: user-zh 发送时间: 2020年9月1日(周二) 17:10 主题: 回复:关于FlinkSQL的窗口和触发 hi,目前不支持。sql语义只支持窗口结束触发计算 -- 原始邮件 -- 发件人: "user-zh"
Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据
Hi: 可以试试在flink-conf.yaml中添加: metrics.reporter.promgateway.deleteOnShutdown: true Best, Xiao 原始邮件 发件人: bradyMk 收件人: user-zh 发送时间: 2020年9月1日(周二) 16:50 主题: Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据 您好,我不是很懂您的意思,例如我这边有一个这样的指标:flink_jobmanager_job_uptime 监控一个任务的运行时长; 如果该任务被kill掉,那么这个指标的数值会变成一个不变的量,一直显示在grafana中。我不太会promeQL,我尝试这样: flink_jobmanager_job_uptime[1m],这样是个非法查询命令,按照您的意思,应该怎么改呢? - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件
Hi 确实可以稳定复现,failover后就会出现找不到lib包中的jar文件里的class文件,只能重启。不过我是cli模式启动的on-yarn,没有试过per-job和application,计划这两天尝试下application指定jar包地址到hdfs上,看是否能够复现。 Best, xiao cai 原始邮件 发件人: Congxian Qiu 收件人: user-zh 发送时间: 2020年8月24日(周一) 20:39 主题: Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 Hi 理论上第一次能启动,后续的 failover 也应该是可以正常恢复的。你这边是稳定复现吗?如果能够稳定复现的话,有可能是 bug Best, Congxian xiao cai 于2020年8月20日周四 下午2:27写道: > Hi: > 感谢答复,确实是个思路。 > > 不过个人感觉能够在启动第一个container的时候,将本地的lib中的jar文件上传到hdfs中,然后后续的failover的container能够统一从hdfs中获取,这样应该就不会有这个问题了。貌似社区在1.11版本针对jar的拷贝做了优化,我还在看这方面的内容。有进展再交流。 > > > Best, > xiao cai > > > 原始邮件 > 发件人: 范超 > 收件人: user-zh@flink.apache.org > 发送时间: 2020年8月20日(周四) 09:11 > 主题: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 > > > 我之前开启job的failover > restart,结果也是发现yarn直接申请新的container且旧的container并没有进一步进行处理,一直导致你这样子的报错,旧的container没有绑定的task > executor No TaskExecutor registered under containe_. > 我这边干脆写了个脚本通过savepoint的方式来reload应用了 希望对你有帮助 -邮件原件- 发件人: xiao cai > [mailto:flin...@163.com] 发送时间: 2020年8月19日 星期三 12:50 收件人: user-zh < > user-zh@flink.apache.org> 主题: Flink on Yarn > 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 如题:link on Yarn > 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 我的任务时kafka source -> hbase sink > 任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。 > Best xiao cai 错误堆栈: 2020-08-19 11:23:08,099 INFO > org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers. > 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - > Received 1 containers with resource , 1 pending > container requests. 2020-08-19 11:23:08,100 INFO > org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor > container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 > with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb > (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), > taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, > networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb > (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), > jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO > org.apache.flink.yarn.YarnResourceManager [] - Creating container launch > context for TaskManagers 2020-08-19 11:23:08,101 INFO > org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers > 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - > Removing container request Capability[]Priority[1]. > 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - > Accepted 1 requested containers, returned 0 excess containers, 0 pending > container requests of resource . 2020-08-19 > 11:23:08,102 INFO > org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - > Processing Event EventType: START_CONTAINER for Container > container_e07_1596440446172_0094_01_69 2020-08-19 11:23:10,851 ERROR > org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler > [] - Unhandled exception. > org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: > No TaskExecutor registered under > container_e07_1596440446172_0094_01_68. at > org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498) > ~[?:1.8.0_191] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > ~[flink-dist_2.1
DDL中声明主键会报类型不匹配
Hi: flink版本1.11.0 connector为kafka DDL中声明某个字段为primary key时,会报类型不匹配,去掉primary key constraint就可以正常执行。 把shop_id设置为 varchar not null也不行。 org.apache.flink.table.api.ValidationException: Type STRING NOT NULL of table field 'shop_id' does not match with the physical type STRING of the 'shop_id' field of the TableSource return type. SQL如下: create table source_0 ( `shop_id` varchar, `user_id` bigint, `category_id` int, `ts` bigint, `proc_time` as PROCTIME(), `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, '-MM-dd HH:mm:ss')), watermark for event_time AS event_time, PRIMARY KEY (shop_id, user_id) NOT ENFORCED ) with ( 'connector.type' = 'kafka', )
回复:答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件
Hi: 感谢答复,确实是个思路。 不过个人感觉能够在启动第一个container的时候,将本地的lib中的jar文件上传到hdfs中,然后后续的failover的container能够统一从hdfs中获取,这样应该就不会有这个问题了。貌似社区在1.11版本针对jar的拷贝做了优化,我还在看这方面的内容。有进展再交流。 Best, xiao cai 原始邮件 发件人: 范超 收件人: user-zh@flink.apache.org 发送时间: 2020年8月20日(周四) 09:11 主题: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 我之前开启job的failover restart,结果也是发现yarn直接申请新的container且旧的container并没有进一步进行处理,一直导致你这样子的报错,旧的container没有绑定的task executor No TaskExecutor registered under containe_. 我这边干脆写了个脚本通过savepoint的方式来reload应用了 希望对你有帮助 -邮件原件- 发件人: xiao cai [mailto:flin...@163.com] 发送时间: 2020年8月19日 星期三 12:50 收件人: user-zh 主题: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 如题:link on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 我的任务时kafka source -> hbase sink 任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。 Best xiao cai 错误堆栈: 2020-08-19 11:23:08,099 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers. 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers with resource , 1 pending container requests. 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Creating container launch context for TaskManagers 2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Removing container request Capability[]Priority[1]. 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource . 2020-08-19 11:23:08,102 INFO org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing Event EventType: START_CONTAINER for Container container_e07_1596440446172_0094_01_69 2020-08-19 11:23:10,851 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception. org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e07_1596440446172_0094_01_68. at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0]
回复: flinkSQL eventtime问题
Hi: create table kafka ( nested_field Row(event_time timestamp(3), other_field string), watermark for nested_field.event_time as {watermark_definition} ) with ( ‘connector' = ‘kafka' ) 看看这样能够使用 Best, xiao cai 原始邮件 发件人: ★猛★ 收件人: user-zh 发送时间: 2020年8月19日(周三) 17:48 主题: 回复: flinkSQL eventtime问题 我是通过kafka直接注册tablesource ,数据是avro的。 descriptor.inAppendMode().registerTableSource(source_table_name); 我们在想怎么把avro里嵌套的某个字段用作eventime -- 原始邮件 -- 发件人: "user-zh"
Re: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件
Hi Flink版本是 1.11.0 期望的行为是将kafka中的数据实时写入hbase表 xxx这个class是在lib下的某个jar中的 任务初始运行都是正常的,jar包也是可以找到的,运行期间失败了,然后进入了restarting状态,就不停的在running和restarting状态切换 我提交任务的节点是20,然后container运行的节点是22,lib中的jar都在20节点上,所以猜测是任务运行过程中,重新分配新的container时 丢失了lib中jar资源。 Best, xiao cai 原始邮件 发件人: Congxian Qiu 收件人: user-zh 发送时间: 2020年8月19日(周三) 13:34 主题: Re: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 Hi 你的 Flink 是哪个版本,期望的行为是什么样的? 从你给的日志看,是没有 这个 class,这个 是在你放到 lib 下的某个 jar 包里面吗?另外你这个作业第一次运行的时候失败,还是中间中间 failover 之后恢复回来的时候失败呢? Best, Congxian xiao cai 于2020年8月19日周三 下午12:50写道: > 如题:link on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 > > > 我的任务时kafka source -> hbase sink > > > > 任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。 > > > Best > xiao cai > > > 错误堆栈: > 2020-08-19 11:23:08,099 INFO org.apache.flink.yarn.YarnResourceManager > [] - Received 1 containers. > 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager > [] - Received 1 containers with resource vCores:4>, 1 pending container requests. > 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager > [] - TaskExecutor > container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 > with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb > (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), > taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, > networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb > (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), > jvmOverheadSize=192.000mb (201326592 bytes)}. > 2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager > [] - Creating container launch context for TaskManagers > 2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager > [] - Starting TaskManagers > 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager > [] - Removing container request Capability[ vCores:4>]Priority[1]. > 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager > [] - Accepted 1 requested containers, returned 0 excess > containers, 0 pending container requests of resource vCores:4>. > 2020-08-19 11:23:08,102 INFO > org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - > Processing Event EventType: START_CONTAINER for Container > container_e07_1596440446172_0094_01_69 > 2020-08-19 11:23:10,851 ERROR > org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler > [] - Unhandled exception. > org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: > No TaskExecutor registered under container_e07_1596440446172_0094_01_68. > at > org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_191] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at akka.actor.Act
Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件
如题:link on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 我的任务时kafka source -> hbase sink 任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。 Best xiao cai 错误堆栈: 2020-08-19 11:23:08,099 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers. 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers with resource , 1 pending container requests. 2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 with TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb (536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Creating container launch context for TaskManagers 2020-08-19 11:23:08,101 INFO org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Removing container request Capability[]Priority[1]. 2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Accepted 1 requested containers, returned 0 excess containers, 0 pending container requests of resource . 2020-08-19 11:23:08,102 INFO org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing Event EventType: START_CONTAINER for Container container_e07_1596440446172_0094_01_69 2020-08-19 11:23:10,851 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] - Unhandled exception. org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException: No TaskExecutor registered under container_e07_1596440446172_0094_01_68. at org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.11.0.jar:1.11.0] at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.0.jar:1.11.0] at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.0.jar:1.11.0] at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.0.jar:1.11.0] at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.0.jar:1.11.0] 2020-08-19 11:23:10,
Re: Print SQL connector无法正常使用
Hi china_tao: 你好,HBase肯定没有问题的,请问你可以正常使用print connector吗,能否让我看看正确的使用姿势,感谢 原始邮件 发件人: china_tao 收件人: user-zh 发送时间: 2020年8月17日(周一) 23:00 主题: Re: Print SQL connector无法正常使用 String createHbaseSql = CREATE TABLE dimension ( rowKey STRING, cf ROW, tas BIGINT ) WITH ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = ’test', 'connector.write.buffer-flush.max-rows' = '10', 'connector.zookeeper.quorum' = ‘IP:port', 'connector.zookeeper.znode.parent' = '/hbase', ); tableEnv.executeSql(createHbaseSql); Table queryTable = tableEnv.sqlQuery("select * from dimension"); tableEnv.toAppendStream(queryTable, Row.class).print(); 你先用这种方式,看看能不能打印出来,证明你hbase没有问题。然后在用print_table。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Print SQL connector无法正常使用
Hi All: 目前使用flink sql的Print SQL connector,想要将查询的结果打印出来,结果报错: Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath. 可以保证:HBase-connector是在lib包下存在的,是否我还需要在lib下添加什么依赖? 下面为执行的sql: CREATE TABLE dimension ( rowKey STRING, cf ROW, tas BIGINT ) WITH ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = ’test', 'connector.write.buffer-flush.max-rows' = '10', 'connector.zookeeper.quorum' = ‘IP:port', 'connector.zookeeper.znode.parent' = '/hbase', ); CREATE TABLE print_table ( f0 STRING, f1 INT, f2 BIGINT, f3 BIGINT ) WITH ( 'connector' = 'print' ); insert into print_table select rowKey, cf.age, cf.area, tas from dimension
答复: HBase Sink报错:UpsertStreamTableSink requires that Table has a full primary keys
Hi Jark: 感谢回答,我发现是我join的时候,是想将hbase作为维表使用的,但是我遗漏了for system_time as of语句,添加后就不会再报这个错了。 另外有个问题想请教:1.11中新版hbase connector只是指with中指定version为1.4所创建的表吗,我发现使用1.4.3的版本,也是可以正常使用的。是不是说明pk在1.4和1.4.3两个版本上都是生效的? 再次感谢。 Best Xiao Cai 发送自 Windows 10 版邮件应用 发件人: Jark Wu 发送时间: 2020年8月14日 23:23 收件人: user-zh 主题: Re: HBase Sink报错:UpsertStreamTableSink requires that Table has a full primary keys PK 的问题在1.11 已经解决了,你可以用下1.11 提供的新版 hbase connector,可以在 DDL 上指定 PK,所以 query 推导不出 PK 也不会报错了。 see more: https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html Best, Jark On Thu, 13 Aug 2020 at 14:27, xiao cai wrote: > Hi All: > 使用flink-sql写入hbase sink时报错: > UpsertStreamTableSink requires that Table has a full primary keys if it is > updated. > > > 我共创建了4张表,1张kafka source表,3张hbase 维表,1张hbase sink表 > kafka source表与hbase 维表left join后的结果insert到hbase sink表中: > sql如下: > create table user_click_source( > `id` bigint, > `name` varchar, > `kafka_partition` int, > `event_time` bigint, > `write_time` bigint, > `snapshot_time` bigint, > `max_snapshot_time` bigint, > `catalog_id` int, > `device_id` int, > `user_id` int, > `proc_time` timestamp(3) > PRIMARY KEY (id) NOT ENFORCED > )with( > 'connector.type' = 'kafka', > …… > ) > ; > create table dim_user( > `rowkey` varchar, > cf ROW< > `id` int, > `name` varchar, > `kafka_partition` int, > `event_time` bigint, > `write_time` bigint, > `snapshot_time` bigint, > `max_snapshot_time` bigint > >, > ts bigint > )with( > 'connector.type'='hbase', > …… > ) > ; > > > create table dim_device( > `rowkey` varchar, > cf ROW< > `id` int, > `name` varchar, > `kafka_partition` int, > `event_time` bigint, > `write_time` bigint, > `snapshot_time` bigint, > `max_snapshot_time` bigint > > > )with( > 'connector.type'='hbase', > …… > ) > ; > > > create table dim_catalog( > `rowkey` varchar, > cf ROW< > `id` int, > `name` varchar, > `kafka_partition` int, > `event_time` bigint, > `write_time` bigint, > `snapshot_time` bigint, > `max_snapshot_time` bigint > > > )with( > 'connector.type'='hbase', > …… > ) > ; > create table hbase_full_user_click_case1_sink( > `rowkey` bigint, > cf ROW< > `click_id` bigint, > `click_name` varchar, > `click_partition` int, > `click_event_time` bigint, > `click_write_time` bigint, > `click_snapshot_time` bigint, > `click_max_snapshot_time` bigint, > `catalog_id` int, > `catalog_name` varchar, > `catalog_partition` int, > `catalog_event_time` bigint, > `catalog_write_time` bigint, > `catalog_snapshot_time` bigint, > `catalog_max_snapshot_time` bigint, > `device_id` int, > `device_name` varchar, > `device_partition` int, > `device_event_time` bigint, > `device_write_time` bigint, > `device_snapshot_time` bigint, > `device_max_snapshot_time` bigint, > `user_id` int, > `user_name` varchar, > `user_partition` int, > `user_event_time` bigint, > `user_write_time` bigint, > `user_snapshot_time` bigint, > `user_max_snapshot_time` bigint > >, > PRIMARY KEY (rowkey) NOT ENFORCED > )with( > 'connector.type'='hbase', > …… > ) > ; > insert into hbase_full_user_click_case1_sink > select > `click_id`, > ROW( > `click_id`, > `click_name`, > `click_partition`, > `click_event_time`, > `click_write_time`, > `click_snapshot_time`, > `click_max_snapshot_time`, > `catalog_id`, > `catalog_name`, > `catalog_partition`, > `catalog_event_time`, > `catalog_write_time`, > `catalog_snapshot_time`, > `catalog_max_snapshot_time`, > `device_id`, > `device_name`, > `device_partition`, > `device_event_time`, > `device_write_time`, > `device_snapshot_time`, > `device_max_snapshot_time`, > `user_id`, > `user_name`, > `user_partition`, > `user_event_time`, > `user_write_time`, > `user_snapshot_time`, > `user_max_snapshot_time` > ) > from (select > click.id as `click_id`, > click.name as `click_name`, > click.kafka_partition as `click_partition`, > click.event_time as `click_event_time`, > click.write_time as `click_write_time`, > click.snapshot_time as `click_snapshot_time`, > click.max_snapshot_time as `click_max_snapshot_time`, > cat.cf.id as `catalog_id`, > cat.cf.name as `catalog_name`, > cat.cf.kafka_partition as `catalog_partition`, > cat.cf.event_time as `catalog_event_time`, > cat.cf.write_time as `catalog_write_time`, > cat.cf.snapshot_time as `catalog_snapshot_time`, > cat.cf.max_snapshot_time as `catalog_max_snapshot_time`, > dev.cf.id as `device_
HBase Sink报错:UpsertStreamTableSink requires that Table has a full primary keys
Hi All: 使用flink-sql写入hbase sink时报错: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. 我共创建了4张表,1张kafka source表,3张hbase 维表,1张hbase sink表 kafka source表与hbase 维表left join后的结果insert到hbase sink表中: sql如下: create table user_click_source( `id` bigint, `name` varchar, `kafka_partition` int, `event_time` bigint, `write_time` bigint, `snapshot_time` bigint, `max_snapshot_time` bigint, `catalog_id` int, `device_id` int, `user_id` int, `proc_time` timestamp(3) PRIMARY KEY (id) NOT ENFORCED )with( 'connector.type' = 'kafka', …… ) ; create table dim_user( `rowkey` varchar, cf ROW< `id` int, `name` varchar, `kafka_partition` int, `event_time` bigint, `write_time` bigint, `snapshot_time` bigint, `max_snapshot_time` bigint >, ts bigint )with( 'connector.type'='hbase', …… ) ; create table dim_device( `rowkey` varchar, cf ROW< `id` int, `name` varchar, `kafka_partition` int, `event_time` bigint, `write_time` bigint, `snapshot_time` bigint, `max_snapshot_time` bigint > )with( 'connector.type'='hbase', …… ) ; create table dim_catalog( `rowkey` varchar, cf ROW< `id` int, `name` varchar, `kafka_partition` int, `event_time` bigint, `write_time` bigint, `snapshot_time` bigint, `max_snapshot_time` bigint > )with( 'connector.type'='hbase', …… ) ; create table hbase_full_user_click_case1_sink( `rowkey` bigint, cf ROW< `click_id` bigint, `click_name` varchar, `click_partition` int, `click_event_time` bigint, `click_write_time` bigint, `click_snapshot_time` bigint, `click_max_snapshot_time` bigint, `catalog_id` int, `catalog_name` varchar, `catalog_partition` int, `catalog_event_time` bigint, `catalog_write_time` bigint, `catalog_snapshot_time` bigint, `catalog_max_snapshot_time` bigint, `device_id` int, `device_name` varchar, `device_partition` int, `device_event_time` bigint, `device_write_time` bigint, `device_snapshot_time` bigint, `device_max_snapshot_time` bigint, `user_id` int, `user_name` varchar, `user_partition` int, `user_event_time` bigint, `user_write_time` bigint, `user_snapshot_time` bigint, `user_max_snapshot_time` bigint >, PRIMARY KEY (rowkey) NOT ENFORCED )with( 'connector.type'='hbase', …… ) ; insert into hbase_full_user_click_case1_sink select `click_id`, ROW( `click_id`, `click_name`, `click_partition`, `click_event_time`, `click_write_time`, `click_snapshot_time`, `click_max_snapshot_time`, `catalog_id`, `catalog_name`, `catalog_partition`, `catalog_event_time`, `catalog_write_time`, `catalog_snapshot_time`, `catalog_max_snapshot_time`, `device_id`, `device_name`, `device_partition`, `device_event_time`, `device_write_time`, `device_snapshot_time`, `device_max_snapshot_time`, `user_id`, `user_name`, `user_partition`, `user_event_time`, `user_write_time`, `user_snapshot_time`, `user_max_snapshot_time` ) from (select click.id as `click_id`, click.name as `click_name`, click.kafka_partition as `click_partition`, click.event_time as `click_event_time`, click.write_time as `click_write_time`, click.snapshot_time as `click_snapshot_time`, click.max_snapshot_time as `click_max_snapshot_time`, cat.cf.id as `catalog_id`, cat.cf.name as `catalog_name`, cat.cf.kafka_partition as `catalog_partition`, cat.cf.event_time as `catalog_event_time`, cat.cf.write_time as `catalog_write_time`, cat.cf.snapshot_time as `catalog_snapshot_time`, cat.cf.max_snapshot_time as `catalog_max_snapshot_time`, dev.cf.id as `device_id`, dev.cf.name as `device_name`, dev.cf.kafka_partition as `device_partition`, dev.cf.event_time as `device_event_time`, dev.cf.write_time as `device_write_time`, dev.cf.snapshot_time as `device_snapshot_time`, dev.cf.max_snapshot_time as `device_max_snapshot_time`, u.cf.id as `user_id`, u.cf.name as `user_name`, u.cf.kafka_partition as `user_partition`, u.cf.event_time as `user_event_time`, u.cf.write_time as `user_write_time`, u.cf.snapshot_time as `user_snapshot_time`, u.cf.max_snapshot_time as `user_max_snapshot_time` from (select id, `name`, `kafka_partition`, `event_time`, `write_time`, `snapshot_time`, `max_snapshot_time`, cast(catalog_id as varchar) as catalog_key, cast(device_id as varchar) as device_key, cast(user_id as varchar) as user_key, `catalog_id`, `device_id`, `user_id`, `proc_time`, `event_time`, FROM user_click_source GROUP BY TUMBLE(event_time, INTERVAL '1' SECOND), `id`, `name`, `kafka_partition`, `event_time`, `write_time`, `snapshot_time`, `max_snapshot_time`, `catalog_id`, `device_id`, `user_id`, `proc_time`) click left join dim_catalog cat on click.catalog_key = cat.rowkey left join dim_device dev on click.device_key = dev.rowkey left join dim_user u on click.user_key = u.rowkey and click.event_time = u.ts ) t
Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes
Dear Leonard Xu: 我会去关注这个issue,非常感谢答疑。 原始邮件 发件人: Leonard Xu 收件人: user-zh 发送时间: 2020年8月12日(周三) 16:05 主题: Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes Hi Group by 和 left join 都是会有 retract 消息的,这类消息需要UpsertStreamTableSink才能处理, Kafka connetor 目前的实现是AppendStreamTableSink,所以不能处理 社区已经有一个issue在处理这个问题了,应该1.12能提供这个功能。 Best Leonard [1]https://issues.apache.org/jira/browse/FLINK-18826 <">https://issues.apache.org/jira/browse/FLINK-18826> > 在 2020年8月12日,15:58,xiao cai 写道: > > Hi Jark: > 版本:1.11.0 > 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错: > AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate > > > 我希望能够在sql校验时也能使upsert操作作用于kafka sink,或者等upsert完成,再写入kafka > > > 附上执行sql: > create table kafka_table_1 ( > `shop_id` varchar, > `user_id` bigint, > `category_id` int, > `ts` bigint, > `row_time` timestamp(3), > `proc_time` timestamp(3), > ) with ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'user_visit_1', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.bootstrap.servers' = 'ip:9092', > 'connector.properties.zookeeper.connect' = 'ip:2181', > 'update-mode' = 'append', > 'format.type' = 'avro-registry', > 'format.schema-subject' = 'user_visit', > 'format.schema-url'='http://ip:8081', > ) > > > CREATE TABLE hbase_table ( > rowKey STRING, > cf ROW > ) WITH ( > 'connector.type' = 'hbase', > 'connector.version' = '1.4.3', > 'connector.table-name' = 'hbase_table', > 'connector.zookeeper.quorum' = 'ip:2181', > 'connector.zookeeper.znode.parent' = '/hbase', > 'connector.write.buffer-flush.max-rows' = '1000' > ) > > > > > create table kafka_table_2 ( > `shop_id` varchar, > `age` varchar, > `area` varchar > ) with ( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 'connector.topic' = 'user_visit_2', > 'connector.startup-mode' = 'latest-offset', > 'connector.properties.bootstrap.servers' = 'ip:9092', > 'connector.properties.zookeeper.connect' = 'ip:2181', > 'update-mode' = 'append', > 'format.type' = 'avro-registry', > 'format.schema-subject' = 'user_visit', > 'format.schema-url'='http://ip:8081', > ) > > > insert into kafka_table_2(shop_id, user_id, category_id, ts, row_time, proc_time) > select shop_id, age, area > from kafka_table_1 left join hbase_table > for system_time as of kafka_table_1.proc_time as temp on kafka_table_1.shop_id = temp.rowKey > group by shop_id, age, area > > > 原始邮件 > 发件人: xiao cai > 收件人: user-zh > 发送时间: 2020年8月12日(周三) 15:41 > 主题: AppendStreamTableSink doesn't support consuming update changes > > > Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错: AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate
使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes
Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错: AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate 我希望能够在sql校验时也能使upsert操作作用于kafka sink,或者等upsert完成,再写入kafka 附上执行sql: create table kafka_table_1 ( `shop_id` varchar, `user_id` bigint, `category_id` int, `ts` bigint, `row_time` timestamp(3), `proc_time` timestamp(3), ) with ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_visit_1', 'connector.startup-mode' = 'latest-offset', 'connector.properties.bootstrap.servers' = 'ip:9092', 'connector.properties.zookeeper.connect' = 'ip:2181', 'update-mode' = 'append', 'format.type' = 'avro-registry', 'format.schema-subject' = 'user_visit', 'format.schema-url'='http://ip:8081', ) CREATE TABLE hbase_table ( rowKey STRING, cf ROW ) WITH ( 'connector.type' = 'hbase', 'connector.version' = '1.4.3', 'connector.table-name' = 'hbase_table', 'connector.zookeeper.quorum' = 'ip:2181', 'connector.zookeeper.znode.parent' = '/hbase', 'connector.write.buffer-flush.max-rows' = '1000' ) create table kafka_table_2 ( `shop_id` varchar, `age` varchar, `area` varchar ) with ( 'connector.type' = 'kafka', 'connector.version' = 'universal', 'connector.topic' = 'user_visit_2', 'connector.startup-mode' = 'latest-offset', 'connector.properties.bootstrap.servers' = 'ip:9092', 'connector.properties.zookeeper.connect' = 'ip:2181', 'update-mode' = 'append', 'format.type' = 'avro-registry', 'format.schema-subject' = 'user_visit', 'format.schema-url'='http://ip:8081', ) insert into kafka_table_2(shop_id, user_id, category_id, ts, row_time, proc_time) select shop_id, age, area from kafka_table_1 left join hbase_table for system_time as of kafka_table_1.proc_time as temp on kafka_table_1.shop_id = temp.rowKey group by shop_id, age, area 原始邮件 发件人: xiao cai 收件人: user-zh 发送时间: 2020年8月12日(周三) 15:41 主题: AppendStreamTableSink doesn't support consuming update changes Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错: AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate
AppendStreamTableSink doesn't support consuming update changes
Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错: AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate
AppendStreamTableSink doesn't support consuming update changes
Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:
Re: flink row 类型
Hi ,Dream 比如你最终拿到的是Row(10),10表示有10个字段,这些字段的顺序是固定的,那么你可以把每个字段在row里的索引的映射关系保存下来,如下 map ,然后 row.getField(map.get(fieldName))获取你需要的值 原始邮件 发件人: Dream-底限 收件人: user-zh 发送时间: 2020年7月23日(周四) 14:57 主题: Re: flink row 类型 hi、xiao cai 可以说一下思路吗,我没太懂 》》可以考虑把字段索引值保存下来再获取 Dream-底限 于2020年7月23日周四 下午2:56写道: > hi、Jingsong Li > 我查看了对应的api,并运行了demo测试,通过CallContext我可以拿到对应的字段类型,但是无法拿到对应的字段名称 > > >>在TypeInference中有input的type,这个type应该是包含字段信息的。 > > xiao cai 于2020年7月23日周四 下午2:19写道: > >> 可以考虑把字段索引值保存下来再获取 >> >> >> 原始邮件 >> 发件人: Dream-底限 >> 收件人: user-zh >> 发送时间: 2020年7月23日(周四) 14:08 >> 主题: Re: flink row 类型 >> >> >> hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li >> 于2020年7月23日周四 下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 > >> 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 < >> zhan...@akulaku.com> 于2020年7月22日周三 下午7:22写道: > > > hi、 > > >> 我这面定义row数据,类型为ROW,可以通过 > > >> row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > > >> rule_key 转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao >> Li > > >
Re: flink row 类型
可以考虑把字段索引值保存下来再获取 原始邮件 发件人: Dream-底限 收件人: user-zh 发送时间: 2020年7月23日(周四) 14:08 主题: Re: flink row 类型 hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li 于2020年7月23日周四 下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 > 你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 于2020年7月22日周三 下午7:22写道: > > > hi、 > > 我这面定义row数据,类型为ROW,可以通过 > > row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > > rule_key 转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao Li >
Re:回复:flink1.11 set yarn slots failed
可以看这里https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html 原始邮件 发件人: Zhou Zach 收件人: user-zh 发送时间: 2020年7月16日(周四) 15:28 主题: Re:回复:flink1.11 set yarn slots failed -D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > > 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task Slots的值,现在每次提交作业都是0
Re:回复:flink1.11 set yarn slots failed
可以看这里 原始邮件 发件人: Zhou Zach 收件人: user-zh 发送时间: 2020年7月16日(周四) 15:28 主题: Re:回复:flink1.11 set yarn slots failed -D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > > 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager /opt/flink-1.11.0/bin/flink run-application -t yarn-application \ -Djobmanager.memory.process.size=1024m \ -Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task Slots的值,现在每次提交作业都是0