关于FlinkSQL新增operatoer后基于savepoint重启报错的问题

2020-06-07 文章 陈赋赟
原先sql任务是: CREATE TABLE A_source(...) CREATE TABLE B_sink (...) INSERT INTO B_sink SELECT 1 FROM A_source ; 我基于这个FlinkSQL任务生成了savepoint后,我重新修改为 CREATE TABLE A_source(...) CREATE TABLE B_sink (...) CREATE TABLE C_source(...) CREATE TABLE D_sink (...) INSERT INTO B_sink SELECT 1 FROM

?????? flink 1.9 ??????UDAF ????state??????????????

2020-06-07 文章 star
?? ?? ---- ??:"Benchao Li"

Re: flink 1.9 自定义UDAF 实现state管理的逻辑吗?

2020-06-07 文章 Benchao Li
UDAF的accumulator本身就会被Flink的聚合算子作为state存起来,自然就会参与checkpoint和恢复。 不需要你做额外的操作。你实现UDAF的时候需要注意的是,在UDAF里面不要有自己的临时状态, 把所有信息都放到accumulator中。 star <3149768...@qq.com> 于2020年6月8日周一 上午10:50写道: > 我的udaf里有一些聚合操作,比如接收一些数据 聚合到一个arrayList里,输出也是arraylist。我想问要不要把这个arraylist > 也做checkpoint > > > > >

?????? flink 1.9 ??????UDAF ????state??????????????

2020-06-07 文章 star
udaf?? ??arrayListarraylistarraylist checkpoint ---- ??:"Benchao Li"

Re: flink 1.9 自定义UDAF 实现state管理的逻辑吗?

2020-06-07 文章 Benchao Li
没有完全明白你的问题。 你是要问UDAF的相关的state是怎么被Flink管理的么? 还是问UDAF里面如果用了state,应该自己怎么来管理呢? star <3149768...@qq.com> 于2020年6月8日周一 上午10:44写道: > 请教大家, > > > flink 1.9 自定义UDAF 实现state管理的逻辑吗? > > > 还是和sql一样 自己管理stage? > > > class MyFunc extends AggregateFunction{ > createAccumulator > accumulate > getValue > merge >

flink 1.9 ??????UDAF ????state??????????????

2020-06-07 文章 star
?? flink 1.9 ??UDAF state?? ??sql stage?? class MyFunc extends AggregateFunction{ createAccumulator accumulate getValue merge }

Re: flink 1.10SQL 报错问题求教

2020-06-07 文章 godfrey he
hi 请问你用的flink是哪个版本?StreamTask这个类里报了NPE,感觉是bug。 hb <343122...@163.com> 于2020年6月5日周五 下午3:07写道: > Flink SQL 作业, 开始运行正常, checkpoint也正常, 没有大状态(状态19.7KB), > 但是执行一段时间后,开始报错, 然后恢复执行个几秒,也马上报这个错,然后报错恢复不停循环. > 哪位帮忙看看,不胜感激. > > > 2020-06-05 14:13:19,791 INFO org.apache.flink.runtime.taskmanager.Task - >

?????? flink1.9 Sql ????????????????????????state??????

2020-06-07 文章 star
?? restorerestore?? error ---- ??:"Benchao Li"

?????? ????flinksql ??????mysql??????????

2020-06-07 文章 ??????
hi,?? ?? ---- ??:"Px New"<15701181132mr@gmail.com; :2020??6??7??(??) 7:02 ??:"user-zh"

回复:关于flinksql 与维表mysql的关联问题

2020-06-07 文章 1048262223
Hi 是的。 Best, Yichao Yang 发自我的iPhone -- 原始邮件 -- 发件人: Px New <15701181132mr@gmail.com 发送时间: 2020年6月7日 19:03 收件人: user-zh

Re: 关于flinksql 与维表mysql的关联问题

2020-06-07 文章 Px New
好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作? 1048262223 <1048262...@qq.com>于2020年6月7日 周日下午3:57写道: > Hi > > > 可以使用open + broadcast的方式解决~ > > > Best, > Yichao Yang > > > > > > --原始邮件-- > 发件人:"Px New"<15701181132mr@gmail.com; > 发送时间:2020年6月6日(星期六)

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
 不好意思,没注意到 感谢 Benchao Li 于2020年6月7日周日 下午6:47写道: > FROM_UNIXTIME接收的是秒的时间戳,你的maxwell_ts看起来是微秒吧,应该/100吧 > > macia kk 于2020年6月7日周日 下午6:15写道: > >> 打印出来是这样的 >> >> "maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12" >> >> macia kk 于2020年6月7日周日 下午5:53写道: >> >>>

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 Benchao Li
FROM_UNIXTIME接收的是秒的时间戳,你的maxwell_ts看起来是微秒吧,应该/100吧 macia kk 于2020年6月7日周日 下午6:15写道: > 打印出来是这样的 > > "maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12" > > macia kk 于2020年6月7日周日 下午5:53写道: > >> 再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛 >> >> 如上,这里定义的 ts_watermark, 想用他们做

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
打印出来是这样的 "maxwell_ts":1591518126072000,"ts_watermark":"52403-03-16 00:21:12" macia kk 于2020年6月7日周日 下午5:53写道: > 再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛 > > 如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是 > millseconds,我看了函数的使用方法,没想到哪里有问题 > > val bsSettings = >

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
再次感谢,成功了,这个问题困扰我两周了。还有个问题,能麻烦帮我看下嘛 如上,这里定义的 ts_watermark, 想用他们做 watermakr,但是他的返回结果是 null, 我的 maxwell_ts 是 millseconds,我看了函数的使用方法,没想到哪里有问题 val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings)

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 Leonard Xu
Hi, 1.10确实有这个bug, 这个问题在1.10.1中已经修复了,你可以使用升级到1.10.1,Benchao之前修了一个计算列相关的,我刚刚看不使用计算列也会有这问题,应该是在[1]中 jark wu 修复的。 Best, Leonard Xu [1] https://issues.apache.org/jira/browse/FLINK-16526 > 在 2020年6月7日,15:32,macia kk 写道: > > 各位大佬, > > 我的数据源 json

Re: flink sql 中值为null时结果都为 false

2020-06-07 文章 Leonard Xu
Hi, Flink 用Calcite做sql解析和优化, 这是个 bool 的二值逻辑和三值逻辑处理问题,calcite默认在 where clause[2] 处理时 是用UNKNOWN_AS_FALSE mode, 这个结果是符合预期的, 类似的还有"x IS TRUE","JOIN ... ON x", "HAVING x。 [1] https://github.com/apache/calcite/blob/master/core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java#L1016

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
好的,感谢 Benchao Li 于2020年6月7日周日 下午4:04写道: > 1.10还是有bug的,1.10.1已经修复[1]了。可以尝试下1.10.1 > > [1] https://issues.apache.org/jira/browse/FLINK-16068 > > macia kk 于2020年6月7日周日 下午3:51写道: > > > 1.10 > > > > 1048262223 <1048262...@qq.com> 于2020年6月7日周日 下午3:48写道: > > > > > Hi > > > > > > > > >

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 Benchao Li
1.10还是有bug的,1.10.1已经修复[1]了。可以尝试下1.10.1 [1] https://issues.apache.org/jira/browse/FLINK-16068 macia kk 于2020年6月7日周日 下午3:51写道: > 1.10 > > 1048262223 <1048262...@qq.com> 于2020年6月7日周日 下午3:48写道: > > > Hi > > > > > > 这个好像有同学在群里提到过,也会出现ddl关键字冲突问题,他是通过将版本升到了1.10解决的,能提供下你是用的版本吗? > > > > > > Best, > >

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 Benchao Li
嗯,你这个是哪个版本呢?曾经的确是有过计算列的时候会有这种bug,不过后来已经修复了。 macia kk 于2020年6月7日周日 下午3:42写道: > 下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂 > > Benchao Li 于2020年6月7日周日 下午3:38写道: > > > Hi, > > 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。 > > > > macia kk 于2020年6月7日周日 下午3:33写道: > > > > >

?????? ????flinksql ??????mysql??????????

2020-06-07 文章 1048262223
Hi open + broadcast??~ Best, Yichao Yang ---- ??:"Px New"<15701181132mr@gmail.com; :2020??6??6??(??) 9:50 ??:"user-zh"

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
1.10 1048262223 <1048262...@qq.com> 于2020年6月7日周日 下午3:48写道: > Hi > > > 这个好像有同学在群里提到过,也会出现ddl关键字冲突问题,他是通过将版本升到了1.10解决的,能提供下你是用的版本吗? > > > Best, > Yichao Yang > > > > 发自我的iPhone > > > -- 原始邮件 -- > 发件人: macia kk 发送时间: 2020年6月7日 15:42 > 收件人: user-zh 主题: 回复:Flink SQL

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val sourceTable = """CREATE TABLE my_kafak_source ( |`table` varchar, |

回复:Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 1048262223
Hi 这个好像有同学在群里提到过,也会出现ddl关键字冲突问题,他是通过将版本升到了1.10解决的,能提供下你是用的版本吗? Best, Yichao Yang 发自我的iPhone -- 原始邮件 -- 发件人: macia kk

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
```scala val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val sourceTable = """CREATE TABLE my_kafak_source ( |`table` varchar,

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
下边的代码里,没有用 `table` 字段,我现在只要把 table, database 这两行去掉,可以能跑,只要加上,就会挂 Benchao Li 于2020年6月7日周日 下午3:38写道: > Hi, > 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。 > > macia kk 于2020年6月7日周日 下午3:33写道: > > > 各位大佬, > > > > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢 > > > > val

Re: Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 Benchao Li
Hi, 看起来你的DDL应该没有太大问题,你可以把代码贴的再完整一点么,可能跟下面的代码有关系。 macia kk 于2020年6月7日周日 下午3:33写道: > 各位大佬, > > 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢 > > val bsSettings = > > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() > val bsTableEnv =

Flink SQL create table 关键字 table 加反引号 解析失败

2020-06-07 文章 macia kk
各位大佬, 我的数据源 json 里有 database, table 字段,想解析出来,是保留关键字,我加了反引号,但是还是报错,这是为什么呢 val bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build() val bsTableEnv = StreamTableEnvironment.create(env, bsSettings) val sourceTable = """CREATE TABLE my_kafak_source (