flink1.12.4 写入hdfs报错 java.lang.OutOfMemoryError: Direct buffer memory

2021-11-08 文章 xiao cai
通过flink 1.12.4 streaming file sink 写入hdfs,运行过程中抛出以下异常:


2021-11-08 20:39:05
java.io.IOException: java.lang.OutOfMemoryError: Direct buffer memory
at 
org.apache.hadoop.hdfs.DataStreamer$LastExceptionInStreamer.set(DataStreamer.java:299)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:820)
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
at java.nio.Bits.reserveMemory(Bits.java:694)
at java.nio.DirectByteBuffer.(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.write(IOUtil.java:58)
at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:468)
at 
org.apache.hadoop.net.SocketOutputStream$Writer.performIO(SocketOutputStream.java:63)
at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
at 
org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:159)
at 
org.apache.hadoop.net.SocketOutputStream.write(SocketOutputStream.java:117)
at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)
at java.io.DataOutputStream.write(DataOutputStream.java:107)
at org.apache.hadoop.hdfs.DFSPacket.writeTo(DFSPacket.java:180)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:765)

Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点

2020-12-21 文章 xiao cai
Hi
可以考虑使用yarn的node label特性,将flink的任务提交到特定的node上


 Original Message 
Sender: r pp
Recipient: user-zh
Date: Monday, Dec 21, 2020 21:25
Subject: Re: Flink on yarn 如何指定固定几台yarn节点当做flink任务的运行节点


嗯,指定几台机子 用于使用flink 运行,为什么 不在yarn 为flink 专门制定 一个队列呢?需要 网络隔离 。。。内网速度多大? 
 于2020年12月21日周一 下午5:48写道: > 通过yarn label可以实现 > > 
-邮件原件- > 发件人: user-zh-return-10095-afweijian=163@flink.apache.org > 
 代表 yujianbo > 发送时间: 
2020年12月21日 16:44 > 收件人: user-zh@flink.apache.org > 主题: Flink on yarn 
如何指定固定几台yarn节点当做flink任务的运行节点 > > 各位大佬好: > 请问Flink on yarn 
如何指定固定几台yarn节点当做flink任务的运行节点? > > > > -- > Sent from: 
http://apache-flink.147419.n8.nabble.com/ >

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-08 文章 xiao cai
Hi Jark
sorry,是1.12.0, 我打错了


 Original Message 
Sender: Jark Wu
Recipient: user-zh
Date: Wednesday, Dec 9, 2020 14:40
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型


Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai, 你有什么 issue 是需要1.12.1的? 1.12.0 
这两天即将发布。 Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai  
wrote: > 好的,计划下周升级测试下,另:1.12.1计划何时发布呢 > > > Original Message > Sender: Jark 
Wu > Recipient: user-zh > Date: 
Tuesday, Dec 8, 2020 13:41 > Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > 
hailong 说的定义成 STRING 是在1.12 版本上支持的, > 
https://issues.apache.org/jira/browse/FLINK-18002 1.12 > 
这两天就会发布,如果能升级的话,可以尝试一下。 Best, Jark On Tue, 8 Dec 2020 at 11:56, wxpcc < > 
wxp4...@outlook.com> wrote: > 可以使用字符串的方式,或者自定义 > 
String类型format,内部结构再通过udf去做后续的实现 > > > > -- > Sent from: > 
http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-08 文章 xiao cai
Hi  Jark 


 Original Message 
Sender: Jark Wu
Recipient: user-zh
Date: Wednesday, Dec 9, 2020 14:40
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型


Hi 赵一旦, 这部分 jackson 组件已经自动处理了这部分逻辑。 Hi xiaocai, 你有什么 issue 是需要1.12.1的? 1.12.0 
这两天即将发布。 Best, Jark On Wed, 9 Dec 2020 at 14:34, xiao cai  
wrote: > 好的,计划下周升级测试下,另:1.12.1计划何时发布呢 > > > Original Message > Sender: Jark 
Wu > Recipient: user-zh > Date: 
Tuesday, Dec 8, 2020 13:41 > Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > 
hailong 说的定义成 STRING 是在1.12 版本上支持的, > 
https://issues.apache.org/jira/browse/FLINK-18002 1.12 > 
这两天就会发布,如果能升级的话,可以尝试一下。 Best, Jark On Tue, 8 Dec 2020 at 11:56, wxpcc < > 
wxp4...@outlook.com> wrote: > 可以使用字符串的方式,或者自定义 > 
String类型format,内部结构再通过udf去做后续的实现 > > > > -- > Sent from: > 
http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-08 文章 xiao cai
好的,计划下周升级测试下,另:1.12.1计划何时发布呢


 Original Message 
Sender: Jark Wu
Recipient: user-zh
Date: Tuesday, Dec 8, 2020 13:41
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型


hailong 说的定义成 STRING 是在1.12 版本上支持的, 
https://issues.apache.org/jira/browse/FLINK-18002 1.12 这两天就会发布,如果能升级的话,可以尝试一下。 
Best, Jark On Tue, 8 Dec 2020 at 11:56, wxpcc  wrote: > 
可以使用字符串的方式,或者自定义 String类型format,内部结构再通过udf去做后续的实现 > > > > -- > Sent from: 
http://apache-flink.147419.n8.nabble.com/

Re: Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-07 文章 xiao cai
如果我只是想要返回jsonObject.toString的内容呢?不需要解析嵌套结构


 Original Message 
Sender: 赵一旦
Recipient: user-zh
Date: Monday, Dec 7, 2020 21:13
Subject: Re: Re: FlinkSQL如何定义JsonObject数据的字段类型


flink sql 支持不了这个需要。最多支持到Map,Map内部继续嵌套是不支持的。 hailongwang <18868816...@163.com> 
于2020年12月7日周一 下午8:03写道: > > > Schema 不太确定的话,那么下游怎么用这个数据呢? > > > Best, > Hailong 
> > 在 2020-12-07 15:21:16,"xiao cai"  写道: > 
>ROW需要写明具体的字段类型,比如: > >ROW<`id` string, …>,但是我并没有办法知道jsonObject中具体的schema > > > 
> > > Original Message > >Sender: 李轲 > >Recipient: 
user-zh > >Date: Monday, Dec 7, 2020 16:14 > 
>Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > > > >可以试试ROW 发自我的iPhone > 在 
2020年12月7日,15:43,xiao cai  写道: > > > String不行,取出来的值是null > > 
> Original Message > Sender: silence< > slle...@aliyun.com.INVALID> > 
Recipient: user-zh > > Date: Monday, Dec 7, 2020 
14:26 > Subject: Re: > FlinkSQL如何定义JsonObject数据的字段类型 > > > 可以用string -- Sent 
from: > http://apache-flink.147419.n8.nabble.com/ >

Re:Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-07 文章 xiao cai
Schema不确定,下游可以当做JsonString来处理,不同的业务下游处理各自上游的业务,这是由业务两端自己沟通确定的。但是通用的平台这边转的时候是不知道这个规则的,所以需要有一个字段,来统一提供一个字段给外部业务方填充、传递。


 Original Message 
Sender: hailongwang<18868816...@163.com>
Recipient: user-zh
Date: Monday, Dec 7, 2020 19:52
Subject: Re:Re: FlinkSQL如何定义JsonObject数据的字段类型


Schema 不太确定的话,那么下游怎么用这个数据呢? Best, Hailong 在 2020-12-07 15:21:16,"xiao cai" 
 写道: >ROW需要写明具体的字段类型,比如: >ROW<`id` string, 
…>,但是我并没有办法知道jsonObject中具体的schema > > > Original Message >Sender: 
李轲 >Recipient: user-zh >Date: 
Monday, Dec 7, 2020 16:14 >Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型 > > 
>可以试试ROW 发自我的iPhone > 在 2020年12月7日,15:43,xiao cai  写道: > > 
String不行,取出来的值是null > > > Original Message > Sender: 
silence > Recipient: 
user-zh > Date: Monday, Dec 7, 2020 14:26 > Subject: 
Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > 可以用string -- Sent from: 
http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-07 文章 xiao cai
ROW需要写明具体的字段类型,比如:
ROW<`id` string, …>,但是我并没有办法知道jsonObject中具体的schema


 Original Message 
Sender: 李轲
Recipient: user-zh
Date: Monday, Dec 7, 2020 16:14
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型


可以试试ROW 发自我的iPhone > 在 2020年12月7日,15:43,xiao cai  写道: > > 
String不行,取出来的值是null > > > Original Message > Sender: 
silence > Recipient: 
user-zh > Date: Monday, Dec 7, 2020 14:26 > Subject: 
Re: FlinkSQL如何定义JsonObject数据的字段类型 > > > 可以用string -- Sent from: 
http://apache-flink.147419.n8.nabble.com/

Re: FlinkSQL如何定义JsonObject数据的字段类型

2020-12-06 文章 xiao cai
String不行,取出来的值是null


 Original Message 
Sender: silence
Recipient: user-zh
Date: Monday, Dec 7, 2020 14:26
Subject: Re: FlinkSQL如何定义JsonObject数据的字段类型


可以用string -- Sent from: http://apache-flink.147419.n8.nabble.com/

FlinkSQL如何定义JsonObject数据的字段类型

2020-12-06 文章 xiao cai
Hi,
flink version: 1.11.2
api: flink-sql


场景:使用flink sql定义了一张kafka的source表,kafka中数据为json格式的字符串。
其中context是json的一个键,其值为jsonObject,数据示例如下:
{ 
 “id”: 1,
 "context”:  { 
  … 
(这里的数据为jsonObject,具体schema不确定,
由各个业务方自行确定,可能嵌套,也可能不嵌套,完全不可控)
   } 
}
建表语句为:
CREATE TABLE json_source ( 
id bigint,
context 
) WITH (
 'connector' = 'kafka’, 
 'format' = 'json’ 
);


问题: 该使用什么数据类型来指定类型呢?从目前的flink sql 的 data type 
里感觉没有很合适的匹配项,不管是ROW,或者MAP都不太合适。


请求指教,万分感谢!

Re:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

2020-11-03 文章 xiao cai
Hi wang:
非常感谢解答,我先顺着你的思路去详细了解下这个过程。
Good luck.
Best, 
xiao


 原始邮件 
发件人: hailongwang<18868816...@163.com>
收件人: user-zh
发送时间: 2020年11月3日(周二) 21:42
主题: Re:FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题


Hi xiao, 从报错来看,这个 SQL 应该是 match 了 `StreamExecJoinRule`,而 regular join 不能有 
rowtime 属性。 应该是因为你的 kafkaSouce1 table 的 rowtime 经过 group by 后使用了 last_value 
导致不是时间属性类型->`TimeIndicatorRelDataType`,而在 rule 进行判断后没有 
windowBounds,所以就报了现在这个错误了。 Best, Hailong Wang 在 2020-11-03 18:27:51,"xiao cai" 
 写道: >Hi : >flink 版本 1.11.2 >问题:双流Join时,使用last_value + 
interval join,报错:Rowtime attributes must not be in the input rows of a regular 
join. As a workaround you can cast the time attributes of input tables to 
TIMESTAMP before. > > >代码: >// stream 1 >create table kafkaSource1 ( >id int, 
>field_1 int, >field_2 varchar, >ts1 timestamp(3), >watermark for `ts1` >) with 
( >connector = kafka >) >// stream 2 >create table kafkaSource2 ( >id int, 
>field_3 >ts2 timestamp(3), >watermark for `ts2` >) with ( >connector = kafka 
>) > > >//create view >create view kafkaSource1_view as >select >field_1 as 
field_1, >last_value(field_2) as field_2, >last_value(ts1) as ts1 >from 
kafkaSouce1 >group by field_1 > > >// query >insert into sinkTable >select 
>a.field_1, >b.field_3 >from kafkaSource2 a join kafkaSource1_view b >on a.id = 
b.id >and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY

FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

2020-11-03 文章 xiao cai
Hi :
flink 版本 1.11.2
问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in 
the input rows of a regular join. As a workaround you can cast the time 
attributes of input tables to TIMESTAMP before.


代码:
// stream 1
create table kafkaSource1 (
id int,
field_1 int,
field_2 varchar,
ts1 timestamp(3),
watermark for `ts1` 
) with (
connector = kafka
)
// stream 2
create table kafkaSource2 (
id int,
field_3
ts2 timestamp(3),
watermark for `ts2` 
) with (
connector = kafka
)


//create view
create view kafkaSource1_view as 
select 
field_1 as field_1,
last_value(field_2) as field_2,
last_value(ts1) as ts1
from kafkaSouce1 
group by field_1


// query 
insert into sinkTable 
select 
a.field_1,
b.field_3
from kafkaSource2 a join kafkaSource1_view b
on a.id = b.id
and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY

FlinkSLQ双流join使用LAST_VALUE + INTERVAL JOIN时遇到问题

2020-11-02 文章 xiao cai
Hi :
flink 版本 1.11.2
问题:双流Join时,使用last_value + interval join,报错:Rowtime attributes must not be in 
the input rows of a regular join. As a workaround you can cast the time 
attributes of input tables to TIMESTAMP before.


代码:
// stream 1
create table kafkaSource1 (
id int,
field_1 int,
field_2 varchar,
ts1 timestamp(3),
watermark for `ts1` 
) with (
connector = kafka
)
// stream 2
create table kafkaSource2 (
id int,
field_3
ts2 timestamp(3),
watermark for `ts2` 
) with (
connector = kafka
)


//create view
create view kafkaSource1_view as 
select 
field_1 as field_1,
last_value(field_2) as field_2,
last_value(ts1) as ts1
from kafkaSouce1 
group by field_1


// query 
insert into sinkTable 
select 
a.field_1,
b.field_3
from kafkaSource2 a join kafkaSource1_view b
on a.id = b.id
and a.ts >= b.ts - INTERVAL ‘1’ HOUR and a.ts < b.ts + INTERVAL ‘2' DAY

如何为每个flink任务分别设置metrics的reporter

2020-10-12 文章 xiao cai
Hi:
已知的设置metrics reporter的方式是在conf/flink-conf.yaml中,如果想要为每个任务分别设置不同的metrics 
reporter或者设置不同的参数,比如设置prometheus pushgateway的多个自定义的k=v,该如何设置呢?


Best xiao.

Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-29 文章 xiao cai
这个我们有尝试,遇到了classpath的问题,导致包冲突,无法启动进程,你们有遇到过相关的情况吗?


 原始邮件 
发件人: todd
收件人: user-zh
发送时间: 2020年9月29日(周二) 17:36
主题: Re: 怎么样在Flink中使用java代码提交job到yarn


https://github.com/todd5167/flink-spark-submiter 可以参考这个案例,用ClusterCLient提交。 -- 
Sent from: http://apache-flink.147419.n8.nabble.com/

回复: Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-28 文章 xiao cai
非常感谢建议,有zeeplin api的相关文档吗




 原始邮件 
发件人: chengyanan1...@foxmail.com
收件人: user-zh
发送时间: 2020年9月29日(周二) 09:54
主题: 回复: Re: 怎么样在Flink中使用java代码提交job到yarn


我们项目中也是用到了这个,我也是暂时采用的捕获日志来解析得到yarn application id 和 flink job id的 
后期重点研究一下zeeplin,或许可以修改一下源码来镶嵌到我们自己的系统中或者直接调用zeeplin的api 发件人: xushanshan 发送时间: 
2020-09-25 16:42 收件人: user-zh 主题: Re: 怎么样在Flink中使用java代码提交job到yarn 
可以捕获控制台打印出来的日志,flink相关日志的格式很固定,字符串截取就能获得 yarn application id 和 flink job id > 在 
2020年9月25日,下午4:23,xiao cai  写道: > > Hi all: > 大家好,我目前遇到一个flink 
任务提交方面的困扰: > 想要在自己的项目中(springboot)提交flink 
job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
 > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > 
best, > xiao

Re:Re:HistoryServer完成任务丢失的问题

2020-09-27 文章 xiao cai
貌似是个bug


 原始邮件 
发件人: xiao cai
收件人: user-zh
发送时间: 2020年9月27日(周日) 18:31
主题: Re:Re:HistoryServer完成任务丢失的问题


是在history server中没有,但是yarn 
logs还是可以看到的,我理解是任务结束前,jobManager没有upload文件到指定目录,所以history 
server没有拉取到。但是为何没有upload,我通过jobManager的日志也没看出来,没有任何报错。 原始邮件 发件人: Michael 
Ran 收件人: user-zh 发送时间: 
2020年9月27日(周日) 17:06 主题: Re:Re:HistoryServer完成任务丢失的问题 你的意思是,日志彻底消失了?完全找不到? 
不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的 在 2020-09-27 17:03:45,"xiao cai" 
 写道: >是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。 
>问题是cancel的那次job,并没有上传日志信息到归档目录里。 > > > 原始邮件 >发件人: Michael 
Ran >收件人: user-zh >发送时间: 
2020年9月27日(周日) 16:45 >主题: Re:HistoryServer完成任务丢失的问题 > > >history 记得是定时拉取的,有延迟过去 
在 2020-09-27 16:40:27,"xiao cai"  写道: >Hi: >flink 1.11.0 
>我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history 
server中却找不到这个任务。同时我尝试了再yarn中kill 
application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history 
server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.

Re:Re:HistoryServer完成任务丢失的问题

2020-09-27 文章 xiao cai
貌似是个bug,我的版本是1.11.0


https://issues.apache.org/jira/browse/FLINK-18959?jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20Bug%20AND%20text%20~%20%22history%20server%22


 原始邮件 
发件人: xiao cai
收件人: user-zh
发送时间: 2020年9月27日(周日) 18:41
主题: Re:Re:HistoryServer完成任务丢失的问题


貌似是个bug


 原始邮件 
发件人: xiao cai
收件人: user-zh
发送时间: 2020年9月27日(周日) 18:31
主题: Re:Re:HistoryServer完成任务丢失的问题


是在history server中没有,但是yarn 
logs还是可以看到的,我理解是任务结束前,jobManager没有upload文件到指定目录,所以history 
server没有拉取到。但是为何没有upload,我通过jobManager的日志也没看出来,没有任何报错。 原始邮件 发件人: Michael 
Ran 收件人: user-zh 发送时间: 
2020年9月27日(周日) 17:06 主题: Re:Re:HistoryServer完成任务丢失的问题 你的意思是,日志彻底消失了?完全找不到? 
不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的 在 2020-09-27 17:03:45,"xiao cai" 
 写道: >是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。 
>问题是cancel的那次job,并没有上传日志信息到归档目录里。 > > > 原始邮件 >发件人: Michael 
Ran >收件人: user-zh >发送时间: 
2020年9月27日(周日) 16:45 >主题: Re:HistoryServer完成任务丢失的问题 > > >history 记得是定时拉取的,有延迟过去 
在 2020-09-27 16:40:27,"xiao cai"  写道: >Hi: >flink 1.11.0 
>我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history 
server中却找不到这个任务。同时我尝试了再yarn中kill 
application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history 
server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.

Re:Re:HistoryServer完成任务丢失的问题

2020-09-27 文章 xiao cai
是在history server中没有,但是yarn 
logs还是可以看到的,我理解是任务结束前,jobManager没有upload文件到指定目录,所以history 
server没有拉取到。但是为何没有upload,我通过jobManager的日志也没看出来,没有任何报错。


 原始邮件 
发件人: Michael Ran
收件人: user-zh
发送时间: 2020年9月27日(周日) 17:06
主题: Re:Re:HistoryServer完成任务丢失的问题


你的意思是,日志彻底消失了?完全找不到? 不会是你任务有问题,压根就没成功,没产生吧。 理论上日志不可能平白无故消失的 在 2020-09-27 
17:03:45,"xiao cai"  写道: 
>是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。 >问题是cancel的那次job,并没有上传日志信息到归档目录里。 > > 
> 原始邮件 >发件人: Michael Ran >收件人: 
user-zh >发送时间: 2020年9月27日(周日) 16:45 >主题: 
Re:HistoryServer完成任务丢失的问题 > > >history 记得是定时拉取的,有延迟过去 在 2020-09-27 
16:40:27,"xiao cai"  写道: >Hi: >flink 1.11.0 
>我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history 
server中却找不到这个任务。同时我尝试了再yarn中kill 
application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history 
server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.

Re:HistoryServer完成任务丢失的问题

2020-09-27 文章 xiao cai
是的,默认是10s一次,但是这个是去jobmanager的归档目录里拉取的间隔。
问题是cancel的那次job,并没有上传日志信息到归档目录里。


 原始邮件 
发件人: Michael Ran
收件人: user-zh
发送时间: 2020年9月27日(周日) 16:45
主题: Re:HistoryServer完成任务丢失的问题


history 记得是定时拉取的,有延迟过去 在 2020-09-27 16:40:27,"xiao cai"  写道: 
>Hi: >flink 1.11.0 
>我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history 
server中却找不到这个任务。同时我尝试了再yarn中kill 
application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history 
server又能看到。希望了解history serve相关原理的同学给予帮助。 >非常感谢。 > > > > >best, >xiao.

HistoryServer完成任务丢失的问题

2020-09-27 文章 xiao cai
Hi:
flink 1.11.0
我启动了一个任务,当这个任务进行checkpoint的时候,我在webui中点击了cancel,任务在yarn中的状态时killed,但是再到history 
server中却找不到这个任务。同时我尝试了再yarn中kill 
application的方式,也是无法发现。这个问题有点随机,不是每次都会出现,有时候执行cancel,在history 
server又能看到。希望了解history serve相关原理的同学给予帮助。
非常感谢。




best,
xiao.

Flink SQL如何设置checkpoint的TTL

2020-09-27 文章 xiao cai
Hi:
目前想了解下载Flink SQL下该如何设置checkpoint的TTL。
非常感谢指教


Best,
xiao.

Re:Re:Re: Re: Flink SQL撤回流问题

2020-09-27 文章 xiao cai
Hi Michael Ran: 
是的,我其实也不需要id,但是dba建表要求必须有自增id,所以才发现这个问题。我去查了mysql的文档,是innodb对auto_increment做了设置,默认就会对所有insert执行auto_increment
 + 1操作,可以通过修改innodb的配置来避免这个情况,但是会引擎写入性能的下降(有锁)。这个问题确实很隐含,很难发现,非常感谢解答。


但是,对于insert into on dumplicate 
key的方式还是有质疑,感觉如果可以将insert和update明确的区分开,这样会更加好。再次感谢。
 原始邮件 
发件人: Michael Ran
收件人: user-zh
发送时间: 2020年9月27日(周日) 15:03
主题: Re:Re:Re: Re: Flink SQL撤回流问题


感觉这不是flink的问题,我们也有类似场景,dt 按天其实并不多,直接就没要id了,如果你非要id,而且数量变化巨大,那么用integer 
,当然还是有可能超。 auto 一般适用数据量不大的单表场景。分布式大数据量场景,都是自己设计id,或者不要id 在 2020-09-27 
14:56:06,"xiao cai"  写道: >Hi Ran: >非常感谢,我试了insert into ON 
DUPLICATE KEY UPDATE dt=“dt"的方式,确实是会出现update的始终是id=1,但是auto_increment 
却一直增加的情况。感觉这样不是很合理,因为随着数据量的增加,迟早会出现数值越界的情形。 > > > 原始邮件 >发件人: Michael 
Ran >收件人: user-zh >发送时间: 
2020年9月27日(周日) 14:37 >主题: Re:Re: Re: Flink SQL撤回流问题 > > >没有传入id,始终是1 ? 
那就是第一次insert update 之后,生成的1.后面都是insert into table(dt,num) 
values(dt,新数量) ON DUPLICATE KEY UPDATE dt=values(dt)你模拟下这个语句呢,看看id成为1 
之后,是不是就不变了 在 2020-09-27 14:32:57,"xiao cai"  写道: >Hi lec ssmi: 
> insert的时候没有指定id,只指定了dt和cnt,因为id在mysql的sink表里是自增的主键,所以flink sql里并没有指定。 > 原始邮件 
>发件人: lec ssmi >收件人: 
flink-user-cn >发送时间: 2020年9月27日(周日) 14:25 >主题: Re: 
Re: Flink SQL撤回流问题 > > >你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。 
kandy.wang  于2020年9月27日周日 下午2:01写道: > > > > > > > hi > 
你建mysql要指定主键,另外创建flink表时也要指定一下主键 > > PRIMARY KEY (id) NOT 
ENFORCED,这样就会根据主键upsert了 > > > > > > > > > 在 2020-09-27 13:36:25,"xiao cai" 
 写道: > > 
>如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 > 
>这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > >收件人: 
flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > >主题: 
Re: Flink SQL撤回流问题 > > > > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 
而不是直接update呢。 xiao cai < > flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > 
source table: kafka > > sink table: mysql schem(id, dt, cnt) > > > insert : > 
insert into sink > > select dt,count(distinct id) from source group by dt; > > 
> > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show 
> create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > > 
发件人: Michael Ran > 收件人: user-zh< > 
user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink > 
SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai"  > 
写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > > 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。 >

Re:Re: Re: Flink SQL撤回流问题

2020-09-26 文章 xiao cai
Hi Ran:
非常感谢,我试了insert into ON DUPLICATE KEY UPDATE 
dt=“dt"的方式,确实是会出现update的始终是id=1,但是auto_increment 
却一直增加的情况。感觉这样不是很合理,因为随着数据量的增加,迟早会出现数值越界的情形。


 原始邮件 
发件人: Michael Ran
收件人: user-zh
发送时间: 2020年9月27日(周日) 14:37
主题: Re:Re: Re: Flink SQL撤回流问题


没有传入id,始终是1 ? 那就是第一次insert update 之后,生成的1.后面都是insert into 
table(dt,num) values(dt,新数量) ON DUPLICATE KEY UPDATE 
dt=values(dt)你模拟下这个语句呢,看看id成为1 之后,是不是就不变了 在 2020-09-27 14:32:57,"xiao 
cai"  写道: >Hi lec ssmi: > 
insert的时候没有指定id,只指定了dt和cnt,因为id在mysql的sink表里是自增的主键,所以flink sql里并没有指定。 > 原始邮件 
>发件人: lec ssmi >收件人: 
flink-user-cn >发送时间: 2020年9月27日(周日) 14:25 >主题: Re: 
Re: Flink SQL撤回流问题 > > >你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。 
kandy.wang  于2020年9月27日周日 下午2:01写道: > > > > > > > hi > 
你建mysql要指定主键,另外创建flink表时也要指定一下主键 > > PRIMARY KEY (id) NOT 
ENFORCED,这样就会根据主键upsert了 > > > > > > > > > 在 2020-09-27 13:36:25,"xiao cai" 
 写道: > > 
>如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 > 
>这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > >收件人: 
flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > >主题: 
Re: Flink SQL撤回流问题 > > > > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 
而不是直接update呢。 xiao cai < > flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > 
source table: kafka > > sink table: mysql schem(id, dt, cnt) > > > insert : > 
insert into sink > > select dt,count(distinct id) from source group by dt; > > 
> > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show 
> create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > > 
发件人: Michael Ran > 收件人: user-zh< > 
user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink > 
SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai"  > 
写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > > 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。 >

Re: Re: Flink SQL撤回流问题

2020-09-26 文章 xiao cai
Hi lec ssmi:
  insert的时候没有指定id,只指定了dt和cnt,因为id在mysql的sink表里是自增的主键,所以flink sql里并没有指定。
 原始邮件 
发件人: lec ssmi
收件人: flink-user-cn
发送时间: 2020年9月27日(周日) 14:25
主题: Re: Re: Flink SQL撤回流问题


你那个主键=1,应该是传进去的,不是mysql自动生成的吧。所以auto_increment才会一直增长啊。 kandy.wang 
 于2020年9月27日周日 下午2:01写道: > > > > > > > hi > 
你建mysql要指定主键,另外创建flink表时也要指定一下主键 > > PRIMARY KEY (id) NOT 
ENFORCED,这样就会根据主键upsert了 > > > > > > > > > 在 2020-09-27 13:36:25,"xiao cai" 
 写道: > > 
>如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 > 
>这是我很困惑的地方。 > > > > > > 原始邮件 > >发件人: lec ssmi > >收件人: 
flink-user-cn > >发送时间: 2020年9月27日(周日) 13:06 > >主题: 
Re: Flink SQL撤回流问题 > > > > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 
而不是直接update呢。 xiao cai < > flin...@163.com> 于2020年9月27日周日 下午12:08写道: > 场景如下: > 
source table: kafka > > sink table: mysql schem(id, dt, cnt) > > > insert : > 
insert into sink > > select dt,count(distinct id) from source group by dt; > > 
> > 这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show 
> create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > > 
发件人: Michael Ran > 收件人: user-zh< > 
user-zh@flink.apache.org> > 发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink > 
SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai"  > 
写道: >Hi: > >使用Flink SQL撤回流写入MySQL,表的auto_increment > > 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。 >

Re:Re: Flink SQL撤回流问题

2020-09-26 文章 xiao cai
Hi kandy.wang:
忘记说明,我指定了dt为primary 
key,按理说会按照dt做update,但是为何auto_increment会不断的变大呢,而id也没有变化,id字段值始终为1。还望解惑。


 原始邮件 
发件人: kandy.wang
收件人: user-zh
发送时间: 2020年9月27日(周日) 14:01
主题: Re:Re: Flink SQL撤回流问题


hi 你建mysql要指定主键,另外创建flink表时也要指定一下主键 PRIMARY KEY (id) NOT 
ENFORCED,这样就会根据主键upsert了 在 2020-09-27 13:36:25,"xiao cai"  写道: 
>如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。 
>这是我很困惑的地方。 > > > 原始邮件 >发件人: lec ssmi >收件人: 
flink-user-cn >发送时间: 2020年9月27日(周日) 13:06 >主题: Re: 
Flink SQL撤回流问题 > > >是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 
xiao cai  于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: 
kafka > sink table: mysql schem(id, dt, cnt) > > > insert : > insert into sink 
> select dt,count(distinct id) from source group by dt; > > > 
这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show 
create table sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > 
发件人: Michael Ran > 收件人: user-zh > 
发送时间: 2020年9月27日(周日) 11:51 > 主题: Re:Flink SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 
11:48:36,"xiao cai"  写道: >Hi: > >使用Flink 
SQL撤回流写入MySQL,表的auto_increment > 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。

Re: Flink SQL撤回流问题

2020-09-26 文章 xiao cai
如果是先delete再insert,为何主键id一直都是1呢,如果delete在insert时指定了id值,那么auto_increment是不会变的。
这是我很困惑的地方。


 原始邮件 
发件人: lec ssmi
收件人: flink-user-cn
发送时间: 2020年9月27日(周日) 13:06
主题: Re: Flink SQL撤回流问题


是不是底层的sink在处理retract的时候,使用的是先delte再insert , 而不是直接update呢。 xiao cai 
 于2020年9月27日周日 下午12:08写道: > 场景如下: > source table: kafka > sink 
table: mysql schem(id, dt, cnt) > > > insert : > insert into sink > select 
dt,count(distinct id) from source group by dt; > > > 这时mysql对应sink表中有一条数据(1, 
2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变 > show create table 
sink可以发现auto_increment在不断的变大。 > 当超过id的取值范围,就会报错了。 > > > 原始邮件 > 发件人: Michael 
Ran > 收件人: user-zh > 发送时间: 
2020年9月27日(周日) 11:51 > 主题: Re:Flink SQL撤回流问题 > > > 详细场景描述下呢 在 2020-09-27 
11:48:36,"xiao cai"  写道: >Hi: > >使用Flink 
SQL撤回流写入MySQL,表的auto_increment > 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。

Re:Flink SQL撤回流问题

2020-09-26 文章 xiao cai
场景如下:
source table:   kafka
sink table:   mysql  schem(id, dt, cnt)


insert :
insert into sink 
select dt,count(distinct id) from source group by dt;


这时mysql对应sink表中有一条数据(1, 2020-09-25, 100),随着事件消费,cnt字段会不断变化,id,dt保持不变
show create table sink可以发现auto_increment在不断的变大。
当超过id的取值范围,就会报错了。


 原始邮件 
发件人: Michael Ran
收件人: user-zh
发送时间: 2020年9月27日(周日) 11:51
主题: Re:Flink SQL撤回流问题


详细场景描述下呢 在 2020-09-27 11:48:36,"xiao cai"  写道: >Hi: >使用Flink 
SQL撤回流写入MySQL,表的auto_increment 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。

Flink SQL撤回流问题

2020-09-26 文章 xiao cai
Hi:
使用Flink SQL撤回流写入MySQL,表的auto_increment 
越来越大是为什么,我的输出结果只有一条,mysql表里也只有一条数据,自增主键id的值一直为1,但是auto_increment却越来越大。求解答。

Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 xiao cai
Hi zilong:
这种方式我考虑过,个人认为平台层面如果有业务逻辑的侵入,会影响后续的升级。所以我们是在标注输出中正则匹配出jobId和applicationId。你了解YarnClusterDescripto吗?之前社区看到有人用这个提交的。


 原始邮件 
发件人: zilong xiao
收件人: user-zh
发送时间: 2020年9月25日(周五) 17:12
主题: Re: 怎么样在Flink中使用java代码提交job到yarn


你提交的任务是可以指定job name的呀,你的job name和你的业务主键绑定就可以做到唯一了,然后根据这个关系查询即可,没记错-ynm 是指定job 
name的 xiao cai  于2020年9月25日周五 下午5:01写道: > hi zilong: > 
通过process提交任务以后,通过rest > 
api,如何知道哪一个是我提交的呢?如果这时有多个请求过来同时都提交了任务,怎么知道rest返回的application应该与哪一次提交对应呢? > > > 
原始邮件 > 发件人: zilong xiao > 收件人: 
user-zh > 发送时间: 2020年9月25日(周五) 16:55 > 主题: Re: 
怎么样在Flink中使用java代码提交job到yarn > > > 我知道呀,你不是从输出里抠的嘛?你想要更优雅的方式可以用rest 
api啊,这些信息都是可以拿到的 xiao cai < > flin...@163.com> 于2020年9月25日周五 下午4:53写道: > hi 
zilong: > > 你说的这些信息我目前都能拿到,已经实现了。只是实现的方式很不优雅,而且提交的速度很慢。 > > > 原始邮件 > 发件人: 
zilong xiao< > acidzz...@gmail.com> > 收件人: user-zh > 
发送时间: > 2020年9月25日(周五) 16:48 > 主题: Re: 怎么样在Flink中使用java代码提交job到yarn > > > > 
JobId是说Flink的JobId还是yarn上面的application ID呢?containerID可以通过yarn rest api拿到呀 > > 
xiao cai  于2020年9月25日周五 下午4:43写道: > > > > 
使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。
 > > > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh< > > 
user-zh@flink.apache.org> > 发送时间: 2020年9月25日(周五) 16:32 > 主题: Re: > > 
怎么样在Flink中使用java代码提交job到yarn > > > > > 
Java程序用process调用脚本提交任务没啥问题吧,获取jobId&containerId的问题,我理解可以用yarn rest api & > > > 
flink rest api来完成,希望对你有帮助,祝好~ xiao cai  于2020年9月25日周五 > > > 
下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > > > > 
想要在自己的项目中(springboot)提交flink > > > > 
job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
 > > > > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > 
> > > > best, > xiao

Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 xiao cai
hi zilong:
通过process提交任务以后,通过rest 
api,如何知道哪一个是我提交的呢?如果这时有多个请求过来同时都提交了任务,怎么知道rest返回的application应该与哪一次提交对应呢?


 原始邮件 
发件人: zilong xiao
收件人: user-zh
发送时间: 2020年9月25日(周五) 16:55
主题: Re: 怎么样在Flink中使用java代码提交job到yarn


我知道呀,你不是从输出里抠的嘛?你想要更优雅的方式可以用rest api啊,这些信息都是可以拿到的 xiao cai  
于2020年9月25日周五 下午4:53写道: > hi zilong: > 
你说的这些信息我目前都能拿到,已经实现了。只是实现的方式很不优雅,而且提交的速度很慢。 > > > 原始邮件 > 发件人: zilong 
xiao > 收件人: user-zh > 发送时间: 
2020年9月25日(周五) 16:48 > 主题: Re: 怎么样在Flink中使用java代码提交job到yarn > > > 
JobId是说Flink的JobId还是yarn上面的application ID呢?containerID可以通过yarn rest api拿到呀 > 
xiao cai  于2020年9月25日周五 下午4:43写道: > > > 
使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。
 > > > > 原始邮件 > 发件人: zilong xiao > 收件人: user-zh< > 
user-zh@flink.apache.org> > 发送时间: 2020年9月25日(周五) 16:32 > 主题: Re: > 
怎么样在Flink中使用java代码提交job到yarn > > > > 
Java程序用process调用脚本提交任务没啥问题吧,获取jobId&containerId的问题,我理解可以用yarn rest api & > > 
flink rest api来完成,希望对你有帮助,祝好~ xiao cai  于2020年9月25日周五 > > 
下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > > > 
想要在自己的项目中(springboot)提交flink > > > 
job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
 > > > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > 
> > best, > xiao

Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 xiao cai
hi zilong:
你说的这些信息我目前都能拿到,已经实现了。只是实现的方式很不优雅,而且提交的速度很慢。


 原始邮件 
发件人: zilong xiao
收件人: user-zh
发送时间: 2020年9月25日(周五) 16:48
主题: Re: 怎么样在Flink中使用java代码提交job到yarn


JobId是说Flink的JobId还是yarn上面的application ID呢?containerID可以通过yarn rest api拿到呀 xiao 
cai  于2020年9月25日周五 下午4:43写道: > > 
使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。
 > > > 原始邮件 > 发件人: zilong xiao > 收件人: 
user-zh > 发送时间: 2020年9月25日(周五) 16:32 > 主题: Re: 
怎么样在Flink中使用java代码提交job到yarn > > > 
Java程序用process调用脚本提交任务没啥问题吧,获取jobId&containerId的问题,我理解可以用yarn rest api & > 
flink rest api来完成,希望对你有帮助,祝好~ xiao cai  于2020年9月25日周五 > 
下午4:23写道: > Hi all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > > 
想要在自己的项目中(springboot)提交flink > > 
job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
 > > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > > 
best, > xiao

Re: 怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 xiao cai
使用process没办法拿到当前提交任务的JobId和提交到Yarn上的对应的containerId,自能从标准输出中使用正则解析出来。我们目前使用的就是这种方式,已经实现了。但是这样很不优雅,而且提交的速度也会比较慢。


 原始邮件 
发件人: zilong xiao
收件人: user-zh
发送时间: 2020年9月25日(周五) 16:32
主题: Re: 怎么样在Flink中使用java代码提交job到yarn


Java程序用process调用脚本提交任务没啥问题吧,获取jobId&containerId的问题,我理解可以用yarn rest api & flink 
rest api来完成,希望对你有帮助,祝好~ xiao cai  于2020年9月25日周五 下午4:23写道: > Hi 
all: > 大家好,我目前遇到一个flink 任务提交方面的困扰: > 想要在自己的项目中(springboot)提交flink > 
job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
 > 所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。 > 非常感谢 > > > 
best, > xiao

怎么样在Flink中使用java代码提交job到yarn

2020-09-25 文章 xiao cai
Hi all:
大家好,我目前遇到一个flink 任务提交方面的困扰:
想要在自己的项目中(springboot)提交flink 
job到yarn上。目前采用的方式是process类在本地起进程,调用bin/flink来启动。这样不是很优雅,而且比较难获取到提交的jobId和containerId。看到网上有博客使用ClusterClient的方式来提交,但是遇到了classpath的问题,会缺失一些FLINK_HOME/lib下的jar包
所以我想问,应该如何在自己的应用中提交任务到yarn,既能拿到任务信息,又可以解决classpath为指定带来的困扰。
非常感谢


best,
xiao

Re: 如何在启动taskmanager时传入自定义的java参数

2020-09-15 文章 xiao cai
学习了,非常感谢~


 原始邮件 
发件人: wch...@163.com
收件人: user-zh@flink.apache.org
发送时间: 2020年9月15日(周二) 19:18
主题: Re: 如何在启动taskmanager时传入自定义的java参数


官网有相关配置 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jvm-and-logging-options
 taskmanager自定义参数应该是使用 env.java.opts.taskmanager 下面是我的部分启动启动参数: 
/data1/flink-1.10.0/bin/flink run -m yarn-cluster -ynm smartStareJob -yjm 2048 
-ytm 4096 -ys 2 -p 6 \ -yD 
env.java.opts="-Dxdiamond.server.host=daily.inzwc.com 
-Dxdiamond.project.profile=daily" \ -yD zookeeper.sasl.disable=true \ -yD 
taskmanager.exit-on-fatal-akka-erro=ture \ -yD 
taskmanager.network.netty.client.numThreads=2 \ -yD 
taskmanager.network.netty.server.numThreads=2 \ -c 
com.hstong.fintech.cep.main.SmartStareJob /data0/www/quant-cep.jar \ --profile 
daily --channalName smartStareChannal wch...@163.com 发件人: xiao cai 发送时间: 
2020-09-15 17:46 收件人: user-zh 主题: 如何在启动taskmanager时传入自定义的java参数 Hi: 
我修改了flink的一些源码,需要通过外部-Dkey=value的形式动态将值传入,试了下无法直接通过bin/flink run 
后加-D的方式来添加,有什么好的办法吗?

Re: 如何在启动taskmanager时传入自定义的java参数

2020-09-15 文章 xiao cai
解决了我的问题,非常感谢。


 原始邮件 
发件人: zilong xiao
收件人: user-zh
发送时间: 2020年9月15日(周二) 18:23
主题: Re: 如何在启动taskmanager时传入自定义的java参数


可以在flink-conf.yaml里设置,例如: env.java.opts: -Djob.name={{job_name}} xiao cai 
 于2020年9月15日周二 下午5:46写道: > Hi: > 
我修改了flink的一些源码,需要通过外部-Dkey=value的形式动态将值传入,试了下无法直接通过bin/flink run > 
后加-D的方式来添加,有什么好的办法吗?

如何在启动taskmanager时传入自定义的java参数

2020-09-15 文章 xiao cai
Hi:
我修改了flink的一些源码,需要通过外部-Dkey=value的形式动态将值传入,试了下无法直接通过bin/flink run 
后加-D的方式来添加,有什么好的办法吗?

FlinkSQL如何处理上游的表结构变更

2020-09-03 文章 xiao cai
Hi all:
flink version : 1.11.0
场景:上游的数据来自binlog,当发生表结构变更时,希望能够实时的变动flink内部表的schema,但是目前来看,表的schema都是create 
table时写死的,有什么办法可以处理这种场景呢

Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据

2020-09-01 文章 xiao cai
Hi shizk233:
我这边也复现了你说的情况,一模一样。
   
可以尝试使用定时调度任务检查flink任务的执行情况,当不再处于运行状态时,主动调用pushgateway的delete方法来删除pushgetway的metrics。




 原始邮件 
发件人: shizk233
收件人: user-zh@flink.apache.org
发送时间: 2020年9月1日(周二) 19:10
主题: Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据


Hi Xiao, 我这边实践过程中发现,该参数只能删除jobmanager对应的metrics group,不能删除tm的。 
我们开启了randomJobNameSuffix,该参数会让JM和TM的metrics信息分属不同metrics group。 感觉这可能是一个bug? 
xiao cai  于2020年9月1日周二 下午4:57写道: > Hi: > 
可以试试在flink-conf.yaml中添加: > metrics.reporter.promgateway.deleteOnShutdown: true 
> > > Best, > Xiao > 原始邮件 > 发件人: bradyMk > 收件人: 
user-zh > 发送时间: 2020年9月1日(周二) 16:50 > 主题: Re: 
回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据 > > > 
您好,我不是很懂您的意思,例如我这边有一个这样的指标:flink_jobmanager_job_uptime 监控一个任务的运行时长; > 
如果该任务被kill掉,那么这个指标的数值会变成一个不变的量,一直显示在grafana中。我不太会promeQL,我尝试这样: > 
flink_jobmanager_job_uptime[1m],这样是个非法查询命令,按照您的意思,应该怎么改呢? - Best Wishes > 
-- Sent from: http://apache-flink.147419.n8.nabble.com/

回复:关于FlinkSQL的窗口和触发

2020-09-01 文章 xiao cai
Hi:
可以试试增加如下配置:
table.exec.emit.early-fire.enabled = true
  table.exec.emit.early-fire.delay = 1


Best,
Xiao


 原始邮件 
发件人: Cayden chen<1193216...@qq.com>
收件人: user-zh
发送时间: 2020年9月1日(周二) 17:10
主题: 回复:关于FlinkSQL的窗口和触发


hi,目前不支持。sql语义只支持窗口结束触发计算 -- 原始邮件 -- 
发件人: "user-zh" 

Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据

2020-09-01 文章 xiao cai
Hi:
可以试试在flink-conf.yaml中添加:
metrics.reporter.promgateway.deleteOnShutdown: true


Best,
Xiao
 原始邮件 
发件人: bradyMk
收件人: user-zh
发送时间: 2020年9月1日(周二) 16:50
主题: Re: 回复:flink1.9.1采用Prometheus Pushgateway监控,当任务被kill掉,但grafana还是可以监控到数据


您好,我不是很懂您的意思,例如我这边有一个这样的指标:flink_jobmanager_job_uptime 监控一个任务的运行时长; 
如果该任务被kill掉,那么这个指标的数值会变成一个不变的量,一直显示在grafana中。我不太会promeQL,我尝试这样: 
flink_jobmanager_job_uptime[1m],这样是个非法查询命令,按照您的意思,应该怎么改呢? - Best Wishes -- 
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-24 文章 xiao cai
Hi
确实可以稳定复现,failover后就会出现找不到lib包中的jar文件里的class文件,只能重启。不过我是cli模式启动的on-yarn,没有试过per-job和application,计划这两天尝试下application指定jar包地址到hdfs上,看是否能够复现。


Best,
xiao cai


 原始邮件 
发件人: Congxian Qiu
收件人: user-zh
发送时间: 2020年8月24日(周一) 20:39
主题: Re: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件


Hi 理论上第一次能启动,后续的 failover 也应该是可以正常恢复的。你这边是稳定复现吗?如果能够稳定复现的话,有可能是 bug Best, 
Congxian xiao cai  于2020年8月20日周四 下午2:27写道: > Hi: > 
感谢答复,确实是个思路。 > > 
不过个人感觉能够在启动第一个container的时候,将本地的lib中的jar文件上传到hdfs中,然后后续的failover的container能够统一从hdfs中获取,这样应该就不会有这个问题了。貌似社区在1.11版本针对jar的拷贝做了优化,我还在看这方面的内容。有进展再交流。
 > > > Best, > xiao cai > > > 原始邮件 > 发件人: 范超 > 收件人: 
user-zh@flink.apache.org > 发送时间: 2020年8月20日(周四) 09:11 
> 主题: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 > > > 
我之前开启job的failover > 
restart,结果也是发现yarn直接申请新的container且旧的container并没有进一步进行处理,一直导致你这样子的报错,旧的container没有绑定的task
 > executor No TaskExecutor registered under containe_. > 
我这边干脆写了个脚本通过savepoint的方式来reload应用了 希望对你有帮助 -邮件原件- 发件人: xiao cai > 
[mailto:flin...@163.com] 发送时间: 2020年8月19日 星期三 12:50 收件人: user-zh < > 
user-zh@flink.apache.org> 主题: Flink on Yarn > 
启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 如题:link on Yarn > 
启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 我的任务时kafka source -> hbase sink > 
任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。
 > Best xiao cai 错误堆栈: 2020-08-19 11:23:08,099 INFO > 
org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers. > 
2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - > 
Received 1 containers with resource , 1 pending > 
container requests. 2020-08-19 11:23:08,100 INFO > 
org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor > 
container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 > with 
TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb > (134217728 
bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), > 
taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, > 
networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb > 
(536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), > 
jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO > 
org.apache.flink.yarn.YarnResourceManager [] - Creating container launch > 
context for TaskManagers 2020-08-19 11:23:08,101 INFO > 
org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers > 
2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - > 
Removing container request Capability[]Priority[1]. > 
2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - > 
Accepted 1 requested containers, returned 0 excess containers, 0 pending > 
container requests of resource . 2020-08-19 > 
11:23:08,102 INFO > 
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - > 
Processing Event EventType: START_CONTAINER for Container > 
container_e07_1596440446172_0094_01_69 2020-08-19 11:23:10,851 ERROR > 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler > 
[] - Unhandled exception. > 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
 > No TaskExecutor registered under > 
container_e07_1596440446172_0094_01_68. at > 
org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > 
sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 > ~[?:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498) > 
~[?:1.8.0_191] at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at > 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at > 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > 
~[flink-dist_2.1

DDL中声明主键会报类型不匹配

2020-08-20 文章 xiao cai
Hi:
flink版本1.11.0 connector为kafka
DDL中声明某个字段为primary key时,会报类型不匹配,去掉primary key constraint就可以正常执行。
把shop_id设置为 varchar not null也不行。


org.apache.flink.table.api.ValidationException: Type STRING NOT NULL of table 
field 'shop_id' does not match with the physical type STRING of the 'shop_id' 
field of the TableSource return type.


SQL如下:
create table source_0 (  
  `shop_id` varchar,  
  `user_id` bigint,  
  `category_id` int,  
  `ts` bigint,  
  `proc_time` as PROCTIME(),  
  `event_time` AS TO_TIMESTAMP(FROM_UNIXTIME(`ts` / 1000, '-MM-dd 
HH:mm:ss')),  
  watermark for event_time AS event_time, 
  PRIMARY KEY (shop_id, user_id) NOT ENFORCED 
  ) with (  
  'connector.type' = 'kafka',  


  )

回复:答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-19 文章 xiao cai
Hi:
感谢答复,确实是个思路。
不过个人感觉能够在启动第一个container的时候,将本地的lib中的jar文件上传到hdfs中,然后后续的failover的container能够统一从hdfs中获取,这样应该就不会有这个问题了。貌似社区在1.11版本针对jar的拷贝做了优化,我还在看这方面的内容。有进展再交流。


Best,
xiao cai


 原始邮件 
发件人: 范超
收件人: user-zh@flink.apache.org
发送时间: 2020年8月20日(周四) 09:11
主题: 答复: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件


我之前开启job的failover 
restart,结果也是发现yarn直接申请新的container且旧的container并没有进一步进行处理,一直导致你这样子的报错,旧的container没有绑定的task
 executor No TaskExecutor registered under containe_. 
我这边干脆写了个脚本通过savepoint的方式来reload应用了 希望对你有帮助 -邮件原件- 发件人: xiao cai 
[mailto:flin...@163.com] 发送时间: 2020年8月19日 星期三 12:50 收件人: user-zh 
 主题: Flink on Yarn 
启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 如题:link on Yarn 
启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 我的任务时kafka source -> hbase sink 
任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。
 Best xiao cai 错误堆栈: 2020-08-19 11:23:08,099 INFO 
org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers. 
2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager [] - 
Received 1 containers with resource , 1 pending 
container requests. 2020-08-19 11:23:08,100 INFO 
org.apache.flink.yarn.YarnResourceManager [] - TaskExecutor 
container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 with 
TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb (134217728 
bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), 
taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, 
networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb 
(536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), 
jvmOverheadSize=192.000mb (201326592 bytes)}. 2020-08-19 11:23:08,101 INFO 
org.apache.flink.yarn.YarnResourceManager [] - Creating container launch 
context for TaskManagers 2020-08-19 11:23:08,101 INFO 
org.apache.flink.yarn.YarnResourceManager [] - Starting TaskManagers 2020-08-19 
11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Removing 
container request Capability[]Priority[1]. 2020-08-19 
11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager [] - Accepted 1 
requested containers, returned 0 excess containers, 0 pending container 
requests of resource . 2020-08-19 11:23:08,102 INFO 
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing 
Event EventType: START_CONTAINER for Container 
container_e07_1596440446172_0094_01_69 2020-08-19 11:23:10,851 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] 
- Unhandled exception. 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
 No TaskExecutor registered under container_e07_1596440446172_0094_01_68. 
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_191] at java.lang.reflect.Method.invoke(Method.java:498) 
~[?:1.8.0_191] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.actor.ActorCell.invoke(ActorCell.scala:561) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
~[flink-dist_2.11-1.11.0.jar:1.11.0] at 
akka.dispatch.Mailbox.run(Mailbox.scala:225) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]

回复: flinkSQL eventtime问题

2020-08-19 文章 xiao cai
Hi:
create table kafka (
nested_field Row(event_time timestamp(3), other_field string),
watermark for nested_field.event_time as {watermark_definition}
) with (
‘connector' = ‘kafka'
)
看看这样能够使用


Best,
xiao cai


 原始邮件 
发件人: ★猛★
收件人: user-zh
发送时间: 2020年8月19日(周三) 17:48
主题: 回复: flinkSQL eventtime问题


我是通过kafka直接注册tablesource ,数据是avro的。 
descriptor.inAppendMode().registerTableSource(source_table_name); 
我们在想怎么把avro里嵌套的某个字段用作eventime 
-- 原始邮件 -- 发件人: "user-zh" 


Re: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-18 文章 xiao cai
Hi
Flink版本是 1.11.0
期望的行为是将kafka中的数据实时写入hbase表
xxx这个class是在lib下的某个jar中的
任务初始运行都是正常的,jar包也是可以找到的,运行期间失败了,然后进入了restarting状态,就不停的在running和restarting状态切换
我提交任务的节点是20,然后container运行的节点是22,lib中的jar都在20节点上,所以猜测是任务运行过程中,重新分配新的container时
丢失了lib中jar资源。


Best,
xiao cai


 原始邮件 
发件人: Congxian Qiu
收件人: user-zh
发送时间: 2020年8月19日(周三) 13:34
主题: Re: Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件


Hi 你的 Flink 是哪个版本,期望的行为是什么样的? 从你给的日志看,是没有  这个 class,这个  是在你放到 lib 下的某个 
jar 包里面吗?另外你这个作业第一次运行的时候失败,还是中间中间 failover 之后恢复回来的时候失败呢? Best, Congxian xiao 
cai  于2020年8月19日周三 下午12:50写道: > 如题:link on Yarn 
启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件 > > > 我的任务时kafka source -> hbase sink > > 
> > 
任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。
 > > > Best > xiao cai > > > 错误堆栈: > 2020-08-19 11:23:08,099 INFO 
org.apache.flink.yarn.YarnResourceManager > [] - Received 1 containers. > 
2020-08-19 11:23:08,100 INFO org.apache.flink.yarn.YarnResourceManager > [] - 
Received 1 containers with resource  vCores:4>, 1 pending 
container requests. > 2020-08-19 11:23:08,100 INFO 
org.apache.flink.yarn.YarnResourceManager > [] - TaskExecutor > 
container_e07_1596440446172_0094_01_69 will be started on 10.3.15.22 > with 
TaskExecutorProcessSpec {cpuCores=4.0, frameworkHeapSize=128.000mb > (134217728 
bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), > 
taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 bytes, > 
networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb > 
(536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), > 
jvmOverheadSize=192.000mb (201326592 bytes)}. > 2020-08-19 11:23:08,101 INFO 
org.apache.flink.yarn.YarnResourceManager > [] - Creating container launch 
context for TaskManagers > 2020-08-19 11:23:08,101 INFO 
org.apache.flink.yarn.YarnResourceManager > [] - Starting TaskManagers > 
2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager > [] - 
Removing container request Capability[ vCores:4>]Priority[1]. > 
2020-08-19 11:23:08,102 INFO org.apache.flink.yarn.YarnResourceManager > [] - 
Accepted 1 requested containers, returned 0 excess > containers, 0 pending 
container requests of resource  vCores:4>. > 2020-08-19 
11:23:08,102 INFO > 
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - > 
Processing Event EventType: START_CONTAINER for Container > 
container_e07_1596440446172_0094_01_69 > 2020-08-19 11:23:10,851 ERROR > 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler > 
[] - Unhandled exception. > 
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
 > No TaskExecutor registered under container_e07_1596440446172_0094_01_68. 
> at > 
org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?] > at > 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 > ~[?:1.8.0_191] > at java.lang.reflect.Method.invoke(Method.java:498) 
~[?:1.8.0_191] > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at > 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at > 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 > ~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
akka.actor.Actor$class.aroundReceive(Actor.scala:517) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > 
~[flink-dist_2.11-1.11.0.jar:1.11.0] > at 
akka.actor.Act

Flink on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件

2020-08-18 文章 xiao cai
如题:link on Yarn 启动任务,任务重启后貌似taskmanager中会丢失lib中的jar文件


我的任务时kafka source -> hbase sink


任务申请到新的container后,启动任务时会丢失原本存在的class文件,怀疑是重新申请的container中没有获取到lib中的资源,是否应该将lib中资源放到hdfs?该如何配置。


Best
xiao cai


错误堆栈:
2020-08-19 11:23:08,099 INFO  org.apache.flink.yarn.YarnResourceManager 
   [] - Received 1 containers.
2020-08-19 11:23:08,100 INFO  org.apache.flink.yarn.YarnResourceManager 
   [] - Received 1 containers with resource , 1 
pending container requests.
2020-08-19 11:23:08,100 INFO  org.apache.flink.yarn.YarnResourceManager 
   [] - TaskExecutor container_e07_1596440446172_0094_01_69 will be 
started on 10.3.15.22 with TaskExecutorProcessSpec {cpuCores=4.0, 
frameworkHeapSize=128.000mb (134217728 bytes), frameworkOffHeapSize=128.000mb 
(134217728 bytes), taskHeapSize=384.000mb (402653174 bytes), taskOffHeapSize=0 
bytes, networkMemSize=128.000mb (134217730 bytes), managedMemorySize=512.000mb 
(536870920 bytes), jvmMetaspaceSize=256.000mb (268435456 bytes), 
jvmOverheadSize=192.000mb (201326592 bytes)}.
2020-08-19 11:23:08,101 INFO  org.apache.flink.yarn.YarnResourceManager 
   [] - Creating container launch context for TaskManagers
2020-08-19 11:23:08,101 INFO  org.apache.flink.yarn.YarnResourceManager 
   [] - Starting TaskManagers
2020-08-19 11:23:08,102 INFO  org.apache.flink.yarn.YarnResourceManager 
   [] - Removing container request Capability[]Priority[1].
2020-08-19 11:23:08,102 INFO  org.apache.flink.yarn.YarnResourceManager 
   [] - Accepted 1 requested containers, returned 0 excess containers, 
0 pending container requests of resource .
2020-08-19 11:23:08,102 INFO  
org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl [] - Processing 
Event EventType: START_CONTAINER for Container 
container_e07_1596440446172_0094_01_69
2020-08-19 11:23:10,851 ERROR 
org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerDetailsHandler [] 
- Unhandled exception.
org.apache.flink.runtime.resourcemanager.exceptions.UnknownTaskExecutorException:
 No TaskExecutor registered under container_e07_1596440446172_0094_01_68.
at 
org.apache.flink.runtime.resourcemanager.ResourceManager.requestTaskManagerInfo(ResourceManager.java:560)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source) ~[?:?]
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[?:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_191]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 ~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.Actor$class.aroundReceive(Actor.scala:517) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
~[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.11-1.11.0.jar:1.11.0]
2020-08-19 11:23:10,

Re: Print SQL connector无法正常使用

2020-08-17 文章 xiao cai
Hi china_tao:
你好,HBase肯定没有问题的,请问你可以正常使用print connector吗,能否让我看看正确的使用姿势,感谢


 原始邮件 
发件人: china_tao
收件人: user-zh
发送时间: 2020年8月17日(周一) 23:00
主题: Re: Print SQL connector无法正常使用


String createHbaseSql = CREATE TABLE dimension ( rowKey STRING, cf ROW, tas BIGINT ) WITH ( 'connector.type' = 'hbase', 
'connector.version' = '1.4.3', 'connector.table-name' = ’test', 
'connector.write.buffer-flush.max-rows' = '10', 'connector.zookeeper.quorum' = 
‘IP:port', 'connector.zookeeper.znode.parent' = '/hbase', ); 
tableEnv.executeSql(createHbaseSql); Table queryTable = 
tableEnv.sqlQuery("select * from dimension"); 
tableEnv.toAppendStream(queryTable, Row.class).print(); 
你先用这种方式,看看能不能打印出来,证明你hbase没有问题。然后在用print_table。 -- Sent from: 
http://apache-flink.147419.n8.nabble.com/

Print SQL connector无法正常使用

2020-08-16 文章 xiao cai
Hi All:
目前使用flink sql的Print SQL connector,想要将查询的结果打印出来,结果报错:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSinkFactory' in
the classpath.


可以保证:HBase-connector是在lib包下存在的,是否我还需要在lib下添加什么依赖?


下面为执行的sql:


CREATE TABLE dimension ( 
rowKey STRING, 
cf ROW, 
tas BIGINT 
) WITH ( 
'connector.type' = 'hbase', 
'connector.version' = '1.4.3', 
'connector.table-name' = ’test', 
'connector.write.buffer-flush.max-rows' = '10', 
'connector.zookeeper.quorum' = ‘IP:port', 
'connector.zookeeper.znode.parent' = '/hbase', 
);


CREATE TABLE print_table (
 f0 STRING,
 f1 INT,
 f2 BIGINT,
 f3 BIGINT
) WITH (
 'connector' = 'print'
);


insert into print_table
select rowKey, cf.age, cf.area, tas
from dimension

答复: HBase Sink报错:UpsertStreamTableSink requires that Table has a full primary keys

2020-08-14 文章 xiao cai
Hi Jark:
感谢回答,我发现是我join的时候,是想将hbase作为维表使用的,但是我遗漏了for system_time as 
of语句,添加后就不会再报这个错了。
另外有个问题想请教:1.11中新版hbase 
connector只是指with中指定version为1.4所创建的表吗,我发现使用1.4.3的版本,也是可以正常使用的。是不是说明pk在1.4和1.4.3两个版本上都是生效的?
再次感谢。


Best
Xiao Cai

发送自 Windows 10 版邮件应用

发件人: Jark Wu
发送时间: 2020年8月14日 23:23
收件人: user-zh
主题: Re: HBase Sink报错:UpsertStreamTableSink requires that Table has a full 
primary keys

 PK 的问题在1.11 已经解决了,你可以用下1.11 提供的新版 hbase connector,可以在 DDL 上指定 PK,所以 query
推导不出 PK 也不会报错了。
 see more:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/hbase.html


Best,
Jark


On Thu, 13 Aug 2020 at 14:27, xiao cai  wrote:

> Hi All:
> 使用flink-sql写入hbase sink时报错:
> UpsertStreamTableSink requires that Table has a full primary keys if it is
> updated.
>
>
> 我共创建了4张表,1张kafka source表,3张hbase 维表,1张hbase sink表
> kafka source表与hbase 维表left join后的结果insert到hbase sink表中:
> sql如下:
> create table user_click_source(
> `id` bigint,
> `name` varchar,
> `kafka_partition` int,
> `event_time` bigint,
> `write_time` bigint,
> `snapshot_time` bigint,
> `max_snapshot_time` bigint,
> `catalog_id` int,
> `device_id` int,
> `user_id` int,
> `proc_time` timestamp(3)
> PRIMARY KEY (id) NOT ENFORCED
> )with(
> 'connector.type' = 'kafka',
> ……
> )
> ;
> create table dim_user(
> `rowkey` varchar,
> cf ROW<
> `id` int,
> `name` varchar,
> `kafka_partition` int,
> `event_time` bigint,
> `write_time` bigint,
> `snapshot_time` bigint,
> `max_snapshot_time` bigint
> >,
> ts bigint
> )with(
> 'connector.type'='hbase',
> ……
> )
> ;
>
>
> create table dim_device(
> `rowkey` varchar,
> cf ROW<
> `id` int,
> `name` varchar,
> `kafka_partition` int,
> `event_time` bigint,
> `write_time` bigint,
> `snapshot_time` bigint,
> `max_snapshot_time` bigint
> >
> )with(
> 'connector.type'='hbase',
> ……
> )
> ;
>
>
> create table dim_catalog(
> `rowkey` varchar,
> cf ROW<
> `id` int,
> `name` varchar,
> `kafka_partition` int,
> `event_time` bigint,
> `write_time` bigint,
> `snapshot_time` bigint,
> `max_snapshot_time` bigint
> >
> )with(
> 'connector.type'='hbase',
> ……
> )
> ;
> create table hbase_full_user_click_case1_sink(
> `rowkey` bigint,
> cf ROW<
> `click_id` bigint,
> `click_name` varchar,
> `click_partition` int,
> `click_event_time` bigint,
> `click_write_time` bigint,
> `click_snapshot_time` bigint,
> `click_max_snapshot_time` bigint,
> `catalog_id` int,
> `catalog_name` varchar,
> `catalog_partition` int,
> `catalog_event_time` bigint,
> `catalog_write_time` bigint,
> `catalog_snapshot_time` bigint,
> `catalog_max_snapshot_time` bigint,
> `device_id` int,
> `device_name` varchar,
> `device_partition` int,
> `device_event_time` bigint,
> `device_write_time` bigint,
> `device_snapshot_time` bigint,
> `device_max_snapshot_time` bigint,
> `user_id` int,
> `user_name` varchar,
> `user_partition` int,
> `user_event_time` bigint,
> `user_write_time` bigint,
> `user_snapshot_time` bigint,
> `user_max_snapshot_time` bigint
> >,
> PRIMARY KEY (rowkey) NOT ENFORCED
> )with(
> 'connector.type'='hbase',
> ……
> )
> ;
> insert into hbase_full_user_click_case1_sink
> select
> `click_id`,
> ROW(
> `click_id`,
> `click_name`,
> `click_partition`,
> `click_event_time`,
> `click_write_time`,
> `click_snapshot_time`,
> `click_max_snapshot_time`,
> `catalog_id`,
> `catalog_name`,
> `catalog_partition`,
> `catalog_event_time`,
> `catalog_write_time`,
> `catalog_snapshot_time`,
> `catalog_max_snapshot_time`,
> `device_id`,
> `device_name`,
> `device_partition`,
> `device_event_time`,
> `device_write_time`,
> `device_snapshot_time`,
> `device_max_snapshot_time`,
> `user_id`,
> `user_name`,
> `user_partition`,
> `user_event_time`,
> `user_write_time`,
> `user_snapshot_time`,
> `user_max_snapshot_time`
> )
> from (select
> click.id as `click_id`,
> click.name as `click_name`,
> click.kafka_partition as `click_partition`,
> click.event_time as `click_event_time`,
> click.write_time as `click_write_time`,
> click.snapshot_time as `click_snapshot_time`,
> click.max_snapshot_time as `click_max_snapshot_time`,
> cat.cf.id as `catalog_id`,
> cat.cf.name as `catalog_name`,
> cat.cf.kafka_partition as `catalog_partition`,
> cat.cf.event_time as `catalog_event_time`,
> cat.cf.write_time as `catalog_write_time`,
> cat.cf.snapshot_time as `catalog_snapshot_time`,
> cat.cf.max_snapshot_time as `catalog_max_snapshot_time`,
> dev.cf.id as `device_

HBase Sink报错:UpsertStreamTableSink requires that Table has a full primary keys

2020-08-12 文章 xiao cai
Hi All:
使用flink-sql写入hbase sink时报错:
UpsertStreamTableSink requires that Table has a full primary keys if it is 
updated.


我共创建了4张表,1张kafka source表,3张hbase 维表,1张hbase sink表
kafka source表与hbase 维表left join后的结果insert到hbase sink表中:
sql如下:
create table user_click_source(
`id` bigint,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint,
`catalog_id` int,
`device_id` int,
`user_id` int,
`proc_time` timestamp(3)
PRIMARY KEY (id) NOT ENFORCED
)with(
'connector.type' = 'kafka',
……
)
;
create table dim_user(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>,
ts bigint
)with(
'connector.type'='hbase',
……
)
;


create table dim_device(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>
)with(
'connector.type'='hbase',
……
)
;


create table dim_catalog(
`rowkey` varchar,
cf ROW<
`id` int,
`name` varchar,
`kafka_partition` int,
`event_time` bigint,
`write_time` bigint,
`snapshot_time` bigint,
`max_snapshot_time` bigint
>
)with(
'connector.type'='hbase',
……
)
;
create table hbase_full_user_click_case1_sink(
`rowkey` bigint,
cf ROW<
`click_id` bigint,
`click_name` varchar,
`click_partition` int,
`click_event_time` bigint,
`click_write_time` bigint,
`click_snapshot_time` bigint,
`click_max_snapshot_time` bigint,
`catalog_id` int,
`catalog_name` varchar,
`catalog_partition` int,
`catalog_event_time` bigint,
`catalog_write_time` bigint,
`catalog_snapshot_time` bigint,
`catalog_max_snapshot_time` bigint,
`device_id` int,
`device_name` varchar,
`device_partition` int,
`device_event_time` bigint,
`device_write_time` bigint,
`device_snapshot_time` bigint,
`device_max_snapshot_time` bigint,
`user_id` int,
`user_name` varchar,
`user_partition` int,
`user_event_time` bigint,
`user_write_time` bigint,
`user_snapshot_time` bigint,
`user_max_snapshot_time` bigint
>,
PRIMARY KEY (rowkey) NOT ENFORCED
)with(
'connector.type'='hbase',
……
)
;
insert into hbase_full_user_click_case1_sink
select
`click_id`,
ROW(
`click_id`,
`click_name`,
`click_partition`,
`click_event_time`,
`click_write_time`,
`click_snapshot_time`,
`click_max_snapshot_time`,
`catalog_id`,
`catalog_name`,
`catalog_partition`,
`catalog_event_time`,
`catalog_write_time`,
`catalog_snapshot_time`,
`catalog_max_snapshot_time`,
`device_id`,
`device_name`,
`device_partition`,
`device_event_time`,
`device_write_time`,
`device_snapshot_time`,
`device_max_snapshot_time`,
`user_id`,
`user_name`,
`user_partition`,
`user_event_time`,
`user_write_time`,
`user_snapshot_time`,
`user_max_snapshot_time`
)
from (select
click.id as `click_id`,
click.name as `click_name`,
click.kafka_partition as `click_partition`,
click.event_time as `click_event_time`,
click.write_time as `click_write_time`,
click.snapshot_time as `click_snapshot_time`,
click.max_snapshot_time as `click_max_snapshot_time`,
cat.cf.id as `catalog_id`,
cat.cf.name as `catalog_name`,
cat.cf.kafka_partition as `catalog_partition`,
cat.cf.event_time as `catalog_event_time`,
cat.cf.write_time as `catalog_write_time`,
cat.cf.snapshot_time as `catalog_snapshot_time`,
cat.cf.max_snapshot_time as `catalog_max_snapshot_time`,
dev.cf.id as `device_id`,
dev.cf.name as `device_name`,
dev.cf.kafka_partition as `device_partition`,
dev.cf.event_time as `device_event_time`,
dev.cf.write_time as `device_write_time`,
dev.cf.snapshot_time as `device_snapshot_time`,
dev.cf.max_snapshot_time as `device_max_snapshot_time`,
u.cf.id as `user_id`,
u.cf.name as `user_name`,
u.cf.kafka_partition as `user_partition`,
u.cf.event_time as `user_event_time`,
u.cf.write_time as `user_write_time`,
u.cf.snapshot_time as `user_snapshot_time`,
u.cf.max_snapshot_time as `user_max_snapshot_time`


from (select
id,
`name`,
`kafka_partition`,
`event_time`,
`write_time`,
`snapshot_time`,
`max_snapshot_time`,
cast(catalog_id as varchar) as catalog_key,
cast(device_id as varchar) as device_key,
cast(user_id as varchar) as user_key,
`catalog_id`,
`device_id`,
`user_id`,
`proc_time`,
`event_time`,
FROM user_click_source
GROUP BY TUMBLE(event_time, INTERVAL '1' SECOND),
`id`,
`name`,
`kafka_partition`,
`event_time`,
`write_time`,
`snapshot_time`,
`max_snapshot_time`,
`catalog_id`,
`device_id`,
`user_id`,
`proc_time`) click


left join dim_catalog cat on click.catalog_key = cat.rowkey
left join dim_device dev on click.device_key = dev.rowkey
left join dim_user u on click.user_key = u.rowkey and click.event_time = u.ts
) t

Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 xiao cai
Dear Leonard Xu:
我会去关注这个issue,非常感谢答疑。


 原始邮件 
发件人: Leonard Xu
收件人: user-zh
发送时间: 2020年8月12日(周三) 16:05
主题: Re: 使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update 
changes


Hi Group by 和 left join 都是会有 retract 消息的,这类消息需要UpsertStreamTableSink才能处理, Kafka 
connetor 目前的实现是AppendStreamTableSink,所以不能处理 社区已经有一个issue在处理这个问题了,应该1.12能提供这个功能。 
Best Leonard [1]https://issues.apache.org/jira/browse/FLINK-18826 
<">https://issues.apache.org/jira/browse/FLINK-18826> > 在 2020年8月12日,15:58,xiao 
cai  写道: > > Hi Jark: > 版本:1.11.0 > 问题:flink-sql,数据经过group by 
和left join后写入kafka sink,会在语法校验阶段报错: > AppendStreamTableSink doesn't support 
consuming update changes which is produced by node GroupAggregate > > > 
我希望能够在sql校验时也能使upsert操作作用于kafka sink,或者等upsert完成,再写入kafka > > > 附上执行sql: > 
create table kafka_table_1 ( > `shop_id` varchar, > `user_id` bigint, > 
`category_id` int, > `ts` bigint, > `row_time` timestamp(3), > `proc_time` 
timestamp(3), > ) with ( > 'connector.type' = 'kafka', > 'connector.version' = 
'universal', > 'connector.topic' = 'user_visit_1', > 'connector.startup-mode' = 
'latest-offset', > 'connector.properties.bootstrap.servers' = 'ip:9092', > 
'connector.properties.zookeeper.connect' = 'ip:2181', > 'update-mode' = 
'append', > 'format.type' = 'avro-registry', > 'format.schema-subject' = 
'user_visit', > 'format.schema-url'='http://ip:8081', > ) > > > CREATE TABLE 
hbase_table ( > rowKey STRING, > cf ROW > ) WITH ( > 
'connector.type' = 'hbase', > 'connector.version' = '1.4.3', > 
'connector.table-name' = 'hbase_table', > 'connector.zookeeper.quorum' = 
'ip:2181', > 'connector.zookeeper.znode.parent' = '/hbase', > 
'connector.write.buffer-flush.max-rows' = '1000' > ) > > > > > create table 
kafka_table_2 ( > `shop_id` varchar, > `age` varchar, > `area` varchar > ) with 
( > 'connector.type' = 'kafka', > 'connector.version' = 'universal', > 
'connector.topic' = 'user_visit_2', > 'connector.startup-mode' = 
'latest-offset', > 'connector.properties.bootstrap.servers' = 'ip:9092', > 
'connector.properties.zookeeper.connect' = 'ip:2181', > 'update-mode' = 
'append', > 'format.type' = 'avro-registry', > 'format.schema-subject' = 
'user_visit', > 'format.schema-url'='http://ip:8081', > ) > > > insert into 
kafka_table_2(shop_id, user_id, category_id, ts, row_time, proc_time) > select 
shop_id, age, area > from kafka_table_1 left join hbase_table > for system_time 
as of kafka_table_1.proc_time as temp on kafka_table_1.shop_id = temp.rowKey > 
group by shop_id, age, area > > > 原始邮件 > 发件人: xiao cai > 收件人: 
user-zh > 发送时间: 2020年8月12日(周三) 15:41 > 主题: 
AppendStreamTableSink doesn't support consuming update changes > > > Hi Jark: 
版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错: 
AppendStreamTableSink doesn't support consuming update changes which is 
produced by node GroupAggregate

使用Kafka Sink时报错:AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 xiao cai
Hi Jark:
版本:1.11.0
问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:
AppendStreamTableSink doesn't support consuming update changes which is 
produced by node GroupAggregate


我希望能够在sql校验时也能使upsert操作作用于kafka sink,或者等upsert完成,再写入kafka


附上执行sql:
create table kafka_table_1 (  
`shop_id` varchar,  
`user_id` bigint,  
`category_id` int, 
`ts` bigint,  
`row_time` timestamp(3), 
`proc_time` timestamp(3), 
) with (  
'connector.type' = 'kafka',  
'connector.version' = 'universal',  
'connector.topic' = 'user_visit_1',  
'connector.startup-mode' = 'latest-offset',  
'connector.properties.bootstrap.servers' = 'ip:9092',  
'connector.properties.zookeeper.connect' = 'ip:2181',  
'update-mode' = 'append', 
'format.type' = 'avro-registry', 
'format.schema-subject' = 'user_visit', 
'format.schema-url'='http://ip:8081', 
)


CREATE TABLE hbase_table ( 
rowKey STRING, 
cf ROW 
) WITH ( 
'connector.type' = 'hbase', 
'connector.version' = '1.4.3', 
'connector.table-name' = 'hbase_table', 
'connector.zookeeper.quorum' = 'ip:2181', 
'connector.zookeeper.znode.parent' = '/hbase', 
'connector.write.buffer-flush.max-rows' = '1000' 
)




create table kafka_table_2 (  
`shop_id` varchar,  
`age` varchar,  
`area` varchar
) with (  
'connector.type' = 'kafka',  
'connector.version' = 'universal',  
'connector.topic' = 'user_visit_2',  
'connector.startup-mode' = 'latest-offset',  
'connector.properties.bootstrap.servers' = 'ip:9092',  
'connector.properties.zookeeper.connect' = 'ip:2181',  
    'update-mode' = 'append', 
'format.type' = 'avro-registry', 
'format.schema-subject' = 'user_visit', 
'format.schema-url'='http://ip:8081', 
)


insert into kafka_table_2(shop_id, user_id, category_id, ts, row_time, 
proc_time)
select shop_id, age, area 
from kafka_table_1 left join hbase_table
for system_time as of kafka_table_1.proc_time as temp on kafka_table_1.shop_id 
= temp.rowKey
group by shop_id, age, area


 原始邮件 
发件人: xiao cai
收件人: user-zh
发送时间: 2020年8月12日(周三) 15:41
主题: AppendStreamTableSink doesn't support consuming update changes


Hi Jark: 版本:1.11.0 问题:flink-sql,数据经过group by 和left join后写入kafka 
sink,会在语法校验阶段报错: AppendStreamTableSink doesn't support consuming update changes 
which is produced by node GroupAggregate

AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 xiao cai
Hi Jark:


版本:1.11.0
问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:
AppendStreamTableSink doesn't support consuming update changes which is 
produced by node GroupAggregate

AppendStreamTableSink doesn't support consuming update changes

2020-08-12 文章 xiao cai
Hi Jark:


版本:1.11.0
问题:flink-sql,数据经过group by 和left join后写入kafka sink,会在语法校验阶段报错:

Re: flink row 类型

2020-07-23 文章 xiao cai
Hi ,Dream


比如你最终拿到的是Row(10),10表示有10个字段,这些字段的顺序是固定的,那么你可以把每个字段在row里的索引的映射关系保存下来,如下 
map ,然后 row.getField(map.get(fieldName))获取你需要的值


 原始邮件 
发件人: Dream-底限
收件人: user-zh
发送时间: 2020年7月23日(周四) 14:57
主题: Re: flink row 类型


hi、xiao cai 可以说一下思路吗,我没太懂 》》可以考虑把字段索引值保存下来再获取 Dream-底限  
于2020年7月23日周四 下午2:56写道: > hi、Jingsong Li > 
我查看了对应的api,并运行了demo测试,通过CallContext我可以拿到对应的字段类型,但是无法拿到对应的字段名称 > > 
>>在TypeInference中有input的type,这个type应该是包含字段信息的。 > > xiao cai  
于2020年7月23日周四 下午2:19写道: > >> 可以考虑把字段索引值保存下来再获取 >> >> >> 原始邮件 >> 发件人: 
Dream-底限 >> 收件人: user-zh >> 
发送时间: 2020年7月23日(周四) 14:08 >> 主题: Re: flink row 类型 >> >> >> hi 
是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li  >> 于2020年7月23日周四 
下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 > >> 
你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 < >> 
zhan...@akulaku.com> 于2020年7月22日周三 下午7:22写道: > > > hi、 > > >> 
我这面定义row数据,类型为ROW,可以通过 > > >> 
row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > > >> 
rule_key 转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao >> 
Li > > >

Re: flink row 类型

2020-07-22 文章 xiao cai
可以考虑把字段索引值保存下来再获取


 原始邮件 
发件人: Dream-底限
收件人: user-zh
发送时间: 2020年7月23日(周四) 14:08
主题: Re: flink row 类型


hi 是的,我们的数据场景比较尴尬,那我想其他方式实现一下 Benchao Li  于2020年7月23日周四 
下午12:55写道: > 这个应该是做不到的。name只是SQL plan过程的东西,在运行时它就没有什么实际意义了。 > 
你是想在udf里面获取row里面每个字段的名字是吧?如果是的话,我理解现在应该是做不到的。 > > Dream-底限 
 于2020年7月22日周三 下午7:22写道: > > > hi、 > > 
我这面定义row数据,类型为ROW,可以通过 > > 
row.getField(i)获取到对应的值,但是我想获取对应的row_name名称要怎么操作,貌似没有报漏获取名称的接口 > > > > rule_key 
转换为rule_key1,rulekey2 > > 1 > > 2 > > > > > -- > > Best, > Benchao Li >

Re:回复:flink1.11 set yarn slots failed

2020-07-16 文章 xiao cai
可以看这里https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html

 原始邮件 
发件人: Zhou Zach
收件人: user-zh
发送时间: 2020年7月16日(周四) 15:28
主题: Re:回复:flink1.11 set yarn slots failed


-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 
 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > 
> 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing 
list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set 
yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager 
/opt/flink-1.11.0/bin/flink run-application -t yarn-application \ 
-Djobmanager.memory.process.size=1024m \ 
-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
 of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
Slots的值,现在每次提交作业都是0

Re:回复:flink1.11 set yarn slots failed

2020-07-16 文章 xiao cai
可以看这里


 原始邮件 
发件人: Zhou Zach
收件人: user-zh
发送时间: 2020年7月16日(周四) 15:28
主题: Re:回复:flink1.11 set yarn slots failed


-D前缀好使,要设置yarn name用什么参数啊,1.11官网的文档有些都不好使了 在 2020-07-16 15:03:14,"flinkcx" 
 写道: >是不是应该用-D作为前缀来设置,比如-Dtaskmanager.numberOfTaskSlots=4 > > 
> 原始邮件 >发件人: Zhou Zach >收件人: Flink user-zh mailing 
list >发送时间: 2020年7月16日(周四) 14:51 >主题: flink1.11 set 
yarn slots failed > > >Hi all, 使用如下命令,设置Number of slots per TaskManager 
/opt/flink-1.11.0/bin/flink run-application -t yarn-application \ 
-Djobmanager.memory.process.size=1024m \ 
-Dtaskmanager.memory.process.size=2048m \ -ys 4 \ 
发现并不能override/opt/flink-1.11.0/bin/flink/conf/flink-conf.yaml中的默认值,每次要调整只能通过更改flink-conf.yaml的方式才能生效,请问使用run-application的方式,怎样设置Number
 of slots per TaskManager? 另外,有哪些方式可以增Flink UI中的大Available Task 
Slots的值,现在每次提交作业都是0