Re: flink 1.11 rest api saveppoint接口 异常

2020-07-28 文章 Congxian Qiu
Hi
   创建了一个 Issue[1] 来跟进这个问题

[1] https://issues.apache.org/jira/browse/FLINK-18748
Best,
Congxian


Congxian Qiu  于2020年7月29日周三 下午1:24写道:

> Hi  taowang
>感谢你的更新,这个地方应该是 savepoint trigger 的逻辑有问题,现在确实
> setMinPauseBetweenCheckpoints 会影响 savepoint,我创建一个 issue 来跟进一下这个问题
>
> Best,
> Congxian
>
>
> taowang  于2020年7月29日周三 下午12:29写道:
>
>> 我再次确认了一下,可能是因为我设置了checkpoint的setMinPauseBetweenCheckpoints,所以在上一次
>> checkpoint 和这个间隔之间触发 savepoint 不会生效,但是接口返回了IN_PROGRESS 的状态,我觉得这里应该是有点问题的。
>>
>>
>>  原始邮件
>> 发件人: taowang
>> 收件人: user-zh
>> 发送时间: 2020年7月28日(周二) 18:53
>> 主题: Re: flink 1.11 rest api saveppoint接口 异常
>>
>>
>> 是的,其实无论是否开启了unaligned checkpoint,我在调用这个接口的时候都没有 checkpoint 在做。
>> 而且等待的话,我认为如果有正在做的,那么正在做的 checkpoint执行完成之后新的 savepoint
>> 应该会开始执行吧,但我看到的现象是等了半个小时依旧是 IN_PROGRESS状态,正常状态下,一个 checkpoint 的执行时间也就几秒钟,正常的
>> savpoint 执行完成最多也只需要几分钟。 原始邮件 发件人: Congxian Qiu
>> 收件人: user-zh 发送时间: 2020年7月28日(周二) 18:09 主题:
>> Re: flink 1.11 rest api saveppoint接口 异常 Hi 开启 unalign checkpoint 的情况下,如果有
>> checkpoint 正在做的话,那么 savepoint 会等待的[1],但是把 unaligned checkpoint
>> 关闭之后,还有这个现象看上去不太符合预期。关闭之后这种现象出现的时候,也有 checkpoint 正在做吗? [1]
>> https://issues.apache.org/jira/browse/FLINK-17342 Best, Congxian taowang
>>  于2020年7月28日周二 下午5:05写道: > 在升级了 flink
>> 1.11之后,我在使用的时候发现 rest api 的 /jobs/:jobid/savepoints 接口表现有点异常: > 在 flink
>> 1.10 时:当请求该接口后,在 flink ui 可以看到 savepoint >
>> 被触发,/jobs/:jobid/savepoints/:triggerid 返回IN_PROGRESS,等 savepoint >
>> 成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。 > 但是在flink
>> 1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints >
>> 接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到 >
>> savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。 >
>> 我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在 >
>> flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?
>> > > > rest api flink docs 链接: >
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints
>> > > > 祝好~
>
>


Re: flink 1.11 rest api saveppoint接口 异常

2020-07-28 文章 Congxian Qiu
Hi  taowang
   感谢你的更新,这个地方应该是 savepoint trigger 的逻辑有问题,现在确实
setMinPauseBetweenCheckpoints 会影响 savepoint,我创建一个 issue 来跟进一下这个问题

Best,
Congxian


taowang  于2020年7月29日周三 下午12:29写道:

> 我再次确认了一下,可能是因为我设置了checkpoint的setMinPauseBetweenCheckpoints,所以在上一次
> checkpoint 和这个间隔之间触发 savepoint 不会生效,但是接口返回了IN_PROGRESS 的状态,我觉得这里应该是有点问题的。
>
>
>  原始邮件
> 发件人: taowang
> 收件人: user-zh
> 发送时间: 2020年7月28日(周二) 18:53
> 主题: Re: flink 1.11 rest api saveppoint接口 异常
>
>
> 是的,其实无论是否开启了unaligned checkpoint,我在调用这个接口的时候都没有 checkpoint 在做。
> 而且等待的话,我认为如果有正在做的,那么正在做的 checkpoint执行完成之后新的 savepoint
> 应该会开始执行吧,但我看到的现象是等了半个小时依旧是 IN_PROGRESS状态,正常状态下,一个 checkpoint 的执行时间也就几秒钟,正常的
> savpoint 执行完成最多也只需要几分钟。 原始邮件 发件人: Congxian Qiu
> 收件人: user-zh 发送时间: 2020年7月28日(周二) 18:09 主题: Re:
> flink 1.11 rest api saveppoint接口 异常 Hi 开启 unalign checkpoint 的情况下,如果有
> checkpoint 正在做的话,那么 savepoint 会等待的[1],但是把 unaligned checkpoint
> 关闭之后,还有这个现象看上去不太符合预期。关闭之后这种现象出现的时候,也有 checkpoint 正在做吗? [1]
> https://issues.apache.org/jira/browse/FLINK-17342 Best, Congxian taowang <
> taow...@deepglint.com> 于2020年7月28日周二 下午5:05写道: > 在升级了 flink
> 1.11之后,我在使用的时候发现 rest api 的 /jobs/:jobid/savepoints 接口表现有点异常: > 在 flink
> 1.10 时:当请求该接口后,在 flink ui 可以看到 savepoint >
> 被触发,/jobs/:jobid/savepoints/:triggerid 返回IN_PROGRESS,等 savepoint >
> 成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。 > 但是在flink
> 1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints >
> 接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到 >
> savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。 >
> 我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在 >
> flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?
> > > > rest api flink docs 链接: >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints
> > > > 祝好~


Re: flink1.11.1启动问题

2020-07-28 文章 Xintong Song
这个是 hadoop 2.x 的已知设计缺陷。

hadoop 2.x 中,container request 没有唯一的标识,且分配下来的 container
的资源和请求的资源也可能不同,为了将分下来的 container 对应到之前的 request,flink 不得不去进行归一化的计算。如果 yarn
集群 RM 和 AM 机器上的配置不一致,是有潜在风险的。hadoop 3.x 中改进了这一问题,每个 container request 都有一个
id,可以用来将分配到的 container 与之前的 request 对应起来。出于对旧版本 hadoop 的兼容性考虑,flink
仍然采用的是计算资源的方式匹配 container。

flink 1.9 当中没有遇到这个问题,是因为默认所有 container 都是相同规格的,所以省略了匹配过程。目前 flink
社区正在开发支持申请不同规格 container 调度能力,因此在 1.11 种增加了验证 container 资源的逻辑。

Thank you~

Xintong Song



On Tue, Jul 28, 2020 at 2:46 PM 酷酷的浑蛋  wrote:

> 谢谢你,我将flink-conf.yaml的taskmanager.memory.process.size由1728m调成2048m,这个问题解决了,
> 但是我觉得,这个问题应该很常见才对,怎么偏被我碰上了,flink这个设计是正常的吗?
>
>
>
>
> 在2020年07月28日 10:26,Xintong Song 写道:
> 建议确认一下 Yarn 的配置 “yarn.scheduler.minimum-allocation-mb” 在 Yarn RM 和 Flink JM
> 这两台机器上是否一致。
>
> Yarn 会对 container request 做归一化。例如你请求的 TM container 是 1728m
> (taskmanager.memory.process.size) ,如果 minimum-allocation-mb 是 1024m,那么实际得到的
> container 大小必须是 minimum-allocation-mb 的整数倍,也就是 2048m。Flink 会去获取 Yarn 的配置,计算
> container request 实际分到的 container 应该多大,并对分到的 container 进行检查。现在看 JM 日志,分下来的
> container 并没有通过这个检查,造成 Flink 认为 container 规格不匹配。这里最可能的原因是 Flink 拿到的
> minimum-allocation-mb 和 Yarn RM 实际使用的不一致。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Mon, Jul 27, 2020 at 7:42 PM 酷酷的浑蛋  wrote:
>
>
> 首先,flink1.9提交到yarn集群是没有问题的,同等的配置提交flink1.11.1到yarn集群就报下面的错误
> 2020-07-27 17:08:14,661 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>
> 
> 2020-07-27 17:08:14,665 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Starting YarnJobClusterEntrypoint (Version: 1.11.1, Scala: 2.11,
> Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
> 2020-07-27 17:08:14,665 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  OS
> current user: hadoop
> 2020-07-27 17:08:15,417 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Current
> Hadoop/Kerberos user: wangty
> 2020-07-27 17:08:15,418 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  JVM:
> Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.191-b12
> 2020-07-27 17:08:15,418 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Maximum
> heap size: 429 MiBytes
> 2020-07-27 17:08:15,418 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> JAVA_HOME: /usr/local/jdk/
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Hadoop
> version: 2.7.7
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  JVM
> Options:
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> -Xmx469762048
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> -Xms469762048
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> -XX:MaxMetaspaceSize=268435456
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
>
> -Dlog.file=/data/emr/yarn/logs/application_1568724479991_18850539/container_e25_1568724479991_18850539_01_01/jobmanager.log
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> -Dlog4j.configuration=file:log4j.properties
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> -Dlog4j.configurationFile=file:log4j.properties
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Program
> Arguments: (none)
> 2020-07-27 17:08:15,419 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
> Classpath:
>
> 

Re: Sql往kafka表写聚合数据报错

2020-07-28 文章 Benchao Li
你可以用Canal或者Debezium format来写入kafka,那样就支持update和delete消息了。

op <520075...@qq.com> 于2020年7月29日周三 上午11:59写道:

> 如下,想用sql直接往kafka写聚合结果,版本是1.11,请问能有什么办法解决,还是只能转换成datastream??
>
>
> 谢谢
>
>
> Exception in thread "main" org.apache.flink.table.api.TableException:
> Table sink 'default_catalog.default_database.mvp_rtdwb_user_business'
> doesn't support consuming update changes which is produced by node
> GroupAggregate(groupBy=[dt, user_id], select=[dt, user_id, SUM($f2) AS
> text_feed_count, SUM($f3) AS picture_feed_count, SUM($f4) AS
> be_comment_forward_user_count, SUM($f5) AS share_link_count, SUM($f6) AS
> share_music_count, SUM($f7) AS share_video_count, SUM($f8) AS follow_count,
> SUM($f9) AS direct_post_count, SUM($f10) AS comment_post_count, SUM($f11)
> AS comment_count, SUM($f12) AS fans_count, MAX(event_time) AS event_time])



-- 

Best,
Benchao Li


Re: flink 1.11 rest api saveppoint接口 异常

2020-07-28 文章 taowang
我再次确认了一下,可能是因为我设置了checkpoint的setMinPauseBetweenCheckpoints,所以在上一次 checkpoint 
和这个间隔之间触发 savepoint 不会生效,但是接口返回了IN_PROGRESS 的状态,我觉得这里应该是有点问题的。


 原始邮件 
发件人: taowang
收件人: user-zh
发送时间: 2020年7月28日(周二) 18:53
主题: Re: flink 1.11 rest api saveppoint接口 异常


是的,其实无论是否开启了unaligned checkpoint,我在调用这个接口的时候都没有 checkpoint 在做。 
而且等待的话,我认为如果有正在做的,那么正在做的 checkpoint执行完成之后新的 savepoint 
应该会开始执行吧,但我看到的现象是等了半个小时依旧是 IN_PROGRESS状态,正常状态下,一个 checkpoint 的执行时间也就几秒钟,正常的 
savpoint 执行完成最多也只需要几分钟。 原始邮件 发件人: Congxian Qiu 收件人: 
user-zh 发送时间: 2020年7月28日(周二) 18:09 主题: Re: flink 1.11 
rest api saveppoint接口 异常 Hi 开启 unalign checkpoint 的情况下,如果有 checkpoint 正在做的话,那么 
savepoint 会等待的[1],但是把 unaligned checkpoint 
关闭之后,还有这个现象看上去不太符合预期。关闭之后这种现象出现的时候,也有 checkpoint 正在做吗? [1] 
https://issues.apache.org/jira/browse/FLINK-17342 Best, Congxian taowang 
 于2020年7月28日周二 下午5:05写道: > 在升级了 flink 1.11之后,我在使用的时候发现 
rest api 的 /jobs/:jobid/savepoints 接口表现有点异常: > 在 flink 1.10 时:当请求该接口后,在 flink 
ui 可以看到 savepoint > 被触发,/jobs/:jobid/savepoints/:triggerid 返回IN_PROGRESS,等 
savepoint > 成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。 > 但是在flink 
1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints > 
接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到 > 
savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。 > 
我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在 > 
flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?
 > > > rest api flink docs 链接: > 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints
 > > > 祝好~

Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 文章 godfrey he
Yes, The pr still needs to be improved.
In most cases, there are more than one statement in the sql file,
so -f option should support multiple statements.
however, a related PR [1] has not completed yet.

[1] https://github.com/apache/flink/pull/8738

Best,
Godfrey

Jun Zhang  于2020年7月29日周三 上午10:17写道:

> hi,godfrey:
> Thanks for your reply
>
> 1. I have seen the -u parameter, but my sql file may not only include
> 'insert into select ', but also SET, DDL, etc.
>
> 2. I may not have noticed this issue. I took a look at this issue. I think
> this issue may have some problems. For example, he finally called the
> CliClient.callCommand method.
> But I think that many options in callCommand are not completely suitable
> for sql files, such as HELP, CLEAR, SELECT, etc. The select operation opens
> a window to display the results, obviously this is not suitable for
> executing sql files
>
> godfrey he  于2020年7月29日周三 上午9:56写道:
>
>> hi Jun,
>>
>> Currently, sql client has supported -u option, just like:
>>  ./bin/sql-client.sh embedded -u "insert_statement".
>>
>> There is already a JIRA [1] that wants to support -f option
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12828
>>
>> Best,
>> Godfrey
>>
>> Jun Zhang  于2020年7月29日周三 上午9:22写道:
>>
>>> I want to execute some flink sql batch jobs regularly, such as 'insert
>>> into
>>> select .', but I can't find a suitable method so far, so reference
>>>  hive, I changed the source code and add a  '--filename'  parameter  so
>>> that we can execute a sql file.
>>>
>>> like this:
>>>
>>> /home/flink/bin/sql-client.sh embedded -f flink.sql
>>>
>>> what about any ideas or plans for this feature community?
>>>
>>


Flink提交任务时可以使用-djs参数,但没有看到相关介绍

2020-07-28 文章 zhoujibo
 

提交任务时可以使用-dis 传递依赖的jar包,如下命令:

./bin/flink run -c com.test.TestData -p 2 -djs $(echo ./test/libs/*.jar | tr
' ' ',') ./test/test-data-1.0.jar 

 

但-djs这个参数官网上貌似没有相关介绍,参数有什么文档介绍吗

 

 



Sql??kafka????????????????

2020-07-28 文章 op
??sql??kafka??1.11??datastream





Exception in thread "main" org.apache.flink.table.api.TableException: Table 
sink 'default_catalog.default_database.mvp_rtdwb_user_business' doesn't support 
consuming update changes which is produced by node GroupAggregate(groupBy=[dt, 
user_id], select=[dt, user_id, SUM($f2) AS text_feed_count, SUM($f3) AS 
picture_feed_count, SUM($f4) AS be_comment_forward_user_count, SUM($f5) AS 
share_link_count, SUM($f6) AS share_music_count, SUM($f7) AS share_video_count, 
SUM($f8) AS follow_count, SUM($f9) AS direct_post_count, SUM($f10) AS 
comment_post_count, SUM($f11) AS comment_count, SUM($f12) AS fans_count, 
MAX(event_time) AS event_time])

Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 Dream-底限
hi 鱼子酱、
我当初这样用的时候状态也不清理(子查询+时间窗口+union),后来把时间窗口改成全局group函数,union改成订阅topic列表后,设置状态过期时间状态才清理。。。
后来看资料有的说分区数据不均衡导致水印不推进的话可能导致这种状态不清理的问题,但是我感觉不是水印导致的,水印导致的窗口应该不触发计算吧,感觉这里面有些bug,需要专业人士定位一下

鱼子酱 <384939...@qq.com> 于2020年7月29日周三 上午9:53写道:

> 您好:
>
> 我按照您说的试了看了一下watermark,
> 发现可以 正常更新,相关的计算结果也没发现问题。
> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
> 
> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
> 
> 
>
>
>
> Congxian Qiu wrote
> > Hi
> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
> 越来越大的情况,或许可以检查下
> > watermark[1]
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> >
> > Best,
> > Congxian
> >
> >
> > 鱼子酱 <
>
> > 384939718@
>
> >> 于2020年7月28日周二 下午2:45写道:
> >
> >> Hi,社区的各位大家好:
> >> 我目前生产上面使用的是1.8.2版本,相对稳定
> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
> >>
> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> >> 状态后端使用的是rocksdb 的增量模式
> >> StateBackend backend =new
> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> >> 设置了官网文档中找到的删除策略:
> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> >> Time.minutes(7));
> >>
> >> 请问是我使用的方式不对吗?
> >>
> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
> >>
> >>
> >>
> >> 版本影响:flink1.10.1 flink1.11.1
> >> planner:blink planner
> >> source : kafka source
> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>
> >>
> >>
> >>
> >>
> >> sql:
> >> insert into  result
> >> select request_time ,request_id ,request_cnt ,avg_resptime
> >> ,stddev_resptime ,terminal_cnt
> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
> >> from
> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,commandId as request_id
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
> >>
> >> union all
> >>
> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
> >> )
> >>
> >>
> >> source:
> >>
> >> create table log (
> >>   eventTime bigint
> >>   ,times timestamp(3)
> >>   
> >>   ,commandId integer
> >>   ,watermark for times as times - interval '5' second
> >> )
> >> with(
> >>  'connector' = 'kafka-0.10',
> >>  'topic' = '……',
> >>  'properties.bootstrap.servers' = '……',
> >>  'properties.group.id' = '……',
> >>  'scan.startup.mode' = 'latest-offset',
> >>  'format' = 'json'
> >> )
> >>
> >> sink1:
> >> create table result (
> >>   request_time varchar
> >>   ,request_id integer
> >>   ,request_cnt bigint
> >>   ,avg_resptime double
> >>   ,stddev_resptime double
> >>   ,insert_time varchar
> >> ) with (
> >>   'connector' = 'kafka-0.10',
> >>   'topic' = '……',
> >>   'properties.bootstrap.servers' = '……',
> >>   'properties.group.id' = '……',
> >>   'scan.startup.mode' = 'latest-offset',
> >>   'format' = 'json'
> >> )
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 Benchao Li
这个问题我建议先区分下是SQL operator里面没有清理state,还是state backend本身没有清理state。
这样你是否可以尝试下其他的state backend,以及非增量模式的rocksdb等?如果在所有state backend场景下,
state都是一直上涨的,那有可能某个SQL operator里面对state的清理可能有些问题。

鱼子酱 <384939...@qq.com> 于2020年7月29日周三 上午9:47写道:

> 您好:
>
> 我按照您说的试了看了一下watermark,
> 发现可以 正常更新,相关的计算结果也没发现问题。
> 1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
> 
> 2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
> 
> 
>
>
>
> Congxian Qiu wrote
> > Hi
> > SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state
> 越来越大的情况,或许可以检查下
> > watermark[1]
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> >
> > Best,
> > Congxian
> >
> >
> > 鱼子酱 <
>
> > 384939718@
>
> >> 于2020年7月28日周二 下午2:45写道:
> >
> >> Hi,社区的各位大家好:
> >> 我目前生产上面使用的是1.8.2版本,相对稳定
> >> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> >> 截止目前先后研究了1.10.1 1.11.1共2个大版本
> >>
> >> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> >> 状态后端使用的是rocksdb 的增量模式
> >> StateBackend backend =new
> >> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> >> 设置了官网文档中找到的删除策略:
> >> TableConfig tableConfig = streamTableEnvironment.getConfig();
> >> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> >> Time.minutes(7));
> >>
> >> 请问是我使用的方式不对吗?
> >>
> >> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
> >>
> >>
> >>
> >> 版本影响:flink1.10.1 flink1.11.1
> >> planner:blink planner
> >> source : kafka source
> >> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> >>
> >>
> >>
> >>
> >>
> >> sql:
> >> insert into  result
> >> select request_time ,request_id ,request_cnt ,avg_resptime
> >> ,stddev_resptime ,terminal_cnt
> >> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
> >> from
> >> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,commandId as request_id
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
> >>
> >> union all
> >>
> >> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> >> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> >> ,
> >> ,count(*) as request_cnt
> >> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> >> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
> >> stddev_resptime
> >> from log
> >> where
> >> commandId in (104005 ,204005 ,404005)
> >> and errCode=0 and attr=0
> >> group by TUMBLE(times, INTERVAL '1' MINUTE)
> >> )
> >>
> >>
> >> source:
> >>
> >> create table log (
> >>   eventTime bigint
> >>   ,times timestamp(3)
> >>   
> >>   ,commandId integer
> >>   ,watermark for times as times - interval '5' second
> >> )
> >> with(
> >>  'connector' = 'kafka-0.10',
> >>  'topic' = '……',
> >>  'properties.bootstrap.servers' = '……',
> >>  'properties.group.id' = '……',
> >>  'scan.startup.mode' = 'latest-offset',
> >>  'format' = 'json'
> >> )
> >>
> >> sink1:
> >> create table result (
> >>   request_time varchar
> >>   ,request_id integer
> >>   ,request_cnt bigint
> >>   ,avg_resptime double
> >>   ,stddev_resptime double
> >>   ,insert_time varchar
> >> ) with (
> >>   'connector' = 'kafka-0.10',
> >>   'topic' = '……',
> >>   'properties.bootstrap.servers' = '……',
> >>   'properties.group.id' = '……',
> >>   'scan.startup.mode' = 'latest-offset',
> >>   'format' = 'json'
> >> )
> >>
> >>
> >>
> >>
> >>
> >> --
> >> Sent from: http://apache-flink.147419.n8.nabble.com/
> >>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li


Re: 解析kafka的mysql binlog问题

2020-07-28 文章 admin
直接转成string1.11版本还不支持,会在1.12修复,参考jira[1]

[1]https://issues.apache.org/jira/browse/FLINK-18002 


> 2020年7月28日 下午5:20,air23  写道:
> 
> 你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
> 另外想请教下 1.11 版本  datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的,
> 但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-28 16:02:18,"Jark Wu"  写道:
>> 因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>> 1.12 中已经支持读取复杂结构为 string 类型了。
>> 
>> Best,
>> Jark
>> 
>> On Tue, 28 Jul 2020 at 15:36, air23  wrote:
>> 
>>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>> 
>>> 
>>> {
>>>"data":[
>>>{
>>>"op_id":"97037138",
>>>"order_id":"84172164"
>>>}
>>>],
>>>"database":"order_11",
>>>"es":1595720375000,
>>>"id":17469027,
>>>"isDdl":false,
>>>"mysqlType":{
>>>"op_id":"int(11)",
>>>"order_id":"int(11)"
>>>},
>>>"old":null,
>>>"pkNames":[
>>>"op_id"
>>>],
>>>"sql":"",
>>>"sqlType":{
>>>"op_id":4,
>>>"order_id":4
>>>},
>>>"table":"order_product",
>>>"ts":1595720375837,
>>>"type":"INSERT"
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
 有kafka 中json 数据的样例不?
 有没有看过 TaskManager 中有没有异常 log 信息?
 
 
 
 On Tue, 28 Jul 2020 at 09:40, air23  wrote:
 
> 你好 测试代码如下
> 
> 
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'source_databases'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
> public static void main(String[] args) throws Exception {
> 
> 
> //bink table
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>EnvironmentSettings bsSettings =
> 
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
> 
>TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
> 
> 
> tableResult.print();
> 
>Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
> 
> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
> 
> bsEnv.execute("aa");
> 
> }
> 
> 
> 
> 
> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> ,order_operation_time
> ,inventory_batch_log
> ,order_log
> ,order_address_book
> ,product_inventory
> ,order_physical_relation
> ,bil_business_attach
> ,picking_detail
> ,picking_detail
> ,orders
> 
> 
> 
> 
> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> 看到例子都是useOldPlanner 来转table的。
> 致谢
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> 抱歉,还是没有看到附件。
>> 如果是文本的话,你可以直接贴到邮件里。
>> 
>> On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> 
>>> 我再上传一次
>>> 
>>> 在2020年07月27日 18:55,Jark Wu  写道:
>>> 
>>> Hi,
>>> 你的附件好像没有上传。
>>> 
>>> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>>> 
 *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
>>> 不能取到data呢?*
 
 private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
>>> (\n"
> +
" `data` VARCHAR , " +
" `table` VARCHAR " +
") WITH (" +
" 'connector' = 'kafka'," +
" 'topic' = 'order_source'," +
" 'properties.bootstrap.servers' = '***'," +
" 'properties.group.id' = 'real1'," +
" 'format' = 'json'," +
" 'scan.startup.mode' = 'earliest-offset'" +
")";
 
 
 具体见附件 有打印
 
 
 
 
 
>>> 
>>> 
> 
>>> 



Re: [DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 文章 Jun Zhang
hi,godfrey:
Thanks for your reply

1. I have seen the -u parameter, but my sql file may not only include
'insert into select ', but also SET, DDL, etc.

2. I may not have noticed this issue. I took a look at this issue. I think
this issue may have some problems. For example, he finally called the
CliClient.callCommand method.
But I think that many options in callCommand are not completely suitable
for sql files, such as HELP, CLEAR, SELECT, etc. The select operation opens
a window to display the results, obviously this is not suitable for
executing sql files

godfrey he  于2020年7月29日周三 上午9:56写道:

> hi Jun,
>
> Currently, sql client has supported -u option, just like:
>  ./bin/sql-client.sh embedded -u "insert_statement".
>
> There is already a JIRA [1] that wants to support -f option
>
> [1] https://issues.apache.org/jira/browse/FLINK-12828
>
> Best,
> Godfrey
>
> Jun Zhang  于2020年7月29日周三 上午9:22写道:
>
>> I want to execute some flink sql batch jobs regularly, such as 'insert
>> into
>> select .', but I can't find a suitable method so far, so reference
>>  hive, I changed the source code and add a  '--filename'  parameter  so
>> that we can execute a sql file.
>>
>> like this:
>>
>> /home/flink/bin/sql-client.sh embedded -f flink.sql
>>
>> what about any ideas or plans for this feature community?
>>
>


flink1.11 可以使用processtime开窗,但是无法使用eventtime开窗

2020-07-28 文章 111






您好,请教一个问题,谢谢:
很简单的json,
{"num":100,"ts":1595949526874,"vin":""}
{"num":200,"ts":1595949528874,"vin":""}
{"num":200,"ts":1595949530880,"vin":""}
{"num":300,"ts":1595949532883,"vin":""}
{"num":100,"ts":1595949534888,"vin":""}
{"num":300,"ts":1595949536892,"vin":""}
我就想使用eventtime开窗,但是亲测使用procetime可以,但是eventtime死活都不行,郁闷,望指教。
public class FlinkKafka {
public static void main(String[] args) throws Exception{
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, 
settings);

String kafkaSourceTable = "CREATE TABLE kafkaSourceTable (\n" +
" ts BIGINT,\n" +
" num INT ,\n" +
" vin STRING ,\n" +
" pts AS PROCTIME() ,  \n" +  //处理时间
" rowtime AS TO_TIMESTAMP(FROM_UNIXTIME(ts / 1000, '-MM-dd HH:mm:ss')), \n 
" +
"  WATERMARK FOR rowtime AS rowtime - INTERVAL '1' SECOND \n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'kkb',\n" +
" 'properties.bootstrap.servers' = 'node01:9092,node02:9092,node03:9092',\n" +
" 'properties.group.id' = 'mm',\n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'latest-offset' \n" +
")";
tableEnv.executeSql(kafkaSourceTable);

String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group 
by ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)";
final Table windowAllTable = tableEnv.sqlQuery(queryWindowAllDataSql);

windowAllTable.printSchema();
tableEnv.toAppendStream(windowAllTable, Row.class).print();

System.out.println("--");
env.execute("job");

}

}


---
请看,我这里String queryWindowAllDataSql = "SELECT * from kafkaSourceTable  group by 
ts,num,vin,pts,rowtime, TUMBLE(pts, INTERVAL '5' SECOND)"
如果使用TUMBLE(pts, INTERVAL '5' SECOND)",即使用processtime就没有任何问题,可以每格几秒输出所有的内容。
打印结果:
root
 |-- ts: BIGINT
 |-- num: INT
 |-- vin: STRING
 |-- pts: TIMESTAMP(3) NOT NULL *PROCTIME*
 |-- rowtime: TIMESTAMP(3) *ROWTIME*


--
11> 1595949629063,500,,2020-07-28T15:20:29.066,2020-07-28T23:20:29
7> 1595949627062,500,,2020-07-28T15:20:27.101,2020-07-28T23:20:27
7> 1595949631067,100,,2020-07-28T15:20:31.071,2020-07-28T23:20:31
12> 1595949633072,500,,2020-07-28T15:20:33.077,2020-07-28T23:20:33
11> 1595949637081,400,,2020-07-28T15:20:37.085,2020-07-28T23:20:37
2> 1595949635077,400,,2020-07-28T15:20:35.082,2020-07-28T23:20:35
11> 1595949639085,100,,2020-07-28T15:20:39.089,2020-07-28T23:20:39
1> 1595949643093,200,,2020-07-28T15:20:43.096,2020-07-28T23:20:43


但是如果我使用TUMBLE(rowtime, INTERVAL '5' SECOND),也就是想使用eventtime开窗,就没有任何的结果输出,一直在等待。
版本是flink1.11.0


望指教,谢谢!




Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 鱼子酱
您好:

我按照您说的试了看了一下watermark,
发现可以 正常更新,相关的计算结果也没发现问题。
1. 刚刚截了图在下面,时间因为时区的问题-8就正常了
 
2. checkpoint里面的信息,能看出大小是线性增长的,然后主要集中在2个窗口和group里面。
 
 



Congxian Qiu wrote
> Hi
> SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下
> watermark[1]
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html
> 
> Best,
> Congxian
> 
> 
> 鱼子酱 <

> 384939718@

>> 于2020年7月28日周二 下午2:45写道:
> 
>> Hi,社区的各位大家好:
>> 我目前生产上面使用的是1.8.2版本,相对稳定
>> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
>> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>>
>> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
>> 状态后端使用的是rocksdb 的增量模式
>> StateBackend backend =new
>> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
>> 设置了官网文档中找到的删除策略:
>> TableConfig tableConfig = streamTableEnvironment.getConfig();
>> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
>> Time.minutes(7));
>>
>> 请问是我使用的方式不对吗?
>>
>> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>>
>>
>>
>> 版本影响:flink1.10.1 flink1.11.1
>> planner:blink planner
>> source : kafka source
>> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>
>>
>>
>>
>>
>> sql:
>> insert into  result
>> select request_time ,request_id ,request_cnt ,avg_resptime
>> ,stddev_resptime ,terminal_cnt
>> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19)
>> from
>> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> ,commandId as request_id
>> ,count(*) as request_cnt
>> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> stddev_resptime
>> from log
>> where
>> commandId in (104005 ,204005 ,404005)
>> and errCode=0 and attr=0
>> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>>
>> union all
>>
>> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
>> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
>> ,
>> ,count(*) as request_cnt
>> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
>> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as
>> stddev_resptime
>> from log
>> where
>> commandId in (104005 ,204005 ,404005)
>> and errCode=0 and attr=0
>> group by TUMBLE(times, INTERVAL '1' MINUTE)
>> )
>>
>>
>> source:
>>
>> create table log (
>>   eventTime bigint
>>   ,times timestamp(3)
>>   
>>   ,commandId integer
>>   ,watermark for times as times - interval '5' second
>> )
>> with(
>>  'connector' = 'kafka-0.10',
>>  'topic' = '……',
>>  'properties.bootstrap.servers' = '……',
>>  'properties.group.id' = '……',
>>  'scan.startup.mode' = 'latest-offset',
>>  'format' = 'json'
>> )
>>
>> sink1:
>> create table result (
>>   request_time varchar
>>   ,request_id integer
>>   ,request_cnt bigint
>>   ,avg_resptime double
>>   ,stddev_resptime double
>>   ,insert_time varchar
>> ) with (
>>   'connector' = 'kafka-0.10',
>>   'topic' = '……',
>>   'properties.bootstrap.servers' = '……',
>>   'properties.group.id' = '……',
>>   'scan.startup.mode' = 'latest-offset',
>>   'format' = 'json'
>> )
>>
>>
>>
>>
>>
>> --
>> Sent from: http://apache-flink.147419.n8.nabble.com/
>>





--
Sent from: http://apache-flink.147419.n8.nabble.com/


[DISCUSS] add a '--filename' parameter for sql-client

2020-07-28 文章 Jun Zhang
I want to execute some flink sql batch jobs regularly, such as 'insert into
select .', but I can't find a suitable method so far, so reference
 hive, I changed the source code and add a  '--filename'  parameter  so
that we can execute a sql file.

like this:

/home/flink/bin/sql-client.sh embedded -f flink.sql

what about any ideas or plans for this feature community?


Re: Flink SQL 解析复杂(嵌套)JSON的问题 以及写入到hive类型映射问题

2020-07-28 文章 Leonard Xu
Hello

> 问题是:
> 如果json array 里还有一个array 也是继续嵌套定义吗? 这个数据是要写入到hive,该怎么映射,array  
> ,怎么映射成Hive类型,比如映射成array,这种情况的json该如何处理? 有没有什么办法直接把json 
> array,直接映射成array,试了一下发现不行,该如何处理这种复杂类型。


Json format有一个issue在解这个问题[1],可以把jsonNode强制转成 string, 1.12里会支持,可以看下.

Best
Leonard
[1] https://issues.apache.org/jira/browse/FLINK-18002 


Re: flink作业提交到集群执行异常

2020-07-28 文章 hechuan
定位到问题了,我这里是scala的版本不一致导致的
部分maven引用的2.11,部分引用的2.12,统一版本后这个报错就不存在了




--
Sent from: http://apache-flink.147419.n8.nabble.com/

Flink SQL 解析复杂(嵌套)JSON的问题 以及写入到hive类型映射问题

2020-07-28 文章 kandy.wang
json格式,如果是一个json array 该如何定义 schema,array里还可能存在嵌套json array的情况。

如数据:
{"user_info":{"user_id":"0111","name":"xxx"},"timestam":1586676835655,"id":"10001","jsonArray":[{"name222":"xxx","user_id222":"0022"},{"name333":"name","user_id222":"user"},{"cc":"xxx333","user_id444":"user","name444":"name"}]}


参照:https://www.cnblogs.com/Springmoon-venn/p/12664547.html
需要schema这样定义:
user_info 定义成:ROW
jsonArray 定义成 : ARRAY>


问题是:
如果json array 里还有一个array 也是继续嵌套定义吗? 这个数据是要写入到hive,该怎么映射,array  
,怎么映射成Hive类型,比如映射成array,这种情况的json该如何处理? 有没有什么办法直接把json 
array,直接映射成array,试了一下发现不行,该如何处理这种复杂类型。

回复:flink 聚合 job 重启问题

2020-07-28 文章 郑斌斌
 需要通过checkpoint恢复启动才没有问题,不知道为什么是这样
--
发件人:RS 
发送时间:2020年7月27日(星期一) 15:50
收件人:user-zh@flink.apache.org ; 郑斌斌 

主 题:Re:flink 聚合 job 重启问题

伪代码发下看看?看下jdbc sink的配置,是不是支持删除记录,更新的时候旧记录被删除了

在 2020-07-27 11:33:31,"郑斌斌"  写道:
>hi all :
>
> 请教个问题,我通过程序拉取kafka消息后,注册为flink流表。然后执行sql: "select user_id, count(*)cnt 
> from 流表", 将结果写入到mysql 聚合表中(SINK组件为:flink1.11版本JdbcUpsertTableSink)。
>但问题是,每次JOB重启后,之前mysql 聚合表结果会被清空。我设置了checkpoint和racksdbbackendstate.
>
>Thanks
>
>





Re: flink 1.11 rest api saveppoint接口 异常

2020-07-28 文章 taowang
是的,其实无论是否开启了unaligned checkpoint,我在调用这个接口的时候都没有 checkpoint 在做。
而且等待的话,我认为如果有正在做的,那么正在做的 checkpoint执行完成之后新的 savepoint 
应该会开始执行吧,但我看到的现象是等了半个小时依旧是 IN_PROGRESS状态,正常状态下,一个 checkpoint 的执行时间也就几秒钟,正常的 
savpoint 执行完成最多也只需要几分钟。




 原始邮件 
发件人: Congxian Qiu
收件人: user-zh
发送时间: 2020年7月28日(周二) 18:09
主题: Re: flink 1.11 rest api saveppoint接口 异常


Hi 开启 unalign checkpoint 的情况下,如果有 checkpoint 正在做的话,那么 savepoint 会等待的[1],但是把 
unaligned checkpoint 关闭之后,还有这个现象看上去不太符合预期。关闭之后这种现象出现的时候,也有 checkpoint 正在做吗? [1] 
https://issues.apache.org/jira/browse/FLINK-17342 Best, Congxian taowang 
 于2020年7月28日周二 下午5:05写道: > 在升级了 flink 1.11之后,我在使用的时候发现 
rest api 的 /jobs/:jobid/savepoints 接口表现有点异常: > 在 flink 1.10 时:当请求该接口后,在 flink 
ui 可以看到 savepoint > 被触发,/jobs/:jobid/savepoints/:triggerid 返回IN_PROGRESS,等 
savepoint > 成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。 > 但是在flink 
1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints > 
接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到 > 
savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。 > 
我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在 > 
flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?
 > > > rest api flink docs 链接: > 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints
 > > > 祝好~

Re: flink 1.11 rest api saveppoint接口 异常

2020-07-28 文章 Congxian Qiu
Hi
   开启 unalign checkpoint 的情况下,如果有 checkpoint 正在做的话,那么 savepoint 会等待的[1],但是把
unaligned checkpoint 关闭之后,还有这个现象看上去不太符合预期。关闭之后这种现象出现的时候,也有 checkpoint 正在做吗?

[1] https://issues.apache.org/jira/browse/FLINK-17342
Best,
Congxian


taowang  于2020年7月28日周二 下午5:05写道:

> 在升级了 flink 1.11之后,我在使用的时候发现 rest api 的 /jobs/:jobid/savepoints 接口表现有点异常:
> 在 flink 1.10 时:当请求该接口后,在 flink ui 可以看到 savepoint
> 被触发,/jobs/:jobid/savepoints/:triggerid 返回IN_PROGRESS,等 savepoint
> 成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。
> 但是在flink 1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints
> 接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到
> savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。
> 我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在
> flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?
>
>
> rest api flink docs 链接:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints
>
>
> 祝好~


Flink操作 kafka ,hive遇到kerberos过期

2020-07-28 文章 lydata
 请问下 Flink操作 kafka ,hive遇到kerberos过期  有什么解决方法吗?

Re: flink作业提交到集群执行异常

2020-07-28 文章 hechuan
Hi,
遇到了同样的问题,请教下是如何解决的?
编译jar包为单独的jar,非jar-with-dependencies的方式,依赖的jar包放到了自定义一个目录lib1
修改了bin/flink脚本,CC_CLASSPATH追加了自定义jar目录lib1,lib1目录下能找到这个类
$ grep org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer *
Binary file flink-sql-connector-kafka_2.12-1.11.1.jar matches

flink/lib目录和自定义的lib目录里面没有重复的文件

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load
user class: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
ClassLoader info: URL ClassLoader:
file:
'/tmp/blobStore-72796b24-2be1-4bc7-ac86-7acd1fe16b48/job_f532b11a7342424cdc0695126071f96e/blob_p-3dc0956f6379ef65c8f54997d7fe4a4d0918064c-b96f6980c6e06d9618abd63d25c1cee6'
(valid JAR)
Class not resolvable through given classloader.
at
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:288)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.streaming.runtime.tasks.OperatorChain.(OperatorChain.java:126)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:453)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_251]
Caused by: java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
~[?:1.8.0_251]
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
~[?:1.8.0_251]
at
org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
~[flink-dist_2.11-1.11.1.jar:1.11.1]




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re:Re: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
另外想请教下 1.11 版本  datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的,
但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。

















在 2020-07-28 16:02:18,"Jark Wu"  写道:
>因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>1.12 中已经支持读取复杂结构为 string 类型了。
>
>Best,
>Jark
>
>On Tue, 28 Jul 2020 at 15:36, air23  wrote:
>
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>
>>
>> {
>> "data":[
>> {
>> "op_id":"97037138",
>> "order_id":"84172164"
>> }
>> ],
>> "database":"order_11",
>> "es":1595720375000,
>> "id":17469027,
>> "isDdl":false,
>> "mysqlType":{
>> "op_id":"int(11)",
>> "order_id":"int(11)"
>> },
>> "old":null,
>> "pkNames":[
>> "op_id"
>> ],
>> "sql":"",
>> "sqlType":{
>> "op_id":4,
>> "order_id":4
>> },
>> "table":"order_product",
>> "ts":1595720375837,
>> "type":"INSERT"
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
>> >有kafka 中json 数据的样例不?
>> >有没有看过 TaskManager 中有没有异常 log 信息?
>> >
>> >
>> >
>> >On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>> >
>> >> 你好 测试代码如下
>> >>
>> >>
>> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> >> " `data` VARCHAR , " +
>> >> " `table` VARCHAR " +
>> >> ") WITH (" +
>> >> " 'connector' = 'kafka'," +
>> >> " 'topic' = 'source_databases'," +
>> >> " 'properties.bootstrap.servers' = '***'," +
>> >> " 'properties.group.id' = 'real1'," +
>> >> " 'format' = 'json'," +
>> >> " 'scan.startup.mode' = 'earliest-offset'" +
>> >> ")";
>> >> public static void main(String[] args) throws Exception {
>> >>
>> >>
>> >> //bink table
>> >> StreamExecutionEnvironment bsEnv =
>> >> StreamExecutionEnvironment.getExecutionEnvironment();
>> >> EnvironmentSettings bsSettings =
>> >>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >> StreamTableEnvironment bsTableEnv =
>> >> StreamTableEnvironment.create(bsEnv, bsSettings);
>> >>
>> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>> >>
>> >>
>> >> tableResult.print();
>> >>
>> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>> >>
>> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>> >>
>> >> bsEnv.execute("aa");
>> >>
>> >> }
>> >>
>> >>
>> >>
>> >>
>> >> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> >> ,order_operation_time
>> >> ,inventory_batch_log
>> >> ,order_log
>> >> ,order_address_book
>> >> ,product_inventory
>> >> ,order_physical_relation
>> >> ,bil_business_attach
>> >> ,picking_detail
>> >> ,picking_detail
>> >> ,orders
>> >>
>> >>
>> >>
>> >>
>> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> >> 看到例子都是useOldPlanner 来转table的。
>> >> 致谢
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> >> >抱歉,还是没有看到附件。
>> >> >如果是文本的话,你可以直接贴到邮件里。
>> >> >
>> >> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> >> >
>> >> >> 我再上传一次
>> >> >>
>> >> >> 在2020年07月27日 18:55,Jark Wu  写道:
>> >> >>
>> >> >> Hi,
>> >> >> 你的附件好像没有上传。
>> >> >>
>> >> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> >> >>
>> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
>> 不能取到data呢?*
>> >> >> >
>> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
>> (\n"
>> >> +
>> >> >> > " `data` VARCHAR , " +
>> >> >> > " `table` VARCHAR " +
>> >> >> > ") WITH (" +
>> >> >> > " 'connector' = 'kafka'," +
>> >> >> > " 'topic' = 'order_source'," +
>> >> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> >> > " 'properties.group.id' = 'real1'," +
>> >> >> > " 'format' = 'json'," +
>> >> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> >> > ")";
>> >> >> >
>> >> >> >
>> >> >> > 具体见附件 有打印
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >>
>> >> >>
>> >>
>>


Re:回复: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
你好 使用的是1.11.1版本的 



















在 2020-07-28 16:02:30,"明启 孙" <374060...@qq.com> 写道:
>你的flink什么版本
>
>发送自 Windows 10 版邮件应用
>
>发件人: air23
>发送时间: 2020年7月28日 15:36
>收件人: user-zh@flink.apache.org
>主题: Re:Re: Re: 解析kafka的mysql binlog问题
>
>格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
>{
>"data":[
>{
>"op_id":"97037138",
>"order_id":"84172164"
>}
>],
>"database":"order_11",
>"es":1595720375000,
>"id":17469027,
>"isDdl":false,
>"mysqlType":{
>"op_id":"int(11)",
>"order_id":"int(11)"
>},
>"old":null,
>"pkNames":[
>"op_id"
>],
>"sql":"",
>"sqlType":{
>"op_id":4,
>"order_id":4
>},
>"table":"order_product",
>"ts":1595720375837,
>"type":"INSERT"
>}
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-07-28 14:44:35,"Jark Wu"  写道:
>>有kafka 中json 数据的样例不?
>>有没有看过 TaskManager 中有没有异常 log 信息?
>>
>>
>>
>>On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>>
>>> 你好 测试代码如下
>>>
>>>
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>> " `data` VARCHAR , " +
>>> " `table` VARCHAR " +
>>> ") WITH (" +
>>> " 'connector' = 'kafka'," +
>>> " 'topic' = 'source_databases'," +
>>> " 'properties.bootstrap.servers' = '***'," +
>>> " 'properties.group.id' = 'real1'," +
>>> " 'format' = 'json'," +
>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>> ")";
>>> public static void main(String[] args) throws Exception {
>>>
>>>
>>> //bink table
>>> StreamExecutionEnvironment bsEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>
>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>
>>>
>>> tableResult.print();
>>>
>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>
>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>
>>> bsEnv.execute("aa");
>>>
>>> }
>>>
>>>
>>>
>>>
>>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>>> ,order_operation_time
>>> ,inventory_batch_log
>>> ,order_log
>>> ,order_address_book
>>> ,product_inventory
>>> ,order_physical_relation
>>> ,bil_business_attach
>>> ,picking_detail
>>> ,picking_detail
>>> ,orders
>>>
>>>
>>>
>>>
>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>> 看到例子都是useOldPlanner 来转table的。
>>> 致谢
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>>> >抱歉,还是没有看到附件。
>>> >如果是文本的话,你可以直接贴到邮件里。
>>> >
>>> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>>> >
>>> >> 我再上传一次
>>> >>
>>> >> 在2020年07月27日 18:55,Jark Wu  写道:
>>> >>
>>> >> Hi,
>>> >> 你的附件好像没有上传。
>>> >>
>>> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>>> >>
>>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>> >> >
>>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>> +
>>> >> > " `data` VARCHAR , " +
>>> >> > " `table` VARCHAR " +
>>> >> > ") WITH (" +
>>> >> > " 'connector' = 'kafka'," +
>>> >> > " 'topic' = 'order_source'," +
>>> >> > " 'properties.bootstrap.servers' = '***'," +
>>> >> > " 'properties.group.id' = 'real1'," +
>>> >> > " 'format' = 'json'," +
>>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>>> >> > ")";
>>> >> >
>>> >> >
>>> >> > 具体见附件 有打印
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >>
>>> >>
>>>
>


Re:Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
你好。
我猜测 是有可能是这个问题。但是我这个topic是 读取的一个库的binlog。有很多表   所以ARRAY>
这种 里面 不是固定的
 所以我想用datastream 解析 然后在根据表不同 解析成不同的table。但是发现blinkplaner 好像不可以datastream 
转换为table。或者是我没有发现这个例子
谢谢

















在 2020-07-28 16:05:55,"admin" <17626017...@163.com> 写道:
>data格式不是string,可以定义为ARRAY>
>
>> 2020年7月28日 下午3:35,air23  写道:
>> 
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>> 
>> 
>> {
>>"data":[
>>{
>>"op_id":"97037138",
>>"order_id":"84172164"
>>}
>>],
>>"database":"order_11",
>>"es":1595720375000,
>>"id":17469027,
>>"isDdl":false,
>>"mysqlType":{
>>"op_id":"int(11)",
>>"order_id":"int(11)"
>>},
>>"old":null,
>>"pkNames":[
>>"op_id"
>>],
>>"sql":"",
>>"sqlType":{
>>"op_id":4,
>>"order_id":4
>>},
>>"table":"order_product",
>>"ts":1595720375837,
>>"type":"INSERT"
>> }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
>>> 有kafka 中json 数据的样例不?
>>> 有没有看过 TaskManager 中有没有异常 log 信息?
>>> 
>>> 
>>> 
>>> On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>>> 
 你好 测试代码如下
 
 
 private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
 " `data` VARCHAR , " +
 " `table` VARCHAR " +
 ") WITH (" +
 " 'connector' = 'kafka'," +
 " 'topic' = 'source_databases'," +
 " 'properties.bootstrap.servers' = '***'," +
 " 'properties.group.id' = 'real1'," +
 " 'format' = 'json'," +
 " 'scan.startup.mode' = 'earliest-offset'" +
 ")";
 public static void main(String[] args) throws Exception {
 
 
 //bink table
 StreamExecutionEnvironment bsEnv =
 StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
 EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv =
 StreamTableEnvironment.create(bsEnv, bsSettings);
 
TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
 
 
 tableResult.print();
 
Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
 
 bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
 
 bsEnv.execute("aa");
 
 }
 
 
 
 
 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
 ,order_operation_time
 ,inventory_batch_log
 ,order_log
 ,order_address_book
 ,product_inventory
 ,order_physical_relation
 ,bil_business_attach
 ,picking_detail
 ,picking_detail
 ,orders
 
 
 
 
 另外再问个问题。1.11版本 blink 不能datastream转table吗?
 看到例子都是useOldPlanner 来转table的。
 致谢
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 在 2020-07-27 19:44:10,"Jark Wu"  写道:
> 抱歉,还是没有看到附件。
> 如果是文本的话,你可以直接贴到邮件里。
> 
> On Mon, 27 Jul 2020 at 19:22, air23  wrote:
> 
>> 我再上传一次
>> 
>> 在2020年07月27日 18:55,Jark Wu  写道:
>> 
>> Hi,
>> 你的附件好像没有上传。
>> 
>> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> 
>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>> 
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
 +
>>>" `data` VARCHAR , " +
>>>" `table` VARCHAR " +
>>>") WITH (" +
>>>" 'connector' = 'kafka'," +
>>>" 'topic' = 'order_source'," +
>>>" 'properties.bootstrap.servers' = '***'," +
>>>" 'properties.group.id' = 'real1'," +
>>>" 'format' = 'json'," +
>>>" 'scan.startup.mode' = 'earliest-offset'" +
>>>")";
>>> 
>>> 
>>> 具体见附件 有打印
>>> 
>>> 
>>> 
>>> 
>>> 
>> 
>> 
 


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 Congxian Qiu
Hi
SQL 部分不太熟,根据以往的经验,对于 event time 情况下 window 的某个算子 state 越来越大的情况,或许可以检查下
watermark[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/monitoring/debugging_event_time.html

Best,
Congxian


鱼子酱 <384939...@qq.com> 于2020年7月28日周二 下午2:45写道:

> Hi,社区的各位大家好:
> 我目前生产上面使用的是1.8.2版本,相对稳定
> 为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
> 截止目前先后研究了1.10.1 1.11.1共2个大版本
>
> 在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
> 状态后端使用的是rocksdb 的增量模式
> StateBackend backend =new
> RocksDBStateBackend("hdfs:///checkpoints-data/",true);
> 设置了官网文档中找到的删除策略:
> TableConfig tableConfig = streamTableEnvironment.getConfig();
> tableConfig.setIdleStateRetentionTime(Time.minutes(2),
> Time.minutes(7));
>
> 请问是我使用的方式不对吗?
>
> 通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator
>
>
>
> 版本影响:flink1.10.1 flink1.11.1
> planner:blink planner
> source : kafka source
> 时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>
>
>
>
>
> sql:
> insert into  result
> select request_time ,request_id ,request_cnt ,avg_resptime
> ,stddev_resptime ,terminal_cnt
> ,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19) from
> (   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> ,commandId as request_id
> ,count(*) as request_cnt
> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
> from log
> where
> commandId in (104005 ,204005 ,404005)
> and errCode=0 and attr=0
> group by TUMBLE(times, INTERVAL '1' MINUTE),commandId
>
> union all
>
> select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
> MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
> ,
> ,count(*) as request_cnt
> ,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
> ,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
> from log
> where
> commandId in (104005 ,204005 ,404005)
> and errCode=0 and attr=0
> group by TUMBLE(times, INTERVAL '1' MINUTE)
> )
>
>
> source:
>
> create table log (
>   eventTime bigint
>   ,times timestamp(3)
>   
>   ,commandId integer
>   ,watermark for times as times - interval '5' second
> )
> with(
>  'connector' = 'kafka-0.10',
>  'topic' = '……',
>  'properties.bootstrap.servers' = '……',
>  'properties.group.id' = '……',
>  'scan.startup.mode' = 'latest-offset',
>  'format' = 'json'
> )
>
> sink1:
> create table result (
>   request_time varchar
>   ,request_id integer
>   ,request_cnt bigint
>   ,avg_resptime double
>   ,stddev_resptime double
>   ,insert_time varchar
> ) with (
>   'connector' = 'kafka-0.10',
>   'topic' = '……',
>   'properties.bootstrap.servers' = '……',
>   'properties.group.id' = '……',
>   'scan.startup.mode' = 'latest-offset',
>   'format' = 'json'
> )
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


flink 1.11 rest api saveppoint接口 异常

2020-07-28 文章 taowang
在升级了 flink 1.11之后,我在使用的时候发现 rest api 的 /jobs/:jobid/savepoints 接口表现有点异常:
在 flink 1.10 时:当请求该接口后,在 flink ui 可以看到 savepoint 
被触发,/jobs/:jobid/savepoints/:triggerid 返回IN_PROGRESS,等 savepoint 
成功之后jobs/:jobid/savepoints/:triggerid返回COMPLETED。
但是在flink 1.11中:经常出现(不是必现,但是概率也不低) /jobs/:jobid/savepoints 
接口正常返回,/jobs/:jobid/savepoints/:triggerid 也返回IN_PROGRESS,但是在flink ui 中看不到 
savepoint 被触发,而且/jobs/:jobid/savepoints/:triggerid 一直返回IN_PROGRESS。
我怀疑这个是不是和我开了 unaligned checkpoint 有关,但是我在 
flink-config.yaml中把execution.checkpointing.unaligned设置为false还是会出现这种问题,请问大家有什么了解吗?


rest api flink docs 
链接:https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/rest_api.html#jobs-jobid-savepoints


祝好~

Re: 解析kafka的mysql binlog问题

2020-07-28 文章 admin
data格式不是string,可以定义为ARRAY>

> 2020年7月28日 下午3:35,air23  写道:
> 
> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
> 
> 
> {
>"data":[
>{
>"op_id":"97037138",
>"order_id":"84172164"
>}
>],
>"database":"order_11",
>"es":1595720375000,
>"id":17469027,
>"isDdl":false,
>"mysqlType":{
>"op_id":"int(11)",
>"order_id":"int(11)"
>},
>"old":null,
>"pkNames":[
>"op_id"
>],
>"sql":"",
>"sqlType":{
>"op_id":4,
>"order_id":4
>},
>"table":"order_product",
>"ts":1595720375837,
>"type":"INSERT"
> }
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
>> 有kafka 中json 数据的样例不?
>> 有没有看过 TaskManager 中有没有异常 log 信息?
>> 
>> 
>> 
>> On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>> 
>>> 你好 测试代码如下
>>> 
>>> 
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>> " `data` VARCHAR , " +
>>> " `table` VARCHAR " +
>>> ") WITH (" +
>>> " 'connector' = 'kafka'," +
>>> " 'topic' = 'source_databases'," +
>>> " 'properties.bootstrap.servers' = '***'," +
>>> " 'properties.group.id' = 'real1'," +
>>> " 'format' = 'json'," +
>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>> ")";
>>> public static void main(String[] args) throws Exception {
>>> 
>>> 
>>> //bink table
>>> StreamExecutionEnvironment bsEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>> 
>>>TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>> 
>>> 
>>> tableResult.print();
>>> 
>>>Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>> 
>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>> 
>>> bsEnv.execute("aa");
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>>> ,order_operation_time
>>> ,inventory_batch_log
>>> ,order_log
>>> ,order_address_book
>>> ,product_inventory
>>> ,order_physical_relation
>>> ,bil_business_attach
>>> ,picking_detail
>>> ,picking_detail
>>> ,orders
>>> 
>>> 
>>> 
>>> 
>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>> 看到例子都是useOldPlanner 来转table的。
>>> 致谢
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
 抱歉,还是没有看到附件。
 如果是文本的话,你可以直接贴到邮件里。
 
 On Mon, 27 Jul 2020 at 19:22, air23  wrote:
 
> 我再上传一次
> 
> 在2020年07月27日 18:55,Jark Wu  写道:
> 
> Hi,
> 你的附件好像没有上传。
> 
> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
> 
>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> 
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>> +
>>" `data` VARCHAR , " +
>>" `table` VARCHAR " +
>>") WITH (" +
>>" 'connector' = 'kafka'," +
>>" 'topic' = 'order_source'," +
>>" 'properties.bootstrap.servers' = '***'," +
>>" 'properties.group.id' = 'real1'," +
>>" 'format' = 'json'," +
>>" 'scan.startup.mode' = 'earliest-offset'" +
>>")";
>> 
>> 
>> 具体见附件 有打印
>> 
>> 
>> 
>> 
>> 
> 
> 
>>> 



Re: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 Jark Wu
因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
1.12 中已经支持读取复杂结构为 string 类型了。

Best,
Jark

On Tue, 28 Jul 2020 at 15:36, air23  wrote:

> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
> {
> "data":[
> {
> "op_id":"97037138",
> "order_id":"84172164"
> }
> ],
> "database":"order_11",
> "es":1595720375000,
> "id":17469027,
> "isDdl":false,
> "mysqlType":{
> "op_id":"int(11)",
> "order_id":"int(11)"
> },
> "old":null,
> "pkNames":[
> "op_id"
> ],
> "sql":"",
> "sqlType":{
> "op_id":4,
> "order_id":4
> },
> "table":"order_product",
> "ts":1595720375837,
> "type":"INSERT"
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
> >有kafka 中json 数据的样例不?
> >有没有看过 TaskManager 中有没有异常 log 信息?
> >
> >
> >
> >On Tue, 28 Jul 2020 at 09:40, air23  wrote:
> >
> >> 你好 测试代码如下
> >>
> >>
> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> >> " `data` VARCHAR , " +
> >> " `table` VARCHAR " +
> >> ") WITH (" +
> >> " 'connector' = 'kafka'," +
> >> " 'topic' = 'source_databases'," +
> >> " 'properties.bootstrap.servers' = '***'," +
> >> " 'properties.group.id' = 'real1'," +
> >> " 'format' = 'json'," +
> >> " 'scan.startup.mode' = 'earliest-offset'" +
> >> ")";
> >> public static void main(String[] args) throws Exception {
> >>
> >>
> >> //bink table
> >> StreamExecutionEnvironment bsEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment();
> >> EnvironmentSettings bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >> StreamTableEnvironment bsTableEnv =
> >> StreamTableEnvironment.create(bsEnv, bsSettings);
> >>
> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
> >>
> >>
> >> tableResult.print();
> >>
> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
> >>
> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
> >>
> >> bsEnv.execute("aa");
> >>
> >> }
> >>
> >>
> >>
> >>
> >> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> >> ,order_operation_time
> >> ,inventory_batch_log
> >> ,order_log
> >> ,order_address_book
> >> ,product_inventory
> >> ,order_physical_relation
> >> ,bil_business_attach
> >> ,picking_detail
> >> ,picking_detail
> >> ,orders
> >>
> >>
> >>
> >>
> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> >> 看到例子都是useOldPlanner 来转table的。
> >> 致谢
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
> >> >抱歉,还是没有看到附件。
> >> >如果是文本的话,你可以直接贴到邮件里。
> >> >
> >> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
> >> >
> >> >> 我再上传一次
> >> >>
> >> >> 在2020年07月27日 18:55,Jark Wu  写道:
> >> >>
> >> >> Hi,
> >> >> 你的附件好像没有上传。
> >> >>
> >> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
> >> >>
> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
> 不能取到data呢?*
> >> >> >
> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
> (\n"
> >> +
> >> >> > " `data` VARCHAR , " +
> >> >> > " `table` VARCHAR " +
> >> >> > ") WITH (" +
> >> >> > " 'connector' = 'kafka'," +
> >> >> > " 'topic' = 'order_source'," +
> >> >> > " 'properties.bootstrap.servers' = '***'," +
> >> >> > " 'properties.group.id' = 'real1'," +
> >> >> > " 'format' = 'json'," +
> >> >> > " 'scan.startup.mode' = 'earliest-offset'" +
> >> >> > ")";
> >> >> >
> >> >> >
> >> >> > 具体见附件 有打印
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >> >>
> >>
>


回复: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 明启 孙
你的flink什么版本

发送自 Windows 10 版邮件应用

发件人: air23
发送时间: 2020年7月28日 15:36
收件人: user-zh@flink.apache.org
主题: Re:Re: Re: 解析kafka的mysql binlog问题

格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来


{
"data":[
{
"op_id":"97037138",
"order_id":"84172164"
}
],
"database":"order_11",
"es":1595720375000,
"id":17469027,
"isDdl":false,
"mysqlType":{
"op_id":"int(11)",
"order_id":"int(11)"
},
"old":null,
"pkNames":[
"op_id"
],
"sql":"",
"sqlType":{
"op_id":4,
"order_id":4
},
"table":"order_product",
"ts":1595720375837,
"type":"INSERT"
}

















在 2020-07-28 14:44:35,"Jark Wu"  写道:
>有kafka 中json 数据的样例不?
>有没有看过 TaskManager 中有没有异常 log 信息?
>
>
>
>On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>
>> 你好 测试代码如下
>>
>>
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> " `data` VARCHAR , " +
>> " `table` VARCHAR " +
>> ") WITH (" +
>> " 'connector' = 'kafka'," +
>> " 'topic' = 'source_databases'," +
>> " 'properties.bootstrap.servers' = '***'," +
>> " 'properties.group.id' = 'real1'," +
>> " 'format' = 'json'," +
>> " 'scan.startup.mode' = 'earliest-offset'" +
>> ")";
>> public static void main(String[] args) throws Exception {
>>
>>
>> //bink table
>> StreamExecutionEnvironment bsEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv =
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>
>>
>> tableResult.print();
>>
>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>
>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>
>> bsEnv.execute("aa");
>>
>> }
>>
>>
>>
>>
>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> ,order_operation_time
>> ,inventory_batch_log
>> ,order_log
>> ,order_address_book
>> ,product_inventory
>> ,order_physical_relation
>> ,bil_business_attach
>> ,picking_detail
>> ,picking_detail
>> ,orders
>>
>>
>>
>>
>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> 看到例子都是useOldPlanner 来转table的。
>> 致谢
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> >抱歉,还是没有看到附件。
>> >如果是文本的话,你可以直接贴到邮件里。
>> >
>> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> >
>> >> 我再上传一次
>> >>
>> >> 在2020年07月27日 18:55,Jark Wu  写道:
>> >>
>> >> Hi,
>> >> 你的附件好像没有上传。
>> >>
>> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> >>
>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >> >
>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>> +
>> >> > " `data` VARCHAR , " +
>> >> > " `table` VARCHAR " +
>> >> > ") WITH (" +
>> >> > " 'connector' = 'kafka'," +
>> >> > " 'topic' = 'order_source'," +
>> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> > " 'properties.group.id' = 'real1'," +
>> >> > " 'format' = 'json'," +
>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> > ")";
>> >> >
>> >> >
>> >> > 具体见附件 有打印
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>>



Re:Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来


{
"data":[
{
"op_id":"97037138",
"order_id":"84172164"
}
],
"database":"order_11",
"es":1595720375000,
"id":17469027,
"isDdl":false,
"mysqlType":{
"op_id":"int(11)",
"order_id":"int(11)"
},
"old":null,
"pkNames":[
"op_id"
],
"sql":"",
"sqlType":{
"op_id":4,
"order_id":4
},
"table":"order_product",
"ts":1595720375837,
"type":"INSERT"
}

















在 2020-07-28 14:44:35,"Jark Wu"  写道:
>有kafka 中json 数据的样例不?
>有没有看过 TaskManager 中有没有异常 log 信息?
>
>
>
>On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>
>> 你好 测试代码如下
>>
>>
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> " `data` VARCHAR , " +
>> " `table` VARCHAR " +
>> ") WITH (" +
>> " 'connector' = 'kafka'," +
>> " 'topic' = 'source_databases'," +
>> " 'properties.bootstrap.servers' = '***'," +
>> " 'properties.group.id' = 'real1'," +
>> " 'format' = 'json'," +
>> " 'scan.startup.mode' = 'earliest-offset'" +
>> ")";
>> public static void main(String[] args) throws Exception {
>>
>>
>> //bink table
>> StreamExecutionEnvironment bsEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv =
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>
>>
>> tableResult.print();
>>
>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>
>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>
>> bsEnv.execute("aa");
>>
>> }
>>
>>
>>
>>
>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> ,order_operation_time
>> ,inventory_batch_log
>> ,order_log
>> ,order_address_book
>> ,product_inventory
>> ,order_physical_relation
>> ,bil_business_attach
>> ,picking_detail
>> ,picking_detail
>> ,orders
>>
>>
>>
>>
>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> 看到例子都是useOldPlanner 来转table的。
>> 致谢
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> >抱歉,还是没有看到附件。
>> >如果是文本的话,你可以直接贴到邮件里。
>> >
>> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> >
>> >> 我再上传一次
>> >>
>> >> 在2020年07月27日 18:55,Jark Wu  写道:
>> >>
>> >> Hi,
>> >> 你的附件好像没有上传。
>> >>
>> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> >>
>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >> >
>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>> +
>> >> > " `data` VARCHAR , " +
>> >> > " `table` VARCHAR " +
>> >> > ") WITH (" +
>> >> > " 'connector' = 'kafka'," +
>> >> > " 'topic' = 'order_source'," +
>> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> > " 'properties.group.id' = 'real1'," +
>> >> > " 'format' = 'json'," +
>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> > ")";
>> >> >
>> >> >
>> >> > 具体见附件 有打印
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>>


回复: flink1.11.1启动问题

2020-07-28 文章 酷酷的浑蛋
谢谢你,我将flink-conf.yaml的taskmanager.memory.process.size由1728m调成2048m,这个问题解决了, 
但是我觉得,这个问题应该很常见才对,怎么偏被我碰上了,flink这个设计是正常的吗?




在2020年07月28日 10:26,Xintong Song 写道:
建议确认一下 Yarn 的配置 “yarn.scheduler.minimum-allocation-mb” 在 Yarn RM 和 Flink JM
这两台机器上是否一致。

Yarn 会对 container request 做归一化。例如你请求的 TM container 是 1728m
(taskmanager.memory.process.size) ,如果 minimum-allocation-mb 是 1024m,那么实际得到的
container 大小必须是 minimum-allocation-mb 的整数倍,也就是 2048m。Flink 会去获取 Yarn 的配置,计算
container request 实际分到的 container 应该多大,并对分到的 container 进行检查。现在看 JM 日志,分下来的
container 并没有通过这个检查,造成 Flink 认为 container 规格不匹配。这里最可能的原因是 Flink 拿到的
minimum-allocation-mb 和 Yarn RM 实际使用的不一致。

Thank you~

Xintong Song



On Mon, Jul 27, 2020 at 7:42 PM 酷酷的浑蛋  wrote:


首先,flink1.9提交到yarn集群是没有问题的,同等的配置提交flink1.11.1到yarn集群就报下面的错误
2020-07-27 17:08:14,661 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -

2020-07-27 17:08:14,665 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Starting YarnJobClusterEntrypoint (Version: 1.11.1, Scala: 2.11,
Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
2020-07-27 17:08:14,665 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  OS
current user: hadoop
2020-07-27 17:08:15,417 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Current
Hadoop/Kerberos user: wangty
2020-07-27 17:08:15,418 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  JVM:
Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.191-b12
2020-07-27 17:08:15,418 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Maximum
heap size: 429 MiBytes
2020-07-27 17:08:15,418 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
JAVA_HOME: /usr/local/jdk/
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Hadoop
version: 2.7.7
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  JVM
Options:
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Xmx469762048
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Xms469762048
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-XX:MaxMetaspaceSize=268435456
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Dlog.file=/data/emr/yarn/logs/application_1568724479991_18850539/container_e25_1568724479991_18850539_01_01/jobmanager.log
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Dlog4j.configuration=file:log4j.properties
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
-Dlog4j.configurationFile=file:log4j.properties
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -  Program
Arguments: (none)
2020-07-27 17:08:15,419 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint[] -
Classpath:

flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-07-28 文章 鱼子酱
Hi,社区的各位大家好:
我目前生产上面使用的是1.8.2版本,相对稳定
为了能够用sql统一所有相关的作业,同时我也一直在跟着flink最新版本进行研究,
截止目前先后研究了1.10.1 1.11.1共2个大版本

在尝试使用的过程中,我发现了通过程序,使用sql进行group操作时,checkpoint中的数据量一直在缓慢增加
状态后端使用的是rocksdb 的增量模式
StateBackend backend =new
RocksDBStateBackend("hdfs:///checkpoints-data/",true);
设置了官网文档中找到的删除策略:
TableConfig tableConfig = streamTableEnvironment.getConfig();
tableConfig.setIdleStateRetentionTime(Time.minutes(2),
Time.minutes(7));  

请问是我使用的方式不对吗?

通过WebUI查看详细的checkpoint信息,发现状态大的原因主要集中在group这一Operator



版本影响:flink1.10.1 flink1.11.1
planner:blink planner
source : kafka source
时间属性: env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);





sql:
insert into  result
select request_time ,request_id ,request_cnt ,avg_resptime
,stddev_resptime ,terminal_cnt
,SUBSTRING(DATE_FORMAT(LOCALTIMESTAMP,'-MM-dd HH:mm:ss.SSS'),0,19) from
(   select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
,commandId as request_id
,count(*) as request_cnt
,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
from log
where
commandId in (104005 ,204005 ,404005)
and errCode=0 and attr=0
group by TUMBLE(times, INTERVAL '1' MINUTE),commandId

union all

select SUBSTRING(DATE_FORMAT(TUMBLE_START(times, INTERVAL '1'
MINUTE),'-MM-dd HH:mm:ss.SSS'),0,21) as request_time
,
,count(*) as request_cnt
,ROUND(avg(CAST(`respTime` as double)),2) as avg_resptime
,ROUND(stddev_pop(CAST(`respTime` as double)),2) as stddev_resptime
from log
where
commandId in (104005 ,204005 ,404005)
and errCode=0 and attr=0
group by TUMBLE(times, INTERVAL '1' MINUTE)
)


source:

create table log (
  eventTime bigint
  ,times timestamp(3)
  
  ,commandId integer
  ,watermark for times as times - interval '5' second
)
with(
 'connector' = 'kafka-0.10',
 'topic' = '……',
 'properties.bootstrap.servers' = '……',
 'properties.group.id' = '……',
 'scan.startup.mode' = 'latest-offset',
 'format' = 'json'
)

sink1:
create table result (
  request_time varchar
  ,request_id integer
  ,request_cnt bigint
  ,avg_resptime double
  ,stddev_resptime double
  ,insert_time varchar
) with (
  'connector' = 'kafka-0.10',
  'topic' = '……',
  'properties.bootstrap.servers' = '……',
  'properties.group.id' = '……',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
)





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 Jark Wu
有kafka 中json 数据的样例不?
有没有看过 TaskManager 中有没有异常 log 信息?



On Tue, 28 Jul 2020 at 09:40, air23  wrote:

> 你好 测试代码如下
>
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'source_databases'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
> public static void main(String[] args) throws Exception {
>
>
> //bink table
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>
> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>
>
> tableResult.print();
>
> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>
> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>
> bsEnv.execute("aa");
>
> }
>
>
>
>
> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> ,order_operation_time
> ,inventory_batch_log
> ,order_log
> ,order_address_book
> ,product_inventory
> ,order_physical_relation
> ,bil_business_attach
> ,picking_detail
> ,picking_detail
> ,orders
>
>
>
>
> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> 看到例子都是useOldPlanner 来转table的。
> 致谢
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
> >抱歉,还是没有看到附件。
> >如果是文本的话,你可以直接贴到邮件里。
> >
> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
> >
> >> 我再上传一次
> >>
> >> 在2020年07月27日 18:55,Jark Wu  写道:
> >>
> >> Hi,
> >> 你的附件好像没有上传。
> >>
> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
> >>
> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
> >> >
> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
> +
> >> > " `data` VARCHAR , " +
> >> > " `table` VARCHAR " +
> >> > ") WITH (" +
> >> > " 'connector' = 'kafka'," +
> >> > " 'topic' = 'order_source'," +
> >> > " 'properties.bootstrap.servers' = '***'," +
> >> > " 'properties.group.id' = 'real1'," +
> >> > " 'format' = 'json'," +
> >> > " 'scan.startup.mode' = 'earliest-offset'" +
> >> > ")";
> >> >
> >> >
> >> > 具体见附件 有打印
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
>


回复: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-28 文章 wind.fly....@outlook.com
嗯,对,创建TableEnvironment和使用不是一个线程。

发件人: godfrey he 
发送时间: 2020年7月28日 14:19
收件人: user-zh 
主题: Re: flink1.11.0 执行sqlQuery时报NullPointException

这个问题只能说是使用TableEnvironment不当的问题。ververica的gateway的模式其实就是多线程。
创建TableEnvironment和使用TableEnvironment可能不是一个线程,worker线程是被复用的。
简单来说就是:
当session创建的时候,worker thread1 会创建一个TableEnvironment,
然后当后续其他该session请求过来时候,可能是 worker thread2使用该TableEnvironment执行sql。

这个其实就是在多线程情况下使用TableEnvironment。不符合TableEnvironment只能在单线程使用的约束。

wind.fly@outlook.com  于2020年7月28日周二 下午2:09写道:

>
> gateway就类似于一个web服务,大概流程是建立连接时会初始化一个session,在session里面初始化TableEnvironment,然后根据sql类型做不同的操作,比如select语句会去执行sqlQuery,具体可查看
> https://github.com/ververica/flink-sql-gateway。
>
> 另外,加了RelMetadataQueryBase.THREAD_PROVIDERS
>
> .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))后确实不报这个错了。
>
> 题外话,个人认为flink不应该将这样的异常抛给用户去解决,除非我去深入研究源码,要不然根本无法搞清楚具体发生了什么,在封装性上还有待改善。
> 
> 发件人: godfrey he 
> 发送时间: 2020年7月28日 13:55
> 收件人: user-zh 
> 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
>
> 我的怀疑点还是多线程引起的。
> 你能具体描述一下你们gateway的行为吗? 是一个web server?
>
> 另外,你可以在table env执行query前加上
> RelMetadataQueryBase.THREAD_PROVIDERS
>
> .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
> 这句话临时fix。
>
> wind.fly@outlook.com  于2020年7月28日周二
> 上午11:02写道:
>
> > 不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment
> > 
> > 发件人: godfrey he 
> > 发送时间: 2020年7月28日 9:58
> > 收件人: user-zh 
> > 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
> >
> > 你们是否在多线程环境下使用 TableEnvironment ?
> > TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
> >
> > godfrey he  于2020年7月28日周二 上午9:55写道:
> >
> > > hi 能给出详细的schema信息吗?
> > >
> > > wind.fly@outlook.com  于2020年7月27日周一
> > > 下午7:02写道:
> > >
> > >> 补充一下,执行的sql如下:
> > >>
> > >> select order_no, order_time from
> > >> x.ods.ods_binlog_test_trip_create_t_order_1
> > >>
> > >> 
> > >> 发件人: wind.fly@outlook.com 
> > >> 发送时间: 2020年7月27日 18:49
> > >> 收件人: user-zh@flink.apache.org 
> > >> 主题: flink1.11.0 执行sqlQuery时报NullPointException
> > >>
> > >> Hi,all:
> > >>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
> > >> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
> > >> Caused by: java.lang.NullPointerException
> > >>   at java.util.Objects.requireNonNull(Objects.java:203)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
> > >>   at
> > >>
> >
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
> > >>   at
> org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
> > >>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
> > >>   at
> > >> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> > >>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> > >>
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
> > >>   at
> > 

Re: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-28 文章 godfrey he
这个问题只能说是使用TableEnvironment不当的问题。ververica的gateway的模式其实就是多线程。
创建TableEnvironment和使用TableEnvironment可能不是一个线程,worker线程是被复用的。
简单来说就是:
当session创建的时候,worker thread1 会创建一个TableEnvironment,
然后当后续其他该session请求过来时候,可能是 worker thread2使用该TableEnvironment执行sql。

这个其实就是在多线程情况下使用TableEnvironment。不符合TableEnvironment只能在单线程使用的约束。

wind.fly@outlook.com  于2020年7月28日周二 下午2:09写道:

>
> gateway就类似于一个web服务,大概流程是建立连接时会初始化一个session,在session里面初始化TableEnvironment,然后根据sql类型做不同的操作,比如select语句会去执行sqlQuery,具体可查看
> https://github.com/ververica/flink-sql-gateway。
>
> 另外,加了RelMetadataQueryBase.THREAD_PROVIDERS
>
> .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))后确实不报这个错了。
>
> 题外话,个人认为flink不应该将这样的异常抛给用户去解决,除非我去深入研究源码,要不然根本无法搞清楚具体发生了什么,在封装性上还有待改善。
> 
> 发件人: godfrey he 
> 发送时间: 2020年7月28日 13:55
> 收件人: user-zh 
> 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
>
> 我的怀疑点还是多线程引起的。
> 你能具体描述一下你们gateway的行为吗? 是一个web server?
>
> 另外,你可以在table env执行query前加上
> RelMetadataQueryBase.THREAD_PROVIDERS
>
> .set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
> 这句话临时fix。
>
> wind.fly@outlook.com  于2020年7月28日周二
> 上午11:02写道:
>
> > 不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment
> > 
> > 发件人: godfrey he 
> > 发送时间: 2020年7月28日 9:58
> > 收件人: user-zh 
> > 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
> >
> > 你们是否在多线程环境下使用 TableEnvironment ?
> > TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
> >
> > godfrey he  于2020年7月28日周二 上午9:55写道:
> >
> > > hi 能给出详细的schema信息吗?
> > >
> > > wind.fly@outlook.com  于2020年7月27日周一
> > > 下午7:02写道:
> > >
> > >> 补充一下,执行的sql如下:
> > >>
> > >> select order_no, order_time from
> > >> x.ods.ods_binlog_test_trip_create_t_order_1
> > >>
> > >> 
> > >> 发件人: wind.fly@outlook.com 
> > >> 发送时间: 2020年7月27日 18:49
> > >> 收件人: user-zh@flink.apache.org 
> > >> 主题: flink1.11.0 执行sqlQuery时报NullPointException
> > >>
> > >> Hi,all:
> > >>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
> > >> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
> > >> Caused by: java.lang.NullPointerException
> > >>   at java.util.Objects.requireNonNull(Objects.java:203)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
> > >>   at
> > >>
> >
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
> > >>   at
> > >>
> >
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
> > >>   at
> org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
> > >>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
> > >>   at
> > >> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> > >>   at
> > >>
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> > >>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> > >>
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
> > >>   at
> > >>
> >
> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
> > >>   at
> > >>
> >
> 

回复: flink1.11.0 执行sqlQuery时报NullPointException

2020-07-28 文章 wind.fly....@outlook.com
gateway就类似于一个web服务,大概流程是建立连接时会初始化一个session,在session里面初始化TableEnvironment,然后根据sql类型做不同的操作,比如select语句会去执行sqlQuery,具体可查看https://github.com/ververica/flink-sql-gateway。

另外,加了RelMetadataQueryBase.THREAD_PROVIDERS
.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()))后确实不报这个错了。

题外话,个人认为flink不应该将这样的异常抛给用户去解决,除非我去深入研究源码,要不然根本无法搞清楚具体发生了什么,在封装性上还有待改善。

发件人: godfrey he 
发送时间: 2020年7月28日 13:55
收件人: user-zh 
主题: Re: flink1.11.0 执行sqlQuery时报NullPointException

我的怀疑点还是多线程引起的。
你能具体描述一下你们gateway的行为吗? 是一个web server?

另外,你可以在table env执行query前加上
RelMetadataQueryBase.THREAD_PROVIDERS
.set(JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE()));
这句话临时fix。

wind.fly@outlook.com  于2020年7月28日周二 上午11:02写道:

> 不是多线程同时操作一个tableEnvironment,每执行一次都会创建一个TableEnvironment
> 
> 发件人: godfrey he 
> 发送时间: 2020年7月28日 9:58
> 收件人: user-zh 
> 主题: Re: flink1.11.0 执行sqlQuery时报NullPointException
>
> 你们是否在多线程环境下使用 TableEnvironment ?
> TableEnvironment 不是线程安全的,多线程情况使用可能出现一些莫名其妙的问题。
>
> godfrey he  于2020年7月28日周二 上午9:55写道:
>
> > hi 能给出详细的schema信息吗?
> >
> > wind.fly@outlook.com  于2020年7月27日周一
> > 下午7:02写道:
> >
> >> 补充一下,执行的sql如下:
> >>
> >> select order_no, order_time from
> >> x.ods.ods_binlog_test_trip_create_t_order_1
> >>
> >> 
> >> 发件人: wind.fly@outlook.com 
> >> 发送时间: 2020年7月27日 18:49
> >> 收件人: user-zh@flink.apache.org 
> >> 主题: flink1.11.0 执行sqlQuery时报NullPointException
> >>
> >> Hi,all:
> >>  本人正在为公司之前基于flink1.10的gateway升级flink版本到1.11,用的hive
> >> catalog,建表后,执行sqlQuery方法时报NullPointException,希望给出排错建议,具体报错信息如下:
> >> Caused by: java.lang.NullPointerException
> >>   at java.util.Objects.requireNonNull(Objects.java:203)
> >>   at
> >>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:141)
> >>   at
> >>
> org.apache.calcite.rel.metadata.RelMetadataQuery.(RelMetadataQuery.java:106)
> >>   at
> >>
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.(FlinkRelMetadataQuery.java:73)
> >>   at
> >>
> org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery.instance(FlinkRelMetadataQuery.java:52)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:39)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.FlinkRelOptClusterFactory$$anon$1.get(FlinkRelOptClusterFactory.scala:38)
> >>   at
> >>
> org.apache.calcite.plan.RelOptCluster.getMetadataQuery(RelOptCluster.java:178)
> >>   at
> >>
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:118)
> >>   at
> >>
> org.apache.calcite.rel.logical.LogicalProject.create(LogicalProject.java:111)
> >>   at
> >>
> org.apache.calcite.rel.core.RelFactories$ProjectFactoryImpl.createProject(RelFactories.java:180)
> >>   at org.apache.calcite.tools.RelBuilder.project_(RelBuilder.java:1462)
> >>   at org.apache.calcite.tools.RelBuilder.project(RelBuilder.java:1256)
> >>   at
> >> org.apache.calcite.tools.RelBuilder.projectNamed(RelBuilder.java:1521)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4125)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> >>   at
> >>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> >>   at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNodes(SqlExprToRexConverterImpl.java:81)
> >>   at
> >>
> org.apache.flink.table.planner.calcite.SqlExprToRexConverterImpl.convertToRexNode(SqlExprToRexConverterImpl.java:73)
> >>   at
> >>
> org.apache.flink.table.planner.delegation.ParserImpl.parseSqlExpression(ParserImpl.java:93)
> >>   at
> >>
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolveExpressionDataType(CatalogTableSchemaResolver.java:119)
> >>   at
> >>
> org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:83)
> >>   at
> >>
> org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380)
> >>   at
> >>
> org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408)
> >>   at
> >>
> org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:375)
> >>   at
> >>
> org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:75)
> >>   at
> >>
>