Re: Re: spark代码直接运行至Flink平台

2020-04-21 文章 hsdcl...@163.com
看了下好像 Apache Beam 就是干这个事情的,学习啦 




hsdcl...@163.com
 
发件人: lec ssmi
发送时间: 2020-04-22 13:37
收件人: flink-user-cn
主题: Re: Re: spark代码直接运行至Flink平台
那还不如直接用apache beam直接将这些框架的API全部统一起来。
 
hsdcl...@163.com  于2020年4月22日周三 上午11:27写道:
 
> 降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java,
> spark就像Scala一样
>
>
>
>
> hsdcl...@163.com
>
> 发件人: Jeff Zhang
> 发送时间: 2020-04-22 10:52
> 收件人: user-zh
> 主题: Re: spark代码直接运行至Flink平台
> 啥目的 ?
>
> hsdcl...@163.com  于2020年4月22日周三 上午9:49写道:
>
> >   Hi,
> >   有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink平台
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


答复: 关于Flink SQL UDAF中调用createAccumulator的调用问题

2020-04-21 文章 刘首维
Hi benchao,

  感谢你的回复,完美解答了我的疑惑。然后我在看代码做实验的时候又派生了两个想法/问题:

1. 将accu作为一个单独的field在GroupAggFunction中,然后在snapshotState的时候才去向state更新
2. 为accu设置一个0/初始/空/幺元或者类似概念的状态,这样我们就不必去销毁acc而是去只是将其置回


  或者说时区在设计这个部分的时候,有什么其他的考量吗

发件人: Benchao Li 
发送时间: 2020年4月21日 18:28:09
收件人: 刘首维
抄送: user-zh
主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这里说的是同一个key会创建一次,所以你看到一个AggFunction会创建多次更这个并不冲突,因为一个AggFunction属于一个subtask维度的,一个subtask处理不止一个key的。

你的第二个问题:
> 我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group 
> by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来
这个的确是存在这个问题。比如两个group by嵌套使用,第一个group by由于会更新结果,所以会retract之前的结果。第二个group 
by收到retract就会先撤销当前的结果,然后再发送撤销之后的结果;然后后面又会来一条正常的append数据,这条数据导致再次retract一次,和append一次。

> 还有在agg over group by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。
这个没有本质区别,正常情况下一个key下的agg是会一直存在的。除非是你配置了state 
retention时间,那么对应的key的state如果过了retention时间没有访问就会被清理。

刘首维 mailto:liushou...@autohome.com.cn>> 
于2020年4月21日周二 下午5:59写道:

Hi benchao,


非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group 
by的一个key应该被创建一次,可是我做实验的时候(在create 
acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。


  为了方便你帮我分析,我来补充一下环境和场景:


   版本: 1.7.2/1.9.1

  场景 : group by 嵌套, 常规聚合



  我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group 
by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来。还有在agg over group 
by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。


再次感谢你的回复

best regards


发件人: Benchao Li mailto:libenc...@gmail.com>>
发送时间: 2020年4月21日 17:45:54
收件人: user-zh
主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这是个很好的问题。

> 这个方法的调用时机是什么呢,会被调用几次呢?
这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
当然这里说的是regular groupby;
如果是window group by的话,就是每个window都会做上面的这个事情。

> 一个accumulator的生命周期是怎么样的?
如果是window group by的话,那它的生命周期就是跟window是一样的。
如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
数据是0条的时候,也会销毁。

> 一个accumulator会被反复的序列化反序列化吗?
这个问题非常好。它是否序列化跟你用的state backend有关系。
如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
checkpoint的时候序列化。
当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。

刘首维 mailto:liushou...@autohome.com.cn>> 
于2020年4月21日周二 下午5:37写道:

> Hi all,
>
>
>
>最近有几个疑问没能很好地理解清楚:
>
>
>
>我们都知道,UDAF中的有createAccumulator这个方法,那么:
>
> 这个方法的调用时机是什么呢,会被调用几次呢?
>
> 一个accumulator的生命周期是怎么样的?
>
> 一个accumulator会被反复的序列化反序列化吗?
>
>
>  麻烦了解相关细节的社区的同学们帮忙解答一下~
>
> 先谢谢啦
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; 
libenc...@pku.edu.cn


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; 
libenc...@pku.edu.cn


Re: Re: spark代码直接运行至Flink平台

2020-04-21 文章 lec ssmi
那还不如直接用apache beam直接将这些框架的API全部统一起来。

hsdcl...@163.com  于2020年4月22日周三 上午11:27写道:

> 降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java,
> spark就像Scala一样
>
>
>
>
> hsdcl...@163.com
>
> 发件人: Jeff Zhang
> 发送时间: 2020-04-22 10:52
> 收件人: user-zh
> 主题: Re: spark代码直接运行至Flink平台
> 啥目的 ?
>
> hsdcl...@163.com  于2020年4月22日周三 上午9:49写道:
>
> >   Hi,
> >   有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink平台
>
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: json中date类型解析失败

2020-04-21 文章 Leonard Xu
Hi
 报错是因为'format.ignore-parse-errors' 
参数是在社区最新的版本才支持的,FLINK-16725在1.11应该也会修复,如果需要使用的话可以等1.11发布后使用或者自己编译master分支,
即使有了这个参数你的问题也无法解决,对你的case每行记录都会解析错误所以会过滤掉所有数据。
建议你可以在数据源就转为标准的json格式或者写个udf将long转为timestamp后使用。

祝好,
Leonard Xu

> 在 2020年4月22日,12:33,王双利  写道:
> 
> 要不你们再做一个fastjson版本的?
> 目前内部解析用的都是fastjson
> 
> 
> 
> 发件人: 王双利
> 发送时间: 2020-04-22 12:31
> 收件人: user-zh
> 主题: 回复: Re: json中date类型解析失败
>配置后报错误 ,
> 'format.ignore-parse-errors' = 'true'
> 这个参数需要怎么配置呢?
> The matching candidates:
>org.apache.flink.formats.json.JsonRowFormatFactory
>Unsupported property keys:
>format.ignore-parse-errors
> WITH (
> ..
> 'format.type' = 'json',
> 'format.ignore-parse-errors' = 'true',
> 
> )
> 
> 
> 
> 发件人: Leonard Xu
> 发送时间: 2020-04-22 12:18
> 收件人: user-zh; 王双利
> 主题: Re: json中date类型解析失败
> Hi,
> flink支持的json format是遵循RFC标准[1]的,不支持从long型转化为json timestamp, json的 
> tiemstamp类型转化可以简单参考下,这个虽然符合标准,单对用户习惯来说确实不友好,目前社区也有一个jira[2]在跟进这个问题了。关于鲁棒性的问题,json
>  format有个参数支持跳过解析错误的记录,'format.ignore-parse-errors' = 'true'
> 
> 
> ```
> Long time = System.currentTimeMillis();
> DateFormat dateFormat =  new SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'");
> Date date = new Date(time);
> String jsonSchemaDate = dateFormat.format(date);
> ```
> [1] 
> https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
> [2] https://issues.apache.org/jira/browse/FLINK-16725 
> 
> 
> Best,
> Leonard Xu
> 
>> 在 2020年4月22日,12:05,王双利  写道:
>> 
>> 使用  flink-json -1.10.0 解析json数据报下面的错误
>> 
>> Caused by: java.time.format.DateTimeParseException: Text '1587527019680' 
>> could not be parsed at index 0
>> 
>> 经检查 是 以下字段导致的
>> {"jnlno":"e4574cce-8c9f-4d3f-974f-fc15250ec10d","ip":"122.96.41.218","channel":"pc","transdate":1587527019680,"event":"login","userid":"9","deviceid":"11","taskid":"1","retcode":"00","status":"fail"}
>> 
>> 其中 transdate 是使用fastjson序列化得来的
>> request.put("transdate",cal.getTime());JSON.toJSONString(request),解析失败后,系统直接停止,这个觉得也不太好吧,鲁棒性不够,万一其他系统发送一个错误格式的,系统直接挂掉,感觉不太合理。
>> 以上的应该怎么解决才合适。
>> 
>> 
> 



回复: 回复: json中date类型解析失败

2020-04-21 文章 王双利
 要不你们再做一个fastjson版本的?
目前内部解析用的都是fastjson


 
发件人: 王双利
发送时间: 2020-04-22 12:31
收件人: user-zh
主题: 回复: Re: json中date类型解析失败
配置后报错误 ,
'format.ignore-parse-errors' = 'true'
这个参数需要怎么配置呢?
The matching candidates:
org.apache.flink.formats.json.JsonRowFormatFactory
Unsupported property keys:
format.ignore-parse-errors
WITH (
..
'format.type' = 'json',
'format.ignore-parse-errors' = 'true',
 
 )


 
发件人: Leonard Xu
发送时间: 2020-04-22 12:18
收件人: user-zh; 王双利
主题: Re: json中date类型解析失败
Hi,
flink支持的json format是遵循RFC标准[1]的,不支持从long型转化为json timestamp, json的 
tiemstamp类型转化可以简单参考下,这个虽然符合标准,单对用户习惯来说确实不友好,目前社区也有一个jira[2]在跟进这个问题了。关于鲁棒性的问题,json
 format有个参数支持跳过解析错误的记录,'format.ignore-parse-errors' = 'true'
 
 
```
Long time = System.currentTimeMillis();
DateFormat dateFormat =  new SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'");
Date date = new Date(time);
String jsonSchemaDate = dateFormat.format(date);
```
[1] 
https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
[2] https://issues.apache.org/jira/browse/FLINK-16725 

 
Best,
Leonard Xu
 
> 在 2020年4月22日,12:05,王双利  写道:
> 
> 使用  flink-json -1.10.0 解析json数据报下面的错误
> 
> Caused by: java.time.format.DateTimeParseException: Text '1587527019680' 
> could not be parsed at index 0
> 
> 经检查 是 以下字段导致的
> {"jnlno":"e4574cce-8c9f-4d3f-974f-fc15250ec10d","ip":"122.96.41.218","channel":"pc","transdate":1587527019680,"event":"login","userid":"9","deviceid":"11","taskid":"1","retcode":"00","status":"fail"}
> 
> 其中 transdate 是使用fastjson序列化得来的
> request.put("transdate",cal.getTime());JSON.toJSONString(request),解析失败后,系统直接停止,这个觉得也不太好吧,鲁棒性不够,万一其他系统发送一个错误格式的,系统直接挂掉,感觉不太合理。
> 以上的应该怎么解决才合适。
> 
> 
 


回复: Re: json中date类型解析失败

2020-04-21 文章 王双利
配置后报错误 ,
'format.ignore-parse-errors' = 'true'
这个参数需要怎么配置呢?
The matching candidates:
org.apache.flink.formats.json.JsonRowFormatFactory
Unsupported property keys:
format.ignore-parse-errors
WITH (
..
'format.type' = 'json',
'format.ignore-parse-errors' = 'true',
 
 )


 
发件人: Leonard Xu
发送时间: 2020-04-22 12:18
收件人: user-zh; 王双利
主题: Re: json中date类型解析失败
Hi,
flink支持的json format是遵循RFC标准[1]的,不支持从long型转化为json timestamp, json的 
tiemstamp类型转化可以简单参考下,这个虽然符合标准,单对用户习惯来说确实不友好,目前社区也有一个jira[2]在跟进这个问题了。关于鲁棒性的问题,json
 format有个参数支持跳过解析错误的记录,'format.ignore-parse-errors' = 'true'
 
 
```
Long time = System.currentTimeMillis();
DateFormat dateFormat =  new SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'");
Date date = new Date(time);
String jsonSchemaDate = dateFormat.format(date);
```
[1] 
https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
[2] https://issues.apache.org/jira/browse/FLINK-16725 

 
Best,
Leonard Xu
 
> 在 2020年4月22日,12:05,王双利  写道:
> 
> 使用  flink-json -1.10.0 解析json数据报下面的错误
> 
> Caused by: java.time.format.DateTimeParseException: Text '1587527019680' 
> could not be parsed at index 0
> 
> 经检查 是 以下字段导致的
> {"jnlno":"e4574cce-8c9f-4d3f-974f-fc15250ec10d","ip":"122.96.41.218","channel":"pc","transdate":1587527019680,"event":"login","userid":"9","deviceid":"11","taskid":"1","retcode":"00","status":"fail"}
> 
> 其中 transdate 是使用fastjson序列化得来的
> request.put("transdate",cal.getTime());JSON.toJSONString(request),解析失败后,系统直接停止,这个觉得也不太好吧,鲁棒性不够,万一其他系统发送一个错误格式的,系统直接挂掉,感觉不太合理。
> 以上的应该怎么解决才合适。
> 
> 
 


Re: json中date类型解析失败

2020-04-21 文章 Leonard Xu
Hi,
flink支持的json format是遵循RFC标准[1]的,不支持从long型转化为json timestamp, json的 
tiemstamp类型转化可以简单参考下,这个虽然符合标准,单对用户习惯来说确实不友好,目前社区也有一个jira[2]在跟进这个问题了。关于鲁棒性的问题,json
 format有个参数支持跳过解析错误的记录,'format.ignore-parse-errors' = 'true'


```
Long time = System.currentTimeMillis();
DateFormat dateFormat =  new SimpleDateFormat("-MM-dd'T'HH:mm:ss.SSS'Z'");
Date date = new Date(time);
String jsonSchemaDate = dateFormat.format(date);
```
[1] 
https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
[2] https://issues.apache.org/jira/browse/FLINK-16725 


Best,
Leonard Xu

> 在 2020年4月22日,12:05,王双利  写道:
> 
> 使用  flink-json -1.10.0 解析json数据报下面的错误
> 
> Caused by: java.time.format.DateTimeParseException: Text '1587527019680' 
> could not be parsed at index 0
> 
> 经检查 是 以下字段导致的
> {"jnlno":"e4574cce-8c9f-4d3f-974f-fc15250ec10d","ip":"122.96.41.218","channel":"pc","transdate":1587527019680,"event":"login","userid":"9","deviceid":"11","taskid":"1","retcode":"00","status":"fail"}
> 
> 其中 transdate 是使用fastjson序列化得来的
> request.put("transdate",cal.getTime());JSON.toJSONString(request),解析失败后,系统直接停止,这个觉得也不太好吧,鲁棒性不够,万一其他系统发送一个错误格式的,系统直接挂掉,感觉不太合理。
> 以上的应该怎么解决才合适。
> 
> 



json中date类型解析失败

2020-04-21 文章 王双利
 使用  flink-json -1.10.0 解析json数据报下面的错误

Caused by: java.time.format.DateTimeParseException: Text '1587527019680' could 
not be parsed at index 0

经检查 是 以下字段导致的
{"jnlno":"e4574cce-8c9f-4d3f-974f-fc15250ec10d","ip":"122.96.41.218","channel":"pc","transdate":1587527019680,"event":"login","userid":"9","deviceid":"11","taskid":"1","retcode":"00","status":"fail"}

其中 transdate 是使用fastjson序列化得来的
request.put("transdate",cal.getTime());JSON.toJSONString(request),解析失败后,系统直接停止,这个觉得也不太好吧,鲁棒性不够,万一其他系统发送一个错误格式的,系统直接挂掉,感觉不太合理。
以上的应该怎么解决才合适。




flink1.10关于jar包冲突问题

2020-04-21 文章 宇张
在使用Flink1.10时,遇到最多的问题就是jar包冲突问题,okio这个包flink-parent引用的就有四个版本,还有一些没办法<
exclusions>的包,请问社区有没有优化jar包冲突的提议。


Re: Re: spark代码直接运行至Flink平台

2020-04-21 文章 hsdcl...@163.com
降低用户迁移学习成本,两个框架有类似的地方,不知道比喻是否恰当,Flink平台 是否可以类似虚拟机,上层的流批应用如java,  spark就像Scala一样 




hsdcl...@163.com
 
发件人: Jeff Zhang
发送时间: 2020-04-22 10:52
收件人: user-zh
主题: Re: spark代码直接运行至Flink平台
啥目的 ?
 
hsdcl...@163.com  于2020年4月22日周三 上午9:49写道:
 
>   Hi,
>   有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink平台
 
 
 
-- 
Best Regards
 
Jeff Zhang


Re: spark代码直接运行至Flink平台

2020-04-21 文章 Jeff Zhang
啥目的 ?

hsdcl...@163.com  于2020年4月22日周三 上午9:49写道:

>   Hi,
>   有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink平台



-- 
Best Regards

Jeff Zhang


Re: 关于状态TTL

2020-04-21 文章 LakeShen
社区版的 Planner 针对 Key 状态的清理,使用的 Timer 来进行清理。
1.9.1 Blink planner 最底层状态清理 还是使用的 StateTTLConfig 来进行清理(不是
Background),所以存在部分状态后面没读,
状态没有清理的情况

Benchao Li  于2020年4月21日周二 下午11:15写道:

> 我对checkpoint不是很熟悉。 不过这个错误看起来是因为修改了TtlConfig导致serializer不兼容导致的,可能不是很好解决。
>
> 酷酷的浑蛋  于2020年4月21日周二 下午10:37写道:
>
> > hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
> > java.lang.RuntimeException: Error while getting state
> > at
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> > at
> >
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
> > at
> >
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:221)
> > at
> >
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:205)
> > at
> >
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85)
> > at
> >
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
> > at
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.util.StateMigrationException: The new state
> > serializer cannot be incompatible.
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
> > at
> >
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
> > at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
> > at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
> > at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
> > at
> >
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
> > at
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> > at
> >
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> > at
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> > at
> >
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
> > ... 10 more
> >
> >
> >
> >
> > 在2020年4月17日 15:27,酷酷的浑蛋 写道:
> > 好的,非常感谢您,我去按照您说的代码改下,非常感谢
> >
> >
> >
> >
> > 在2020年4月17日 15:17,Benchao Li 写道:
> > 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。
> >
> > 酷酷的浑蛋  于2020年4月17日周五 下午3:09写道:
> >
> > 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
> > tableConfig.setIdleStateRetentionTime(Time.minutes(1),
> > Time.minutes(6));这种方式设置ttl
> >
> >
> >
> >
> > 在2020年4月17日 14:54,Benchao Li 写道:
> > 嗯,blink planner跟legacy planner是有一些实现上的差异。
> > 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:
> >
> > static StateTtlConfig createTtlConfig(long retentionTime, boolean
> > stateCleaningEnabled) {
> > if (stateCleaningEnabled) {
> > checkArgument(retentionTime > 0);
> > return StateTtlConfig
> > .newBuilder(Time.milliseconds(retentionTime))
> > .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> > .cleanupInBackground() // added this line
> > .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> > // changed this line
> > .build();
> > } else {
> > return StateTtlConfig.DISABLED;
> > }
> > }
> >
> >
> > 酷酷的浑蛋  于2020年4月17日周五 下午2:47写道:
> >
> > 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
> >
> >
> >
> >
> > 在2020年4月17日 14:16,Benchao Li 写道:
> > 这是两个问题,
> >
> > - 状态只访问一次,可能不会清理。
> >
> >
> >
> >
> >
> 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
> > - 状态已经过期了,但是会被使用到。
> > 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-16581
> >
> > 酷酷的浑蛋  于2020年4月17日周五 下午2:06写道:
> >
> > 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
> >
> >
> >
> >
> > 在2020年4月17日 13:07,Benchao Li 写道:
> > 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
> > 所以这个问题现在是不能完全避免了。
> > 我已经建了一个jira[1]来跟踪和改进这一点。
> >
> > [1] 

spark代码直接运行至Flink平台

2020-04-21 文章 hsdcl...@163.com
  Hi,
      有个脑洞大开的想法,有没有可能出一个模块功能,可以将用户写的spark代码直接运行在Flink平台

回复: Re: flink sql string char 不兼容?

2020-04-21 文章 王双利
  我这边用kafka的AppendStream没什么问题,
改的是支持Retract模式的,KafkaTableSinkBase继承的是RetractStreamTableSink
基本是按照下面的链接的地址改的
https://www.cnblogs.com/Springmoon-venn/p/12652845.html



王双利
 
发件人: Leonard Xu
发送时间: 2020-04-22 09:03
收件人: user-zh
主题: Re: flink sql string char 不兼容?
Hi, 王双利
 
我试了下1.10.0的版本,没能复现你的异常, 如Jingsong Lees所说的char(n)到varchar已经支持了,
你能完整的贴下loginevent  的 sql吗?我再看看
 
祝好
Leonard Xu
 
> 在 2020年4月21日,22:22,Jingsong Li  写道:
> 
> Hi,
> 
> - 首先1.10中把char Insert到varchar中是支持的,可以再check下哪里有没有问题吗?
> - 'false'应该是char(5)而不是char(4)
> 
> Best,
> Jingsong Lee
> 
> On Tue, Apr 21, 2020 at 9:01 PM Leonard Xu  wrote:
> 
>> Hi
>> 
>> CHAR(n) 、VARCHAR 在SQL语义里是不同的类型,SQL里写的 ‘false’ 常量会解析到CHAR(n)
>> 因为常量长度已经确定会选择使用CHAR(n),
>> 目前是Flink还不支持CHAR(n) 和 VARCHAR 类型之间的隐式转换,所以类型检查会报错,你可以先用CAST(‘false’ as
>> VARCHAR)后处理。
>> 
>> 祝好
>> Leonard
>> 
>>> 在 2020年4月21日,18:32,王双利  写道:
>>> 
>>> hit声明的是varchar,现在是,'false'  编译的时候认为是char(4) ,导致类型不匹配
>>> 
>>> 
>>> 
>>> 王双利
>>> 
>>> 发件人: Leonard Xu
>>> 发送时间: 2020-04-21 18:29
>>> 收件人: user-zh
>>> 主题: Re: flink sql string char 不兼容?
>>> Hi
>>> Sink 对应的字段(taskid\hit)需要声明成varchar, 现在不支持varchar类型写入char(n)
>>> 
>>> 祝好,
>>> Leonard Xu
>>> 
 在 2020年4月21日,18:20,王双利  写道:
 
 下面的sql 执行的时候报 下面的错误CREATE TABLE  target (
  jnlno VARCHAR,
 -- taskid char(9),
 -- hit char(4)
 taskid VARCHAR,
  hit VARCHAR
 )
 insert into  target select  a.jnlno,'11qeq','false' from loginevent
>> a
 
 Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Type STRING of table field
>> 'hit' does not match with the physical type CHAR(4) of the 'EXPR$2' field
>> of the TableSink consumed type.
 
 Flink 版本 1.10
 怎么解决呢?Flink sql的 string char 不兼容?
 sql应该怎么写合适呢?
>> 
>> 
> 
> -- 
> Best, Jingsong Lee


Re: flink sql string char 不兼容?

2020-04-21 文章 Leonard Xu
Hi, 王双利

我试了下1.10.0的版本,没能复现你的异常, 如Jingsong Lees所说的char(n)到varchar已经支持了,
你能完整的贴下loginevent  的 sql吗?我再看看

祝好
Leonard Xu

> 在 2020年4月21日,22:22,Jingsong Li  写道:
> 
> Hi,
> 
> - 首先1.10中把char Insert到varchar中是支持的,可以再check下哪里有没有问题吗?
> - 'false'应该是char(5)而不是char(4)
> 
> Best,
> Jingsong Lee
> 
> On Tue, Apr 21, 2020 at 9:01 PM Leonard Xu  wrote:
> 
>> Hi
>> 
>> CHAR(n) 、VARCHAR 在SQL语义里是不同的类型,SQL里写的 ‘false’ 常量会解析到CHAR(n)
>> 因为常量长度已经确定会选择使用CHAR(n),
>> 目前是Flink还不支持CHAR(n) 和 VARCHAR 类型之间的隐式转换,所以类型检查会报错,你可以先用CAST(‘false’ as
>> VARCHAR)后处理。
>> 
>> 祝好
>> Leonard
>> 
>>> 在 2020年4月21日,18:32,王双利  写道:
>>> 
>>> hit声明的是varchar,现在是,'false'  编译的时候认为是char(4) ,导致类型不匹配
>>> 
>>> 
>>> 
>>> 王双利
>>> 
>>> 发件人: Leonard Xu
>>> 发送时间: 2020-04-21 18:29
>>> 收件人: user-zh
>>> 主题: Re: flink sql string char 不兼容?
>>> Hi
>>> Sink 对应的字段(taskid\hit)需要声明成varchar, 现在不支持varchar类型写入char(n)
>>> 
>>> 祝好,
>>> Leonard Xu
>>> 
 在 2020年4月21日,18:20,王双利  写道:
 
 下面的sql 执行的时候报 下面的错误CREATE TABLE  target (
  jnlno VARCHAR,
 -- taskid char(9),
 -- hit char(4)
 taskid VARCHAR,
  hit VARCHAR
 )
 insert into  target select  a.jnlno,'11qeq','false' from loginevent
>> a
 
 Exception in thread "main"
>> org.apache.flink.table.api.ValidationException: Type STRING of table field
>> 'hit' does not match with the physical type CHAR(4) of the 'EXPR$2' field
>> of the TableSink consumed type.
 
 Flink 版本 1.10
 怎么解决呢?Flink sql的 string char 不兼容?
 sql应该怎么写合适呢?
>> 
>> 
> 
> -- 
> Best, Jingsong Lee



Re: 【flink-connector-kafka】是否支持Subscribe模式

2020-04-21 文章 zhisheng
可以使用不同的 group.id 消费

i'mpossible <605769...@qq.com> 于2020年4月21日周二 下午6:12写道:

> Hi:
>  Flink支持Subscribe模式吗?用的connector版本是
> flink-connector-kafka-0.11_2.11,0.11x;
>  因为业务需要,我想要优雅下线掉TopicB,即不中断事件流;执行结果发现当Flink服务和A服务指定同一个group.id
> ,同时消费TopicA时,kafka偏移量提交失败(开启了检查点);
>
>
> 感谢解答!!!
>


Re: 如何看到他人问题

2020-04-21 文章 zhisheng
中文用户邮件列表可以看:http://apache-flink.147419.n8.nabble.com/

英文开发邮件列表可以看:http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/

英文用户邮件列表可以看:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

tison  于2020年4月21日周二 下午5:52写道:

> cc
>
>
> Leonard Xu  于2020年4月21日周二 下午5:03写道:
>
> > Hi,
> > 订阅user-zh邮件邮件组即可收到该邮件组里的所有邮件,
> > 可以发送任意内容的邮件到  user-zh-subscr...@flink.apache.org  订阅来自
> > user-zh@flink.apache.org 邮件组的邮件
> >
> > 邮件组的订阅管理,可以参考[1]
> >
> > 祝好,
> > Leonard Xu
> >
> https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
> >
> > > 在 2020年4月21日,16:55,一袭月色 <1906286...@qq.com> 写道:
> > >
> > > 如何看到他人问题
> >
> >
>


Re: 关于状态TTL

2020-04-21 文章 Benchao Li
我对checkpoint不是很熟悉。 不过这个错误看起来是因为修改了TtlConfig导致serializer不兼容导致的,可能不是很好解决。

酷酷的浑蛋  于2020年4月21日周二 下午10:37写道:

> hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
> java.lang.RuntimeException: Error while getting state
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
> at
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
> at
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:221)
> at
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:205)
> at
> org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85)
> at
> org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.StateMigrationException: The new state
> serializer cannot be incompatible.
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
> at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
> at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
> at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
> at
> org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
> at
> org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
> at
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
> ... 10 more
>
>
>
>
> 在2020年4月17日 15:27,酷酷的浑蛋 写道:
> 好的,非常感谢您,我去按照您说的代码改下,非常感谢
>
>
>
>
> 在2020年4月17日 15:17,Benchao Li 写道:
> 嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。
>
> 酷酷的浑蛋  于2020年4月17日周五 下午3:09写道:
>
> 我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
> tableConfig.setIdleStateRetentionTime(Time.minutes(1),
> Time.minutes(6));这种方式设置ttl
>
>
>
>
> 在2020年4月17日 14:54,Benchao Li 写道:
> 嗯,blink planner跟legacy planner是有一些实现上的差异。
> 如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:
>
> static StateTtlConfig createTtlConfig(long retentionTime, boolean
> stateCleaningEnabled) {
> if (stateCleaningEnabled) {
> checkArgument(retentionTime > 0);
> return StateTtlConfig
> .newBuilder(Time.milliseconds(retentionTime))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .cleanupInBackground() // added this line
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> // changed this line
> .build();
> } else {
> return StateTtlConfig.DISABLED;
> }
> }
>
>
> 酷酷的浑蛋  于2020年4月17日周五 下午2:47写道:
>
> 我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期
>
>
>
>
> 在2020年4月17日 14:16,Benchao Li 写道:
> 这是两个问题,
>
> - 状态只访问一次,可能不会清理。
>
>
>
>
> 这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
> - 状态已经过期了,但是会被使用到。
> 这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。
>
> [1] https://issues.apache.org/jira/browse/FLINK-16581
>
> 酷酷的浑蛋  于2020年4月17日周五 下午2:06写道:
>
> 其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了
>
>
>
>
> 在2020年4月17日 13:07,Benchao Li 写道:
> 我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
> 所以这个问题现在是不能完全避免了。
> 我已经建了一个jira[1]来跟踪和改进这一点。
>
> [1] https://issues.apache.org/jira/browse/FLINK-17199
>
> 酷酷的浑蛋  于2020年4月17日周五 下午12:51写道:
>
>
>
>
>
> 我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
> 在2020年4月16日 15:28,酷酷的浑蛋 写道:
> 好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个
>
>
>
>
> 在2020年4月15日 18:04,Benchao Li 写道:
> Hi,
>
> 你用的是哪个版本呢?
> 在1.9版本里面的确是有点问题,默认没有开启cleanup in background
> [1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋  于2020年4月15日周三 下午5:40写道:
>
>
>
> 我在flink sql中设置了
> 

回复: 关于状态TTL

2020-04-21 文章 酷酷的浑蛋
hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
java.lang.RuntimeException: Error while getting state
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
at 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:221)
at 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:205)
at 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85)
at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer cannot be incompatible.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
... 10 more




在2020年4月17日 15:27,酷酷的浑蛋 写道:
好的,非常感谢您,我去按照您说的代码改下,非常感谢




在2020年4月17日 15:17,Benchao Li 写道:
嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。

酷酷的浑蛋  于2020年4月17日周五 下午3:09写道:

我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
tableConfig.setIdleStateRetentionTime(Time.minutes(1),
Time.minutes(6));这种方式设置ttl




在2020年4月17日 14:54,Benchao Li 写道:
嗯,blink planner跟legacy planner是有一些实现上的差异。
如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
if (stateCleaningEnabled) {
checkArgument(retentionTime > 0);
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInBackground() // added this line
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
.build();
} else {
return StateTtlConfig.DISABLED;
}
}


酷酷的浑蛋  于2020年4月17日周五 下午2:47写道:

我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期




在2020年4月17日 14:16,Benchao Li 写道:
这是两个问题,

- 状态只访问一次,可能不会清理。



这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
- 状态已经过期了,但是会被使用到。
这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋  于2020年4月17日周五 下午2:06写道:

其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了




在2020年4月17日 13:07,Benchao Li 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋  于2020年4月17日周五 下午12:51写道:





我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋  于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a





当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730

回复: 关于状态TTL

2020-04-21 文章 酷酷的浑蛋
hello,我按照您说的方式改了源码,增加了那两行代码,然后任务用savepoint停止,在从savepoint启动,就报下面的错:
java.lang.RuntimeException: Error while getting state
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:119)
at 
org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getMapState(StreamingRuntimeContext.java:179)
at 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:221)
at 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews$InputSideHasNoUniqueKey.(JoinRecordStateViews.java:205)
at 
org.apache.flink.table.runtime.operators.join.stream.state.JoinRecordStateViews.create(JoinRecordStateViews.java:85)
at 
org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.open(StreamingJoinOperator.java:88)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:532)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:396)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer cannot be incompatible.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:534)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:482)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:643)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createMapState(TtlStateFactory.java:155)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:279)
at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:328)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getMapState(DefaultKeyedStateStore.java:116)
... 10 more




在2020年4月17日 15:27,酷酷的浑蛋 写道:
好的,非常感谢您,我去按照您说的代码改下,非常感谢




在2020年4月17日 15:17,Benchao Li 写道:
嗯,这个是需要修改flink源码的,不只是从应用层的配置来修改。这个修改是修改Flink SQL底层的实现的。

酷酷的浑蛋  于2020年4月17日周五 下午3:09写道:

我这边用的是sql方式,可以像您说的那样修改吗,我看官网sql方式只有
tableConfig.setIdleStateRetentionTime(Time.minutes(1),
Time.minutes(6));这种方式设置ttl




在2020年4月17日 14:54,Benchao Li 写道:
嗯,blink planner跟legacy planner是有一些实现上的差异。
如果你这边方便改下代码的话,应该是能够对齐这个功能的,其实很简单,就两行代码,在JoinRecordStateViews中:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
if (stateCleaningEnabled) {
checkArgument(retentionTime > 0);
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInBackground() // added this line
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
.build();
} else {
return StateTtlConfig.DISABLED;
}
}


酷酷的浑蛋  于2020年4月17日周五 下午2:47写道:

我刚才测试了下,发现如果不用blink,join场景就是正常过期然后访问不到了,而而blink就会先访问一次才过期




在2020年4月17日 14:16,Benchao Li 写道:
这是两个问题,

- 状态只访问一次,可能不会清理。



这个在1.9的早期版本是有这个问题的,因为当时没有enableCleanupInBackgroud,这个在1.9.3中会修复。1.10之后已经默认开了清理策略,所以不会有这个问题。
- 状态已经过期了,但是会被使用到。
这个现在还存在,但是在这个issue[1] 中会捎带着修复这个事情。

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋  于2020年4月17日周五 下午2:06写道:

其实我这边遇到的问题是,大部分状态就只访问一次,以后不会再访问了,导致状态一直不会自动清理,状态越来越大,最终程序就停止了




在2020年4月17日 13:07,Benchao Li 写道:
我发现我们之前state用的策略是`ReturnExpiredIfNotCleanedUp`策略,而不是`NeverReturnExpired`,
所以这个问题现在是不能完全避免了。
我已经建了一个jira[1]来跟踪和改进这一点。

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋  于2020年4月17日周五 下午12:51写道:





我把版本升级到了flink1.10.0,这个问题又复现了,该怎么办呢?
在2020年4月16日 15:28,酷酷的浑蛋 写道:
好的谢谢,我这里是1.9.1,我先试试升级版本能不能解决这个




在2020年4月15日 18:04,Benchao Li 写道:
Hi,

你用的是哪个版本呢?
在1.9版本里面的确是有点问题,默认没有开启cleanup in background
[1],不知道是不是这个问题导致的。不过这个已经在1.9.3中修复了。

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋  于2020年4月15日周三 下午5:40写道:



我在flink sql中设置了
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a





当超过过期时间的时候,已经过期的数据还会被关联到一次,然后状态才会被清除,有什么方式可以数据过期了不用等到在关联(读到)一次在清除呢?非sql的话可以用StateTtlConfigttlConfig=StateTtlConfig来设置,sql中要怎么设置呢?官网上没有给出说明



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730

Re: flink sql string char 不兼容?

2020-04-21 文章 Jingsong Li
Hi,

- 首先1.10中把char Insert到varchar中是支持的,可以再check下哪里有没有问题吗?
- 'false'应该是char(5)而不是char(4)

Best,
Jingsong Lee

On Tue, Apr 21, 2020 at 9:01 PM Leonard Xu  wrote:

> Hi
>
> CHAR(n) 、VARCHAR 在SQL语义里是不同的类型,SQL里写的 ‘false’ 常量会解析到CHAR(n)
> 因为常量长度已经确定会选择使用CHAR(n),
> 目前是Flink还不支持CHAR(n) 和 VARCHAR 类型之间的隐式转换,所以类型检查会报错,你可以先用CAST(‘false’ as
> VARCHAR)后处理。
>
> 祝好
> Leonard
>
> > 在 2020年4月21日,18:32,王双利  写道:
> >
> > hit声明的是varchar,现在是,'false'  编译的时候认为是char(4) ,导致类型不匹配
> >
> >
> >
> > 王双利
> >
> > 发件人: Leonard Xu
> > 发送时间: 2020-04-21 18:29
> > 收件人: user-zh
> > 主题: Re: flink sql string char 不兼容?
> > Hi
> > Sink 对应的字段(taskid\hit)需要声明成varchar, 现在不支持varchar类型写入char(n)
> >
> > 祝好,
> > Leonard Xu
> >
> >> 在 2020年4月21日,18:20,王双利  写道:
> >>
> >> 下面的sql 执行的时候报 下面的错误CREATE TABLE  target (
> >>   jnlno VARCHAR,
> >> -- taskid char(9),
> >> -- hit char(4)
> >>  taskid VARCHAR,
> >>   hit VARCHAR
> >> )
> >> insert into  target select  a.jnlno,'11qeq','false' from loginevent
> a
> >>
> >> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Type STRING of table field
> 'hit' does not match with the physical type CHAR(4) of the 'EXPR$2' field
> of the TableSink consumed type.
> >>
> >> Flink 版本 1.10
> >> 怎么解决呢?Flink sql的 string char 不兼容?
> >> sql应该怎么写合适呢?
>
>

-- 
Best, Jingsong Lee


Re: flink sql string char 不兼容?

2020-04-21 文章 Leonard Xu
Hi

CHAR(n) 、VARCHAR 在SQL语义里是不同的类型,SQL里写的 ‘false’ 常量会解析到CHAR(n) 
因为常量长度已经确定会选择使用CHAR(n),
目前是Flink还不支持CHAR(n) 和 VARCHAR 类型之间的隐式转换,所以类型检查会报错,你可以先用CAST(‘false’ as 
VARCHAR)后处理。

祝好
Leonard

> 在 2020年4月21日,18:32,王双利  写道:
> 
> hit声明的是varchar,现在是,'false'  编译的时候认为是char(4) ,导致类型不匹配
> 
> 
> 
> 王双利
> 
> 发件人: Leonard Xu
> 发送时间: 2020-04-21 18:29
> 收件人: user-zh
> 主题: Re: flink sql string char 不兼容?
> Hi 
> Sink 对应的字段(taskid\hit)需要声明成varchar, 现在不支持varchar类型写入char(n)
> 
> 祝好,
> Leonard Xu
> 
>> 在 2020年4月21日,18:20,王双利  写道:
>> 
>> 下面的sql 执行的时候报 下面的错误CREATE TABLE  target (
>>   jnlno VARCHAR,
>> -- taskid char(9),
>> -- hit char(4)
>>  taskid VARCHAR,
>>   hit VARCHAR
>> ) 
>> insert into  target select  a.jnlno,'11qeq','false' from loginevent a
>> 
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> Type STRING of table field 'hit' does not match with the physical type 
>> CHAR(4) of the 'EXPR$2' field of the TableSink consumed type.
>> 
>> Flink 版本 1.10
>> 怎么解决呢?Flink sql的 string char 不兼容?
>> sql应该怎么写合适呢?



Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

2020-04-21 文章 Benchao Li
Hi 首维,

这里说的是同一个key会创建一次,所以你看到一个AggFunction会创建多次更这个并不冲突,因为一个AggFunction属于一个subtask维度的,一个subtask处理不止一个key的。

你的第二个问题:
> 我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group
by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来
这个的确是存在这个问题。比如两个group by嵌套使用,第一个group by由于会更新结果,所以会retract之前的结果。第二个group
by收到retract就会先撤销当前的结果,然后再发送撤销之后的结果;然后后面又会来一条正常的append数据,这条数据导致再次retract一次,和append一次。

> 还有在agg over group by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。
这个没有本质区别,正常情况下一个key下的agg是会一直存在的。除非是你配置了state
retention时间,那么对应的key的state如果过了retention时间没有访问就会被清理。

刘首维  于2020年4月21日周二 下午5:59写道:

> Hi benchao,
>
>
> 非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group
> by的一个key应该被创建一次,可是我做实验的时候(在create
> acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。
>
>
>   为了方便你帮我分析,我来补充一下环境和场景:
>
>
>版本: 1.7.2/1.9.1
>
>   场景 : group by 嵌套, 常规聚合
>
>
>
>   我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group
> by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来。还有在agg over group
> by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。
>
>
> 再次感谢你的回复
>
> best regards
> --
> *发件人:* Benchao Li 
> *发送时间:* 2020年4月21日 17:45:54
> *收件人:* user-zh
> *主题:* Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题
>
> Hi 首维,
>
> 这是个很好的问题。
>
> > 这个方法的调用时机是什么呢,会被调用几次呢?
> 这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
> 当然这里说的是regular groupby;
> 如果是window group by的话,就是每个window都会做上面的这个事情。
>
> > 一个accumulator的生命周期是怎么样的?
> 如果是window group by的话,那它的生命周期就是跟window是一样的。
> 如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
> 数据是0条的时候,也会销毁。
>
> > 一个accumulator会被反复的序列化反序列化吗?
> 这个问题非常好。它是否序列化跟你用的state backend有关系。
> 如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
> checkpoint的时候序列化。
> 当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。
>
> 刘首维  于2020年4月21日周二 下午5:37写道:
>
> > Hi all,
> >
> >
> >
> >最近有几个疑问没能很好地理解清楚:
> >
> >
> >
> >我们都知道,UDAF中的有createAccumulator这个方法,那么:
> >
> > 这个方法的调用时机是什么呢,会被调用几次呢?
> >
> > 一个accumulator的生命周期是怎么样的?
> >
> > 一个accumulator会被反复的序列化反序列化吗?
> >
> >
> >  麻烦了解相关细节的社区的同学们帮忙解答一下~
> >
> > 先谢谢啦
> >
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


回复: Re: flink sql string char 不兼容?

2020-04-21 文章 王双利
hit声明的是varchar,现在是,'false'  编译的时候认为是char(4) ,导致类型不匹配



王双利
 
发件人: Leonard Xu
发送时间: 2020-04-21 18:29
收件人: user-zh
主题: Re: flink sql string char 不兼容?
Hi 
Sink 对应的字段(taskid\hit)需要声明成varchar, 现在不支持varchar类型写入char(n)
 
祝好,
Leonard Xu
 
> 在 2020年4月21日,18:20,王双利  写道:
> 
> 下面的sql 执行的时候报 下面的错误CREATE TABLE  target (
>jnlno VARCHAR,
> -- taskid char(9),
> -- hit char(4)
>   taskid VARCHAR,
>hit VARCHAR
> ) 
> insert into  target select  a.jnlno,'11qeq','false' from loginevent a
> 
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type STRING of table field 'hit' does not match with the physical type 
> CHAR(4) of the 'EXPR$2' field of the TableSink consumed type.
> 
> Flink 版本 1.10
>  怎么解决呢?Flink sql的 string char 不兼容?
> sql应该怎么写合适呢?


Re: flink sql string char 不兼容?

2020-04-21 文章 Benchao Li
写成varchar应该就可以了。

王双利  于2020年4月21日周二 下午6:21写道:

> 下面的sql 执行的时候报 下面的错误CREATE TABLE  target (
> jnlno VARCHAR,
> -- taskid char(9),
> -- hit char(4)
>taskid VARCHAR,
> hit VARCHAR
> )
> insert into  target select  a.jnlno,'11qeq','false' from loginevent a
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type STRING of table field 'hit' does not match with the physical type
> CHAR(4) of the 'EXPR$2' field of the TableSink consumed type.
>
> Flink 版本 1.10
>   怎么解决呢?Flink sql的 string char 不兼容?
> sql应该怎么写合适呢?
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink sql string char 不兼容?

2020-04-21 文章 Leonard Xu
Hi 
Sink 对应的字段(taskid\hit)需要声明成varchar, 现在不支持varchar类型写入char(n)

祝好,
Leonard Xu

> 在 2020年4月21日,18:20,王双利  写道:
> 
> 下面的sql 执行的时候报 下面的错误CREATE TABLE  target (
>jnlno VARCHAR,
> -- taskid char(9),
> -- hit char(4)
>   taskid VARCHAR,
>hit VARCHAR
> ) 
> insert into  target select  a.jnlno,'11qeq','false' from loginevent a
> 
> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
> Type STRING of table field 'hit' does not match with the physical type 
> CHAR(4) of the 'EXPR$2' field of the TableSink consumed type.
> 
> Flink 版本 1.10
>  怎么解决呢?Flink sql的 string char 不兼容?
> sql应该怎么写合适呢?



Re:Re: Re: flink启动任务的方式

2020-04-21 文章 chenxuying
您说的jarFiles是以什么样的方式提交任务
然后我试了一下plugin,好像并不可以,重启flink cluster也不行 , 也不知是不是我的方式不对
我的目录结构是
xxx/flink/plugins/
folder1/
udf.jar


另外说一下,如果我把udf.jar放到 
/flink/lib下,重启是可以的,不过这不是我想要的方式,不知道您是否理解,因为我想要的我随时可以写个udf.jar,随时可以用,不要重启flink 
cluster

在 2020-04-21 17:46:00,"Arnold Zai"  写道:
>jarFiles参数不是个参数列表么,多传几个。
>
>或把依赖提前部署到${FLINK_HOME}/plugins里
>
>chenxuying  于2020年4月21日周二 下午3:36写道:
>
>> 这个是可以 , 不过我们的需求不允许打FatJar
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-04-21 15:27:48,"Arnold Zai"  写道:
>> >打个FatJar
>> >
>> >chenxuying  于2020年4月21日周二 下午2:47写道:
>> >
>> >> 请问下目前flink的启动方式有哪些
>> >> 1 通过命令行来执行
>> >> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> >> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
>> >> 2通过自带的webui页面上传jar , submit jar
>> >> 3 通过代码 createRemoteEnvironment
>> >>
>> >> 目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api
>> >> 无法实现命令行那样提供其他的jar包
>> >>
>> >>
>> >>
>> >>
>>


flink sql string char 不兼容?

2020-04-21 文章 王双利
下面的sql 执行的时候报 下面的错误CREATE TABLE  target (
jnlno VARCHAR,
-- taskid char(9),
-- hit char(4)
   taskid VARCHAR,
hit VARCHAR
) 
insert into  target select  a.jnlno,'11qeq','false' from loginevent a

Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 
STRING of table field 'hit' does not match with the physical type CHAR(4) of 
the 'EXPR$2' field of the TableSink consumed type.

Flink 版本 1.10
  怎么解决呢?Flink sql的 string char 不兼容?
sql应该怎么写合适呢?


??flink-connector-kafka??????????Subscribe????

2020-04-21 文章 i'mpossible
Hi??
  
FlinkSubscribeconnector??flink-connector-kafka-0.11_2.11??0.11x??
  
??TopicBFlink??A??group.id??TopicAkafka



  ??

答复: 关于Flink SQL UDAF中调用createAccumulator的调用问题

2020-04-21 文章 刘首维
Hi benchao,


非常感谢你的回复,我说一下我产生疑问的原因吧。首先我的朴素思路是createAccumlator针对一个group 
by的一个key应该被创建一次,可是我做实验的时候(在create 
acc的时候打印日志),我发现日志打印了不止一次,而且我确认是同一线程的同一AggFunction对象打印的。而且我也翻看了代码可以看到accumulator确实是以一个state方式存储的,每次都会先尝试从state取,就更不应该出现多次打印createAccmulator日志的问题了。


  为了方便你帮我分析,我来补充一下环境和场景:


   版本: 1.7.2/1.9.1

  场景 : group by 嵌套, 常规聚合



  我个人理解Flink Streaming SQL其实实现上物化视图的增量更新算法很像(这块不确定思考的正确性),那么在嵌套group 
by或者接join后是不是会有更新放大(amplify)问题,同时由于这个问题导致中间状态暴露出来。还有在agg over group 
by的时候是不是createAccumulator的生命周期会像窗口聚合的时候生命周期会改变吗。


再次感谢你的回复

best regards


发件人: Benchao Li 
发送时间: 2020年4月21日 17:45:54
收件人: user-zh
主题: Re: 关于Flink SQL UDAF中调用createAccumulator的调用问题

Hi 首维,

这是个很好的问题。

> 这个方法的调用时机是什么呢,会被调用几次呢?
这个调用的时机是每个key的第一条数据来的时候,会创建一个accumulator。创建的次数大约是key的数量。
当然这里说的是regular groupby;
如果是window group by的话,就是每个window都会做上面的这个事情。

> 一个accumulator的生命周期是怎么样的?
如果是window group by的话,那它的生命周期就是跟window是一样的。
如果是regular groupby的话,可以认为是全局的。除非有一条数据retract掉了当前的结果之后,等于被聚合的
数据是0条的时候,也会销毁。

> 一个accumulator会被反复的序列化反序列化吗?
这个问题非常好。它是否序列化跟你用的state backend有关系。
如果是rocksdb的话,是会每次更新都会存一次state。如果不是rocksdb,它就是一个内存里的数据结构,只有在
checkpoint的时候序列化。
当然这个在regular groupby的情况下,开启mini batch会减少每次都需要更新state的情况。

刘首维  于2020年4月21日周二 下午5:37写道:

> Hi all,
>
>
>
>最近有几个疑问没能很好地理解清楚:
>
>
>
>我们都知道,UDAF中的有createAccumulator这个方法,那么:
>
> 这个方法的调用时机是什么呢,会被调用几次呢?
>
> 一个accumulator的生命周期是怎么样的?
>
> 一个accumulator会被反复的序列化反序列化吗?
>
>
>  麻烦了解相关细节的社区的同学们帮忙解答一下~
>
> 先谢谢啦
>


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: flink checkpoint savepoint问题

2020-04-21 文章 Yun Tang
Hi
原因是因为新增字段或者修改字段类型后,新的serializer无法(反)序列化原先存储的数据,对于这种有字段增改需求的场景,目前Flink社区主要借助于Pojo或者avro来实现
 [1],建议对相关的state schema做重新规划,以满足这种有后续升级需求的场景。

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html

祝好
唐云


From: xyq 
Sent: Tuesday, April 21, 2020 14:37
To: user-zh 
Subject: flink checkpoint savepoint问题

hello 
我是用flink1.10的ddl的双流窗口join,但是当我新增字段或修改字段类型,把程序重启后,无论是从savepoint处还是checkpoint处重启都是失败,最后只能删除掉checkpoint或savepoint才能使用,但是这样会丢些数据,请问该怎么处理,非常感谢?


报错如下:
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_33601e3dd532edccff92bfce124910c6_(1/3) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 common frames omitted
Caused by: org.apache.flink.util.StateMigrationException: The new key 
serializer must be compatible.
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
... 15 common frames omitted


Re: 如何看到他人问题

2020-04-21 文章 tison
cc


Leonard Xu  于2020年4月21日周二 下午5:03写道:

> Hi,
> 订阅user-zh邮件邮件组即可收到该邮件组里的所有邮件,
> 可以发送任意内容的邮件到  user-zh-subscr...@flink.apache.org  订阅来自
> user-zh@flink.apache.org 邮件组的邮件
>
> 邮件组的订阅管理,可以参考[1]
>
> 祝好,
> Leonard Xu
> https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list
>
> > 在 2020年4月21日,16:55,一袭月色 <1906286...@qq.com> 写道:
> >
> > 如何看到他人问题
>
>


Re: Re: flink启动任务的方式

2020-04-21 文章 tison
REST API jar run endpoint 不支持关联其他 jar 听起来是个问题。FatJar 是一种解决方案,这个可以提到 JIRA
上作为需求(x

Best,
tison.


Arnold Zai  于2020年4月21日周二 下午5:46写道:

> jarFiles参数不是个参数列表么,多传几个。
>
> 或把依赖提前部署到${FLINK_HOME}/plugins里
>
> chenxuying  于2020年4月21日周二 下午3:36写道:
>
> > 这个是可以 , 不过我们的需求不允许打FatJar
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 在 2020-04-21 15:27:48,"Arnold Zai"  写道:
> > >打个FatJar
> > >
> > >chenxuying  于2020年4月21日周二 下午2:47写道:
> > >
> > >> 请问下目前flink的启动方式有哪些
> > >> 1 通过命令行来执行
> > >> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
> > >> cn.xxx.flink.table.sql.Job
> /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
> > >> 2通过自带的webui页面上传jar , submit jar
> > >> 3 通过代码 createRemoteEnvironment
> > >>
> > >> 目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api
> > >> 无法实现命令行那样提供其他的jar包
> > >>
> > >>
> > >>
> > >>
> >
>


Re: Re: flink启动任务的方式

2020-04-21 文章 Arnold Zai
jarFiles参数不是个参数列表么,多传几个。

或把依赖提前部署到${FLINK_HOME}/plugins里

chenxuying  于2020年4月21日周二 下午3:36写道:

> 这个是可以 , 不过我们的需求不允许打FatJar
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-04-21 15:27:48,"Arnold Zai"  写道:
> >打个FatJar
> >
> >chenxuying  于2020年4月21日周二 下午2:47写道:
> >
> >> 请问下目前flink的启动方式有哪些
> >> 1 通过命令行来执行
> >> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
> >> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
> >> 2通过自带的webui页面上传jar , submit jar
> >> 3 通过代码 createRemoteEnvironment
> >>
> >> 目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api
> >> 无法实现命令行那样提供其他的jar包
> >>
> >>
> >>
> >>
>


关于Flink SQL UDAF中调用createAccumulator的调用问题

2020-04-21 文章 刘首维
Hi all,



   最近有几个疑问没能很好地理解清楚:



   我们都知道,UDAF中的有createAccumulator这个方法,那么:

这个方法的调用时机是什么呢,会被调用几次呢?

一个accumulator的生命周期是怎么样的?

一个accumulator会被反复的序列化反序列化吗?


 麻烦了解相关细节的社区的同学们帮忙解答一下~

先谢谢啦


Re: 如何看到他人问题

2020-04-21 文章 Leonard Xu
Hi,
订阅user-zh邮件邮件组即可收到该邮件组里的所有邮件,
可以发送任意内容的邮件到  user-zh-subscr...@flink.apache.org  订阅来自 user-zh@flink.apache.org 
邮件组的邮件

邮件组的订阅管理,可以参考[1]

祝好,
Leonard Xu
https://flink.apache.org/community.html#how-to-subscribe-to-a-mailing-list

> 在 2020年4月21日,16:55,一袭月色 <1906286...@qq.com> 写道:
> 
> 如何看到他人问题



????????????????

2020-04-21 文章 ????????


Re: 关于StreamingFileSink

2020-04-21 文章 Leonard Xu
Hello,图挂了,可以搞个图床了挂链接到邮件列表。。。
另外问下为什么不从最新的cp开始恢复作业呢?这样我理解会有脏数据吧。

> 在 2020年4月19日,23:23,Yun Gao  写道:
> 
>  Hello~ 想再确认一下预期的行为:现在是希望后面重新写之后,用新写过的part-xx来覆盖之前生成的文件么~?
> 
> 
> --
> From:酷酷的浑蛋 
> Send Time:2020 Apr. 18 (Sat.) 20:32
> To:user-zh 
> Subject:关于StreamingFileSink
> 
> 
> 我在用StreamingFileSink 
> 往hdfs写数据的时候,如果任务停止了,从前面的某个checkpoint启动(不是最新checkpoint),就会发生下面的情况:
> 
> 
> 其中part-4-9/part-4-13/part-4-14 
> 这几个文件已经在最新checkpoint时生成了,任务从前面某个checkpoint启动后,继续生成part-xx文件,但是xx(文件编号)不会从最新开始,这样就导致新生成的.part-4-13.inprogressx/part-4-14.inprogressx最终不会变成完成状态,而且hive读取不到点'.'开头的文件,有什么方式可以避免这样的情况,难道只能手动去改文件名吗
> 



Re:Re: flink启动任务的方式

2020-04-21 文章 chenxuying
这个是可以 , 不过我们的需求不允许打FatJar

















在 2020-04-21 15:27:48,"Arnold Zai"  写道:
>打个FatJar
>
>chenxuying  于2020年4月21日周二 下午2:47写道:
>
>> 请问下目前flink的启动方式有哪些
>> 1 通过命令行来执行
>> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
>> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
>> 2通过自带的webui页面上传jar , submit jar
>> 3 通过代码 createRemoteEnvironment
>>
>> 目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api
>> 无法实现命令行那样提供其他的jar包
>>
>>
>>
>>


Re: flink启动任务的方式

2020-04-21 文章 Arnold Zai
打个FatJar

chenxuying  于2020年4月21日周二 下午2:47写道:

> 请问下目前flink的启动方式有哪些
> 1 通过命令行来执行
> flink run -C file:///usr/local/soft/flink/function-0.1.jar -c
> cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
> 2通过自带的webui页面上传jar , submit jar
> 3 通过代码 createRemoteEnvironment
>
> 目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api
> 无法实现命令行那样提供其他的jar包
>
>
>
>


flink启动任务的方式

2020-04-21 文章 chenxuying
请问下目前flink的启动方式有哪些
1 通过命令行来执行
flink run -C file:///usr/local/soft/flink/function-0.1.jar -c 
cn.xxx.flink.table.sql.Job /usr/local/soft/flink/flink-1.0-SNAPSHOT.jar
2通过自带的webui页面上传jar , submit jar
3 通过代码 createRemoteEnvironment


目前主要使用的是,通过代码请求rest api http请求来实现上传jar,跑任务, 但是目前遇到的问题是,通过rest api 
无法实现命令行那样提供其他的jar包 

flink checkpoint savepoint问题

2020-04-21 文章 xyq
hello 
我是用flink1.10的ddl的双流窗口join,但是当我新增字段或修改字段类型,把程序重启后,无论是从savepoint处还是checkpoint处重启都是失败,最后只能删除掉checkpoint或savepoint才能使用,但是这样会丢些数据,请问该怎么处理,非常感谢?


报错如下:
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_33601e3dd532edccff92bfce124910c6_(1/3) from 
any of the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
... 9 common frames omitted
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 common frames omitted
Caused by: org.apache.flink.util.StateMigrationException: The new key 
serializer must be compatible.
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:194)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:180)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:167)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
... 15 common frames omitted