Re: About Flink checkpoints

2020-04-17 Thread coke half
Hi, I now understand that some models of checkpoint overhead take recovery time and other costs into account, and that the checkpoint interval is shortened when the load is high. So my question really is: does automatically shortening the checkpoint interval under heavy load actually make sense in real business scenarios? Thanks.


From: Lee Sysuke 
Sent: Friday, April 17, 2020 10:41:42 AM
To: user-zh 
Subject: Re: About Flink checkpoints

A personal take:
In typical business scenarios, people need a fairly deterministic bound on the error range of a streaming job. For example, with a fixed 5-minute checkpoint interval, you can be reasonably sure that even if the stream processing fails over, the error stays within five minutes.
But with an adaptive interval where the period gets longer as the load gets higher, failover is in practice far more likely under high load than under low load, so such a setting may not have much practical value.
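For reference, a minimal sketch of the fixed-interval setup described above (the 5-minute value is just the example from this thread, not a recommendation):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FixedIntervalCheckpointing {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // A fixed 5-minute interval bounds how much progress a failover can have to replay.
        env.enableCheckpointing(5 * 60 * 1000, CheckpointingMode.EXACTLY_ONCE);
        env.fromElements(1, 2, 3).print();
        env.execute("fixed-interval-checkpointing");
    }
}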

half coke  wrote on Wed, Apr 15, 2020 at 4:15 PM:

> Yes, automatically adjusting the checkpoint interval based on changes in the job's load, or adjusting checkpoints via user-written logic.
> I'm just starting to learn Flink and wanted to ask about this.
>
> Congxian Qiu  wrote on Wed, Apr 15, 2020 at 12:33 PM:
>
> > hi
> >
> > What do you mean by an adaptive interval? Do you mean automatically adjusting the interval at which checkpoints are taken?
> >
> > Best,
> > Congxian
> >
> >
> > half coke  wrote on Wed, Apr 15, 2020 at 12:24 PM:
> >
> > > May I ask why Flink does not support an adaptive checkpoint interval? Is there a particular reason behind that?
> > >
> >
>


Re: Does Flink streaming SQL support two-level group by aggregation

2020-04-17 Thread Benchao Li
Under the current design this shouldn't be considered a bug; it is by design.
The main point is that there are two levels of aggregation, and a retraction from the first-level agg causes the second-level agg to recompute and re-emit its result.

dixingxing85 wrote on Sat, Apr 18, 2020 at 11:38 AM:

> Thanks, Benchao.
> The expected output of this job is a single result per day, and that result should only ever grow, for example:
> 20200417,86
> 20200417,90
> 20200417,130
> 20200417,131
>
> It should not fluctuate up and down; results where the number goes from larger back to smaller are definitely unacceptable to the consumers of this data:
> 20200417,90
> 20200417,86
> 20200417,130
> 20200417,86
> 20200417,131
>
> My question is whether the retract stream produced by the inner group by can affect the sink; the logs were printed on the sink side.
> If Flink supports this kind of two-level group by, shouldn't a result that shrinks like this be considered a bug?
>
> Sent from my iPhone
>
> On Apr 18, 2020, at 10:08, Benchao Li  wrote:
>
> 
>
> Hi,
>
> This is supported.
> What you are seeing is because group by produces retract results, i.e. it first sends -[old] and then +[new].
> With two levels, it becomes:
> level 1: -[old], level 2: -[cur], +[old]
> level 1: +[new], level 2: -[old], +[new]
>
> dixingxin...@163.com  wrote on Sat, Apr 18, 2020 at 2:11 AM:
>
>>
>> Hi all:
>>
>> We have a streaming SQL query whose result is incorrect: the data arriving at the sink fluctuates up and down. *We would like to confirm whether this is a bug,
>> or whether Flink does not yet support this kind of SQL*.
>> The concrete scenario: first group by the two dimensions A and B to compute UV, then group by A to sum up dimension B's UV. The corresponding SQL is below (A ->
>> dt, B -> pvareaid):
>>
>> SELECT dt, SUM(a.uv) AS uv
>> FROM (
>>SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
>>FROM streaming_log_event
>>WHERE action IN ('action1')
>>   AND pvareaid NOT IN ('pv1', 'pv2')
>>   AND pvareaid IS NOT NULL
>>GROUP BY dt, pvareaid
>> ) a
>> GROUP BY dt;
>>
>> The log corresponding to the data received by the sink is:
>>
>> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(false,0,86,20200417)
>> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(true,0,130,20200417)
>> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(false,0,130,20200417)
>> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(true,0,86,20200417)
>> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(false,0,86,20200417)
>> 2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(true,0,131,20200417)
>>
>>
>> We are using 1.7.2, and the test job's parallelism is 1.
>> Here is the corresponding issue: https://issues.apache.org/jira/browse/FLINK-17228
>>
>>
>> --
>> dixingxin...@163.com
>>
>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
> --

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread tison
Hi Yang,

Name filtering & special handling of the schema make sense to me. We can
enrich it later if a requirement comes up, without breaking the interface.

For #1, from my perspective your first proposal is

  having an option that specifies a remote flink/lib, then we turn off automatic
uploading of the local flink/lib and register that path as local resources

It seems we would again be adding special-case logic for handling one kind of
thing... what I propose is that we make these two steps explicitly separate:

1. an option that turns off automatic uploading of the local flink/lib
2. a general option that registers remote files as local resources

The remaining point is that you propose handling flink/lib with PUBLIC
visibility while other files get APPLICATION visibility; either a composite
configuration or name filtering to special-case the libs would make sense
for that.
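For context, a rough sketch of what registering a pre-uploaded file as a PUBLIC local resource looks like, using plain Hadoop YARN APIs rather than Flink's actual YarnClusterDescriptor code (the helper name and path handling are illustrative only):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

public class SharedLibResources {
    // Describes an already-uploaded HDFS file so the NodeManager can localize and cache it.
    public static LocalResource publicResource(FileSystem fs, Path remotePath) throws Exception {
        FileStatus status = fs.getFileStatus(remotePath);
        LocalResource resource = Records.newRecord(LocalResource.class);
        resource.setResource(ConverterUtils.getYarnUrlFromPath(remotePath));
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(LocalResourceType.FILE);
        // PUBLIC lets the NodeManager share the localized copy across applications;
        // APPLICATION restricts it to the submitting application.
        resource.setVisibility(LocalResourceVisibility.PUBLIC);
        return resource;
    }
}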

YarnClusterDescriptor already has a lot of special-handling logic that
introduces a number of config options and keys; these should instead be
expressed as a few common options and validated at runtime.

Best,
tison.


Yang Wang  wrote on Fri, Apr 17, 2020 at 11:42 PM:

> Hi tison,
>
> For #3, if you mean registering remote HDFS file as local resource, we
> should make the "-yt/--yarnship"
> to support remote directory. I think it is the right direction.
>
> For #1, if the users could ship remote directory, then they could also
> specify like this
> "-yt hdfs://hdpdev/flink/release/flink-1.x,
> hdfs://hdpdev/user/someone/mylib". Do you mean we add an
> option for whether trying to avoid unnecessary uploading? Maybe we could
> filter by names and file size.
> I think this is a good suggestion, and we do not need to introduce a new
> config option "-ypl".
>
> For #2, for flink-dist, the #1 could already solve the problem. We do not
> need to support remote schema.
> It will confuse the users when we only support HDFS, not S3, OSS, etc.
>
>
> Best,
> Yang
>
> tison  wrote on Fri, Apr 17, 2020 at 8:05 PM:
>
>> Hi Yang,
>>
>> I agree that these two of works would benefit from single assignee. My
>> concern is as below
>>
>> 1. Both share libs & remote flink dist/libs are remote ship files. I
>> don't think we have to implement multiple codepath/configuration.
>> 2. So, for concept clarification, there are
>>   (1) an option to disable shipping local libs
>>   (2) flink-dist supports multiple schema at least we said "hdfs://"
>>   (3) an option for registering remote shipfiles with path & visibility.
>> I think new configuration system helps.
>>
>> the reason we have to special handling (2) instead of including it in (3)
>> is because when shipping flink-dist to TM container, we specially
>> detect flink-dist. Of course we can merge it into general ship files and
>> validate shipfiles finally contain flink-dist, which is an alternative.
>>
>> The *most important* difference is (1) and (3) which we don't have an
>> option for only remote libs. Is this clarification satisfy your proposal?
>>
>> Best,
>> tison.
>>
>>
>> Till Rohrmann  wrote on Fri, Apr 17, 2020 at 7:49 PM:
>>
>>> Hi Yang,
>>>
>>> from what I understand it sounds reasonable to me. Could you sync with
>>> Tison on FLINK-14964 on how to proceed. I'm not super deep into these
>>> issues but they seem to be somewhat related and Tison already did some
>>> implementation work.
>>>
>>> I'd say it be awesome if we could include this kind of improvement into
>>> the release.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 16, 2020 at 4:43 AM Yang Wang  wrote:
>>>
 Hi All, thanks a lot for reviving this discussion.

 I think we could unify the FLINK-13938 and FLINK-14964 since they have
 the similar
 purpose, avoid unnecessary uploading and downloading jars in YARN
 deployment.
 The difference is FLINK-13938 aims to support the flink system lib
 directory only, while
 FLINK-14964 is trying to support arbitrary pre-uploaded jars (including
 user and system jars).


 So i suggest to do this feature as following.
 1. Upload the flink lib directory or users to hdfs, e.g.
 "hdfs://hdpdev/flink/release/flink-1.x"
 "hdfs://hdpdev/user/someone/mylib"
 2. Use the -ypl argument to specify the shared lib, multiple
 directories could be specified
 3. YarnClusterDescriptor will use the pre-uploaded jars to avoid
 unnecessary uploading,
 both for system and user jars
 4. YarnClusterDescriptor needs to set the system jars to public
 visibility so that the distributed
 cache in the YARN nodemanager could be reused by multiple applications.
 This is to avoid
 unnecessary downloading, especially for the "flink-dist-*.jar". For the
 user shared lib, the
 visibility is still set to "APPLICATION" level.


 For our past internal use case, the shared lib could help with
 accelerating the submission a lot.
 Also it helps to reduce the pressure of HDFS when we want to launch
 many applications together.

 @tison @Till Rohrmann  @Hailu, Andreas
  If you guys thinks the suggestion makes 

Re: Does Flink streaming SQL support two-level group by aggregation

2020-04-17 Thread dixingxing85
> Thanks, Benchao.
> The expected output of this job is a single result per day, and that result should only ever grow, for example:
20200417,86
20200417,90
20200417,130
20200417,131

> It should not fluctuate up and down; results where the number goes from larger back to smaller are definitely unacceptable to the consumers of this data:
20200417,90
20200417,86
20200417,130
20200417,86
20200417,131

> My question is whether the retract stream produced by the inner group by can affect the sink; the logs were printed on the sink side.
> If Flink supports this kind of two-level group by, shouldn't a result that shrinks like this be considered a bug?

Sent from my iPhone

> On Apr 18, 2020, at 10:08, Benchao Li  wrote:
> 
> 
> Hi,
> 
> This is supported.
> What you are seeing is because group by produces retract results, i.e. it first sends -[old] and then +[new].
> With two levels, it becomes:
> level 1: -[old], level 2: -[cur], +[old]
> level 1: +[new], level 2: -[old], +[new]
> 
> dixingxin...@163.com  wrote on Sat, Apr 18, 2020 at 2:11 AM:
>> 
>> Hi all:
>> 
>> We have a streaming SQL query whose result is incorrect: the data arriving at the sink fluctuates up and down. We would like to confirm whether this is a bug,
>> or whether Flink does not yet support this kind of SQL.
>> The concrete scenario: first group by the two dimensions A and B to compute UV, then group by A to sum up dimension B's UV. The corresponding SQL is below (A -> dt,
>> B -> pvareaid):
>> SELECT dt, SUM(a.uv) AS uv
>> FROM (
>>    SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
>>    FROM streaming_log_event
>>    WHERE action IN ('action1')
>>   AND pvareaid NOT IN ('pv1', 'pv2')
>>   AND pvareaid IS NOT NULL
>>    GROUP BY dt, pvareaid
>> ) a
>> GROUP BY dt;
>> The log corresponding to the data received by the sink is:
>> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(false,0,86,20200417)
>> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(true,0,130,20200417)
>> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(false,0,130,20200417)
>> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(true,0,86,20200417)
>> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(false,0,86,20200417)
>> 2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
>> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
>> data(true,0,131,20200417)
>> 
>> We are using 1.7.2, and the test job's parallelism is 1.
>> Here is the corresponding issue: https://issues.apache.org/jira/browse/FLINK-17228
>> 
>> 
>> dixingxin...@163.com
> 
> 
> -- 
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Does Flink streaming SQL support two-level group by aggregation

2020-04-17 Thread Benchao Li
Hi,

This is supported.
What you are seeing is because group by produces retract results, i.e. it first sends -[old] and then +[new].
With two levels, it becomes:
level 1: -[old], level 2: -[cur], +[old]
level 1: +[new], level 2: -[old], +[new]
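As a rough illustration (a minimal, made-up sketch, not your actual job), the true/false flag you see in the sink log is the retract flag that toRetractStream exposes:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class RetractFlagDemo {
    // Prints (false, oldRow) followed by (true, newRow) whenever an aggregate result is updated.
    public static void printRetractions(StreamTableEnvironment tEnv) {
        // Assumes a table named 'inner_agg' is already registered; names are illustrative only.
        Table result = tEnv.sqlQuery("SELECT dt, SUM(uv) AS uv FROM inner_agg GROUP BY dt");
        DataStream<Tuple2<Boolean, Row>> retractStream = tEnv.toRetractStream(result, Row.class);
        retractStream.print(); // f0 == false means "retract the previously emitted result"
    }
}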

dixingxin...@163.com  wrote on Sat, Apr 18, 2020 at 2:11 AM:

>
> Hi all:
>
> We have a streaming SQL query whose result is incorrect: the data arriving at the sink fluctuates up and down. *We would like to confirm whether this is a bug,
> or whether Flink does not yet support this kind of SQL*.
> The concrete scenario: first group by the two dimensions A and B to compute UV, then group by A to sum up dimension B's UV. The corresponding SQL is below (A ->
> dt, B -> pvareaid):
>
> SELECT dt, SUM(a.uv) AS uv
> FROM (
>SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
>FROM streaming_log_event
>WHERE action IN ('action1')
>   AND pvareaid NOT IN ('pv1', 'pv2')
>   AND pvareaid IS NOT NULL
>GROUP BY dt, pvareaid
> ) a
> GROUP BY dt;
>
> The log corresponding to the data received by the sink is:
>
> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(false,0,86,20200417)
> 2020-04-17 22:28:38,727INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(true,0,130,20200417)
> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(false,0,130,20200417)
> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417)
> 2020-04-17 22:28:39,327INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(false,0,86,20200417)
> 2020-04-17 22:28:39,328INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed 
> (1/1) (GeneralRedisSinkFunction.invoke:169) - receive 
> data(true,0,131,20200417)
>
>
> We are using 1.7.2, and the test job's parallelism is 1.
> Here is the corresponding issue: https://issues.apache.org/jira/browse/FLINK-17228
>
>
> --
> dixingxin...@163.com
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Checkpoints for kafka source sometimes get 55 GB size (instead of 2 MB) and flink job fails during restoring from such checkpoint

2020-04-17 Thread Jacob Sevart
This sounds a lot like an issue I just went through (
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Very-large-metadata-file-td33356.html).
Are you using a union list state anywhere?

You could also use the debugging steps mentioned in that thread to inspect
the contents of the bad checkpoint.
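For background, union list state is redistributed on restore by broadcasting the complete, merged list to every parallel subtask, which is one way its checkpoint footprint can grow quickly with parallelism. A generic sketch of declaring one (not the Kafka connector's actual code):

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class UnionStateSketch implements CheckpointedFunction {

    private transient ListState<Long> offsets;

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Union redistribution: on restore, every subtask receives all entries from all subtasks.
        offsets = context.getOperatorStateStore().getUnionListState(
                new ListStateDescriptor<>("offsets", Types.LONG));
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Each subtask writes only its own entries here.
    }
}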

On Fri, Apr 17, 2020 at 4:46 PM Oleg Vysotsky  wrote:

> Hi Timo,
>
> Thank you for the suggestion!
>
> Last two days I tried to find the clear way to reproduce the problem and
> to define the root cause. The problem with "abnormal" checkpoints happened
> only on our largest flink job (which processes 6k-10k events per second).
> Similar smaller jobs (same code) don't have this problem, e.g. a similar
> job which processes about 3 times fewer events doesn't have it. As
> a result, remote debugging is quite challenging. Instead of debugging I
> added logging to FlinkKafkaConsumerBase#snapshotState and set
> commitOffsetsOnCheckpoints to false to disable "additional" logic in
> FlinkKafkaConsumerBase#snapshotState (please check my temp "log" changes
> below). The logging was as expected like
> {"lvl":"WARN","msg":"snapshotState fetcher: partition:
> KafkaTopicPartition{topic\u003d\u0027dsp-producer-z-clickstream-web-raw\u0027,
> partition\u003d2} offset:1091528771\n"} I didn't find any example of large
> entry added to "unionOffsetStates"
>
> Looks like the problem is that the flink job periodically (not often)
> creates a continuous series of "bad" checkpoints, which have a reasonable
> "checkpoint" size for each operator. After restoring from such a "bad"
> checkpoint the flink job starts creating "abnormal" checkpoint which
> includes 55 Gb  for kafka source operator (please check the attachments,
> "Source: dsp-producer-z-clickstream-raw" is kafka source). Creating
> "abnormal" checkpoint is 100% reproducible in this case.  Just in case, we
> just switched to use kafka source instead of kinesis source. We have the
> same job with kinesis for 1+ year and didn't have this problem. Any advices
> are appreciated.
>
>
> @Override
> public final void snapshotState(FunctionSnapshotContext context) throws Exception {
>     if (!running) {
>         LOG.debug("snapshotState() called on closed source");
>     } else {
>         unionOffsetStates.clear();
>
>         final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
>         StringBuilder sb = new StringBuilder("snapshotState ");
>         if (fetcher == null) {
>             sb.append("null fetcher: ");
>             // the fetcher has not yet been initialized, which means we need to return the
>             // originally restored offsets or the assigned partitions
>             for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
>                 unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
>                 sb.append("partition: ").append(subscribedPartition.getKey())
>                         .append(" offset:").append(subscribedPartition.getValue()).append('\n');
>             }
>
>             if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
>                 // the map cannot be asynchronously updated, because only one checkpoint call can happen
>                 // on this function at a time: either snapshotState() or notifyCheckpointComplete()
>                 pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
>             }
>
>         } else {
>             HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
>
>             if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
>                 // the map cannot be asynchronously updated, because only one checkpoint call can happen
>                 // on this function at a time: either snapshotState() or notifyCheckpointComplete()
>                 pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
>             }
>             sb.append("fetcher: ");
>             for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
>                 unionOffsetStates.add(
>                         Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
>                 sb.append("partition: ").append(kafkaTopicPartitionLongEntry.getKey())
>                         .append(" offset:").append(kafkaTopicPartitionLongEntry.getValue()).append('\n');
>             }
>
>         }
>
>         if

Flink 1.9, web UI: Job Manager page style issue

2020-04-17 Thread guanyq
OK, I found the cause! Sorry for the noise!
On 2020-04-16 08:03:29, "guanyq"  wrote:
>The code does call env.execute. Submitting the job produces the following error. What could be the cause?
>The program didn't contain a Flink job. Perhaps you forgot to call execute() 
>on the execution environment.


Re: Akka Error

2020-04-17 Thread tison
If you run a program using "flink run" in dist/bin, dependencies should be
taken care of.

Could you describe in detail how you "start a flink program"? Did you
write an entrypoint, compile it, and run it with "java YourProgram"? If so, you
have to configure the classpath yourself.

Best,
tison.


Alexander Borgschulze  wrote on Sat, Apr 18, 2020 at 3:03 AM:

> When I try to start a flink program, I get the following exception:
>
> com.typesafe.config.ConfigException$Missing: No configuration setting
> found for key 'akka.version'
> at
> com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
> at
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
> at
> com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)
>
>
> Do I need to change some files and add a new key-value pair?
>


Re: multi-sql checkpoint fail

2020-04-17 Thread tison
Hi,

Could you share the stack traces?

Best,
tison.


forideal  wrote on Sat, Apr 18, 2020 at 12:33 AM:

> Hello friend
> I have two SQL, checkpoint fails all the time. One task is to open a
> sliding window for an hour, and then another task consumes the output data
> of the previous task. There will be no problem with the two tasks submitted
> separately.
>
> -- first: the calculation
> -- second: write the calculation to redis
> -- first
> insert into
>   dw_access_log
> select
>   time_key,
>   query_nor,
>   query_nor_counter,
>   '1' as group_key
> from (
> select
>   HOP_START(
> event_time_fake,
> interval '1' MINUTE,
> interval '60' MINUTE
>   ) as time_key,
>   query_nor,
>   count(1) as query_nor_counter
> from(
> select
>   RED_JSON_VALUE(request, '$.query_nor') as query_nor,
>   RED_JSON_VALUE(request, '$.target') as target,
>   event_time_fake
> from
>   (
> select
>   red_pb_parser(body, 'request') as request,
>   event_time_fake
> from
>   access_log_source
>   )
>   )
> group by
>   query_nor,
>   HOP(   -- sliding window size one hour, step one minute
> event_time_fake,
> interval '1' MINUTE,
> interval '60' MINUTE
>   )
>   )
> where
>   query_nor_counter > 100;
>
> -- second
> insert into
>   dw_sink_access_log
> select
>   'fix_key' as `key`,
>   get_json_value(query_nor, query_nor_counter) as `value` -- agg_func
> from
>   dw_access_log
> group by
>   tumble (time_key_fake, interval '1' MINUTE),
>   group_key
>
> Article Link:https://zhuanlan.zhihu.com/p/132764573
> Picture Link:
> https://pic4.zhimg.com/80/v2-d3b1105b1419fef3ea6b9176085a5597_1440w.jpg
> https://pic3.zhimg.com/80/v2-b6ea7b4a8368c4bae03afb94c723bcca_1440w.jpg
>
> Best, forideal
>
>
>
>
>
>


Re: Flink 1.10 Out of memory

2020-04-17 Thread Zahid Rahman
https://betsol.com/java-memory-management-for-java-virtual-machine-jvm/

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 17 Apr 2020 at 14:07, Lasse Nedergaard <
lassenedergaardfl...@gmail.com> wrote:

> Hi.
>
> We have migrated to Flink 1.10 and face out of memory exception and
> hopeful can someone point us in the right direction.
>
> We have a job that use broadcast state, and we sometimes get out memory
> when it creates a savepoint. See stacktrack below.
> We have assigned 2.2 GB/task manager and
> configured  taskmanager.memory.process.size : 2200m
> In Flink 1.9 our container was terminated because OOM, so 1.10 do a better
> job, but it still not working and the task manager is leaking mem for each
> OOM and finial kill by Mesos
>
>
> Any idea what we can do to figure out what settings we need to change?
>
> Thanks in advance
>
> Lasse Nedergaard
>
>
> WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
> not close the state stream for
> s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
> java.io.IOException: Cannot allocate memory at
> java.io.FileOutputStream.writeBytes(Native Method) at
> java.io.FileOutputStream.write(FileOutputStream.java:326) at
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at
> java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at
> java.io.FilterOutputStream.flush(FilterOutputStream.java:140) at
> java.io.FilterOutputStream.close(FilterOutputStream.java:158) at
> com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
> at
> org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
> at
> org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
> at
> org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
> at
> org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
> at
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
> at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263) at
> org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250) at
> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
> at
> org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
> INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Discarding checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
> org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint
> 3509 for operator Feature extraction (8/12). at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot
> allocate memory at
> java.util.concurrent.FutureTask.report(FutureTask.java:122) at
> java.util.concurrent.FutureTask.get(FutureTask.java:192) at
> org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
> at
> org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
> ... 3 common frames omitted Caused by:
> org.apache.flink.util.SerializedThrowable: Cannot allocate memory at
> java.io.FileOutputStream.writeBytes(Native Method) at
> java.io.FileOutputStream.write(FileOutputStream.java:326) at
> java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at
> java.io.BufferedOutputStream.write(BufferedOutputStream.java:95) at
> java.io.FilterOutputStream.write(FilterOutputStream.java:77) at
> 

Akka Error

2020-04-17 Thread Alexander Borgschulze
When I try to start a flink program, I get the following exception:

 

com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'
        at com.typesafe.config.impl.SimpleConfig.findKeyOrNull(SimpleConfig.java:152)
        at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:170)
        at com.typesafe.config.impl.SimpleConfig.findOrNull(SimpleConfig.java:176)

 

 

Do I need to change some files and add a new key-value pair?


Re: Debug Slowness in Async Checkpointing

2020-04-17 Thread Robert Metzger
Hi,
did you check the TaskManager logs to see whether there are retries by the s3a file
system during checkpointing?

I'm not aware of any metrics in Flink that could be helpful in this
situation.

Best,
Robert

On Tue, Apr 14, 2020 at 12:02 AM Lu Niu  wrote:

> Hi, Flink users
>
> We notice sometimes async checkpointing can be extremely slow, leading to
> checkpoint timeout. For example, For a state size around 2.5MB, it could
> take 7~12min in async checkpointing:
>
> [image: Screen Shot 2020-04-09 at 5.04.30 PM.png]
>
> Notice all the slowness comes from async checkpointing, no delay in sync
> part and barrier assignment. As we use rocksdb incremental checkpointing, I
> notice the slowness might be caused by uploading the file to s3. However, I
> am not completely sure since there are other steps in async checkpointing.
> Does flink expose fine-granular metrics to debug such slowness?
>
> setup: flink 1.9.1, rocksdb incremental state backend, S3AHaoopFileSystem
>
> Best
> Lu
>


Re: Checkpoint Space Usage Debugging

2020-04-17 Thread Yun Tang
Hi Kent

You can view the checkpoint details via the web UI to see how much checkpointed data
is uploaded for each operator, and you can compare the state sizes as time goes on
to see whether the operators upload checkpointed data within a stable range.

Best
Yun Tang

From: Kent Murra 
Sent: Saturday, April 18, 2020 1:47
To: user@flink.apache.org 
Subject: Checkpoint Space Usage Debugging

I'm looking into a situation where our checkpoint sizes are automatically 
growing over time.  I'm unable to pinpoint exactly why this is happening, and 
it would be great if there was a way to figure out how much checkpoint space is 
attributable to each operator so I can narrow it down.  Are there any tools or 
methods for introspecting the checkpoint data so that I can determine where the 
space is going?

The pipeline in question is consuming from Kinesis and batching up data using 
windows.  I suspected that I was doing something wrong with windowing, but I'm 
emitting FIRE_AND_PURGE and also setting a max end timestamp.  The Kinesis 
consumer is not emitting watermarks at the moment, but as far as I know thats 
not necessary for proper checkpointing (only exactly once behavior).


Does Flink streaming SQL support two-level group by aggregation

2020-04-17 Thread dixingxin...@163.com

Hi all:

We have a streaming SQL query whose result is incorrect: the data arriving at the sink fluctuates up and down. We would like to confirm whether this is a bug, or whether Flink does not yet support this kind of SQL.
The concrete scenario: first group by the two dimensions A and B to compute UV, then group by A to sum up dimension B's UV. The corresponding SQL is below (A -> dt, B -> pvareaid):
SELECT dt, SUM(a.uv) AS uv
FROM (
   SELECT dt, pvareaid, COUNT(DISTINCT cuid) AS uv
   FROM streaming_log_event
   WHERE action IN ('action1')
  AND pvareaid NOT IN ('pv1', 'pv2')
  AND pvareaid IS NOT NULL
   GROUP BY dt, pvareaid
) a
GROUP BY dt;
The log corresponding to the data received by the sink is:
2020-04-17 22:28:38,727 INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:38,727 INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,130,20200417)
2020-04-17 22:28:39,327 INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,130,20200417)
2020-04-17 22:28:39,327 INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,86,20200417)
2020-04-17 22:28:39,327 INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(false,0,86,20200417)
2020-04-17 22:28:39,328 INFO groupBy xx -> to: Tuple2 -> Sink: Unnamed (1/1) 
(GeneralRedisSinkFunction.invoke:169) - receive data(true,0,131,20200417)

We are using 1.7.2, and the test job's parallelism is 1.
Here is the corresponding issue: https://issues.apache.org/jira/browse/FLINK-17228




dixingxin...@163.com


Checkpoint Space Usage Debugging

2020-04-17 Thread Kent Murra
I'm looking into a situation where our checkpoint sizes are automatically
growing over time.  I'm unable to pinpoint exactly why this is happening,
and it would be great if there was a way to figure out how much checkpoint
space is attributable to each operator so I can narrow it down.  Are there
any tools or methods for introspecting the checkpoint data so that I can
determine where the space is going?

The pipeline in question is consuming from Kinesis and batching up data
using windows.  I suspected that I was doing something wrong with
windowing, but I'm emitting FIRE_AND_PURGE and also setting a max end
timestamp.  The Kinesis consumer is not emitting watermarks at the moment,
but as far as I know that's not necessary for proper checkpointing (only
exactly once behavior).
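(For reference, a minimal sketch of the fire-and-purge behaviour described above: an event-time trigger that fires once at the end of the window and drops its contents. The class and its wiring are illustrative, not the actual pipeline:)

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class FireAndPurgeAtWindowEnd extends Trigger<Object, TimeWindow> {

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
        // Register a single firing at the window's max timestamp.
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        // FIRE_AND_PURGE emits the result and clears the window contents,
        // so window state should not keep accumulating after the firing.
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

Note that an event-time trigger like this sketch only fires when watermarks advance, which may tie back to the watermark point above.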


Re: Flink upgrade to 1.10: function

2020-04-17 Thread seeksst
Hi,

Thank you for the reply.

I found that this is caused by SqlStdOperatorTable and tried many ways to change it, but failed. Finally, I decided to copy it and rename it. Another thing that caught my attention is that I also define a last_value function whose args are the same as the one in SqlStdOperatorTable, and I use CalciteConfig to replace the SqlOperatorTable, so I can use a CustomBasicOperatorTable instead of BasicOperatorTable and register my own last_value function. I changed AggregateUtil to make this work. It doesn't seem perfect; do you have any advice on how to avoid changing AggregateUtil?

The difference here is that my JSON_VALUE function's args are not similar to the ones in SqlStdOperatorTable, so there may be no way to replace it. I learned more about the built-in functions from the source code, and JSON_VALUE may not be realizable this way. Is it necessary to define so many functions in SqlStdOperatorTable that are never used? As far as I know, the parser decides many things and is limited by Calcite, so the conversion may produce information that I don't want and can't change. It seems hard to solve, and I have no idea.

Best,
L

Original message
From: Jark Wu
To: Till Rohrmann; Danny Chan
Cc: seeksst; user; Timo Walther
Sent: Friday, April 17, 2020, 22:53
Subject: Re: Flink upgrade to 1.10: function


Hi,


I guess you are already close to the truth. Since Flink 1.10 we upgraded 
Calcite to 1.21 which reserves JSON_VALUE as keyword.
So that a user define function can't use this name anymore. That's why 
JSON_VALUE(...) will always be parsed as the Calcite's builtin function 
definition. 
Currently, I don't have other ideas except using another function name... 


cc @Danny Chan , do you know why JSON_VALUE is a reserved keyword in Calcite? 
Could we change it into non-reserved keyword?


Best,
Jark






On Fri, 17 Apr 2020 at 21:00, Till Rohrmann  wrote:

Hi,


thanks for reporting these problems. I'm pulling in Timo and Jark who are 
working on the SQL component. They might be able to help you with your problem.


Cheers,
Till


On Thu, Apr 16, 2020 at 11:10 AM seeksst  wrote:

Hi, All


Recently, I try to upgrade flink from 1.8.2 to 1.10, but i meet some problem 
about function. In 1.8.2, there are just Built-In function and User-defined 
Functions, but in 1.10, there are 4 categories of funtions.
I defined a function which named JSON_VALUE in my system, it doesn’t exist in 
1.8.2, but present to 1.10.0. of course i use it in sql, something like  
'select JSON_VALUE(string, string) from table1’, no category or database. the 
problem is in 1.10.0, my function will be recognized as SqlJsonValueFunction, 
and args not match, so my sql is wrong.
I read document about Ambiguous Function Reference, In my 
understanding, my function will be registered as temporary system function, and 
it should be chosen first. isn’t it? I try to debug it, and find some 
information:
First, sql will be parsed by ParseImpl, and JSON_VALUE will be parsed as 
SqlBasicCall, operator is SqlJsonValueFunction, it’s belonged to SYSTEM catalog 
and the kind is OTHER_FUNCTION. Then, SqlUtil.lookupRoutine will not find this 
SqlFunction, because it not in BasicOperatorTable. my function in 
FunctionCatalog, but SqlJsonValueFunction belonged to SYSTEM, not belong to 
USER_DEFINED, so program will not search it in FunctionCatalog.
How can i solve this problem without modifying sql and function name? my 
program can choose flink version and have many sql jobs, so i don’t wish to 
modify sql and function name.
Thanks.

multi-sql checkpoint fail

2020-04-17 Thread forideal
Hello friends,

I have two SQL jobs, and checkpointing fails all the time. One task opens a one-hour
sliding window, and another task consumes the output data of the previous task. There
is no problem when the two tasks are submitted separately.
-- first: the calculation
-- second: write the calculation to redis

-- first
insert into
  dw_access_log
select
  time_key,
  query_nor,
  query_nor_counter,
  '1' as group_key
from (
    select
      HOP_START(
        event_time_fake,
        interval '1' MINUTE,
        interval '60' MINUTE
      ) as time_key,
      query_nor,
      count(1) as query_nor_counter
    from (
        select
          RED_JSON_VALUE(request, '$.query_nor') as query_nor,
          RED_JSON_VALUE(request, '$.target') as target,
          event_time_fake
        from (
            select
              red_pb_parser(body, 'request') as request,
              event_time_fake
            from
              access_log_source
          )
      )
    group by
      query_nor,
      HOP(  -- sliding window: size one hour, step one minute
        event_time_fake,
        interval '1' MINUTE,
        interval '60' MINUTE
      )
  )
where
  query_nor_counter > 100;

-- second
insert into
  dw_sink_access_log
select
  'fix_key' as `key`,
  get_json_value(query_nor, query_nor_counter) as `value`  -- agg_func
from
  dw_access_log
group by
  tumble(time_key_fake, interval '1' MINUTE),
  group_key
Article Link:https://zhuanlan.zhihu.com/p/132764573
Picture Link:
https://pic4.zhimg.com/80/v2-d3b1105b1419fef3ea6b9176085a5597_1440w.jpg 
https://pic3.zhimg.com/80/v2-b6ea7b4a8368c4bae03afb94c723bcca_1440w.jpg


Best, forideal





Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread Yang Wang
Hi tison,

For #3, if you mean registering a remote HDFS file as a local resource, we
should make "-yt/--yarnship" support remote directories. I think that is the
right direction.

For #1, if users can ship remote directories, then they could also specify
something like
"-yt hdfs://hdpdev/flink/release/flink-1.x,
hdfs://hdpdev/user/someone/mylib". Do you mean we add an
option that controls whether to try to avoid unnecessary uploading? Maybe we
could filter by name and file size.
I think this is a good suggestion, and we do not need to introduce a new
config option "-ypl".

For #2, for flink-dist, #1 already solves the problem. We do not need to
support remote schemes; it would confuse users if we only supported HDFS but
not S3, OSS, etc.


Best,
Yang

tison  wrote on Fri, Apr 17, 2020 at 8:05 PM:

> Hi Yang,
>
> I agree that these two of works would benefit from single assignee. My
> concern is as below
>
> 1. Both share libs & remote flink dist/libs are remote ship files. I don't
> think we have to implement multiple codepath/configuration.
> 2. So, for concept clarification, there are
>   (1) an option to disable shipping local libs
>   (2) flink-dist supports multiple schema at least we said "hdfs://"
>   (3) an option for registering remote shipfiles with path & visibility. I
> think new configuration system helps.
>
> the reason we have to special handling (2) instead of including it in (3)
> is because when shipping flink-dist to TM container, we specially
> detect flink-dist. Of course we can merge it into general ship files and
> validate shipfiles finally contain flink-dist, which is an alternative.
>
> The *most important* difference is (1) and (3) which we don't have an
> option for only remote libs. Is this clarification satisfy your proposal?
>
> Best,
> tison.
>
>
> Till Rohrmann  wrote on Fri, Apr 17, 2020 at 7:49 PM:
>
>> Hi Yang,
>>
>> from what I understand it sounds reasonable to me. Could you sync with
>> Tison on FLINK-14964 on how to proceed. I'm not super deep into these
>> issues but they seem to be somewhat related and Tison already did some
>> implementation work.
>>
>> I'd say it be awesome if we could include this kind of improvement into
>> the release.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 16, 2020 at 4:43 AM Yang Wang  wrote:
>>
>>> Hi All, thanks a lot for reviving this discussion.
>>>
>>> I think we could unify the FLINK-13938 and FLINK-14964 since they have
>>> the similar
>>> purpose, avoid unnecessary uploading and downloading jars in YARN
>>> deployment.
>>> The difference is FLINK-13938 aims to support the flink system lib
>>> directory only, while
>>> FLINK-14964 is trying to support arbitrary pre-uploaded jars (including
>>> user and system jars).
>>>
>>>
>>> So i suggest to do this feature as following.
>>> 1. Upload the flink lib directory or users to hdfs, e.g.
>>> "hdfs://hdpdev/flink/release/flink-1.x"
>>> "hdfs://hdpdev/user/someone/mylib"
>>> 2. Use the -ypl argument to specify the shared lib, multiple directories
>>> could be specified
>>> 3. YarnClusterDescriptor will use the pre-uploaded jars to avoid
>>> unnecessary uploading,
>>> both for system and user jars
>>> 4. YarnClusterDescriptor needs to set the system jars to public
>>> visibility so that the distributed
>>> cache in the YARN nodemanager could be reused by multiple applications.
>>> This is to avoid
>>> unnecessary downloading, especially for the "flink-dist-*.jar". For the
>>> user shared lib, the
>>> visibility is still set to "APPLICATION" level.
>>>
>>>
>>> For our past internal use case, the shared lib could help with
>>> accelerating the submission a lot.
>>> Also it helps to reduce the pressure of HDFS when we want to launch many
>>> applications together.
>>>
>>> @tison @Till Rohrmann  @Hailu, Andreas
>>>  If you guys thinks the suggestion makes sense. I
>>> will try to find some time to work on this and hope it could catch up
>>> with release-1.1 cycle.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Hailu, Andreas [Engineering]  wrote on Thu, Apr 16, 2020 at 8:47 AM:
>>>
 Okay, I’ll continue to watch the JIRAs. Thanks for the update, Till.



 *// *ah



 *From:* Till Rohrmann 
 *Sent:* Wednesday, April 15, 2020 10:51 AM
 *To:* Hailu, Andreas [Engineering] 
 *Cc:* Yang Wang ; tison ;
 user@flink.apache.org
 *Subject:* Re: Flink Conf "yarn.flink-dist-jar" Question



 Hi Andreas,



 it looks as if FLINK-13938 and FLINK-14964 won't make it into the
 1.10.1 release because the community is about to start the release process.
 Since FLINK-13938 is a new feature it will be shipped with a major release.
 There is still a bit of time until the 1.11 feature freeze and if Yang Wang
 has time to finish this PR, then we could ship it.



 Cheers,

 Till



 On Wed, Apr 15, 2020 at 3:23 PM Hailu, Andreas [Engineering] <
 andreas.ha...@gs.com> wrote:

 Yang, Tison,



 Do we know 

Re: Flink upgrade to 1.10: function

2020-04-17 Thread Jark Wu
Hi,

I guess you are already close to the truth. Since Flink 1.10 we upgraded
Calcite to 1.21, which reserves JSON_VALUE as a keyword, so a user-defined
function can't use this name anymore. That's why JSON_VALUE(...) will always
be parsed as Calcite's built-in function definition.
Currently, I don't have other ideas except using another function name...
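As a rough sketch of that rename workaround (the class and function names here are made up, and the classic registerFunction API is used):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class RegisterJsonUdf {

    // Hypothetical UDF standing in for the user's own JSON_VALUE implementation.
    public static class MyJsonValue extends ScalarFunction {
        public String eval(String json, String path) {
            return null; // real extraction logic goes here
        }
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(
                env, EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Register under a name that is not a Calcite reserved keyword and use that name in the SQL text,
        // e.g. SELECT MY_JSON_VALUE(string_col, path_col) FROM table1
        tEnv.registerFunction("MY_JSON_VALUE", new MyJsonValue());
    }
}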

cc @Danny Chan , do you know why JSON_VALUE is a
reserved keyword in Calcite? Could we change it into a non-reserved keyword?

Best,
Jark



On Fri, 17 Apr 2020 at 21:00, Till Rohrmann  wrote:

> Hi,
>
> thanks for reporting these problems. I'm pulling in Timo and Jark who are
> working on the SQL component. They might be able to help you with your
> problem.
>
> Cheers,
> Till
>
> On Thu, Apr 16, 2020 at 11:10 AM seeksst  wrote:
>
>> Hi, All
>>
>>
>> Recently, I try to upgrade flink from 1.8.2 to 1.10, but i meet some
>> problem about function. In 1.8.2, there are just Built-In function and
>> User-defined Functions, but in 1.10, there are 4 categories of functions.
>>
>> I defined a function which named JSON_VALUE in my system, it doesn’t
>> exist in 1.8.2, but present to 1.10.0. of course i use it in sql, something
>> like  'select JSON_VALUE(string, string) from table1’, no category or
>> database. the problem is in 1.10.0, my function will be recognized as
>> SqlJsonValueFunction, and args not match, so my sql is wrong.
>>
>> I read document about Ambiguous Function Reference, In my
>> understanding, my function will be registered as temporary system function,
>> and it should be chosen first. isn’t it? I try to debug it, and find some
>> information:
>>
>> First, sql will be parsed by ParseImpl, and JSON_VALUE will be parsed as
>> SqlBasicCall, operator is SqlJsonValueFunction, it’s belonged to SYSTEM
>> catalog and the kind is OTHER_FUNCTION. Then, SqlUtil.lookupRoutine will
>> not find this SqlFunction, because it not in BasicOperatorTable. my
>> function in FunctionCatalog, but SqlJsonValueFunction belonged to
>> SYSTEM, not belong to USER_DEFINED, so program will not search it in
>> FunctionCatalog.
>>
>> How can i solve this problem without modifying sql and function name? my
>> program can choose flink version and have many sql jobs, so i don’t wish to
>> modify sql and function name.
>>
>> Thanks.
>>
>


Re: Can I use Joda-Time in Flink?

2020-04-17 Thread tison
Hi Alexander,

What do you mean exactly? Could you describe it in pseudo code? I'm not
quite sure where Java-Time is used in the env.
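If the question is about carrying Joda-Time values in records, the usual approach I'm aware of is to register a Kryo serializer for them; a minimal sketch, assuming the de.javakaffee:kryo-serializers dependency is on the classpath:

import de.javakaffee.kryoserializers.jodatime.JodaDateTimeSerializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.joda.time.DateTime;

public class JodaTimeSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Tell Flink how to (de)serialize Joda DateTime values that fall back to Kryo.
        env.getConfig().registerTypeWithKryoSerializer(DateTime.class, JodaDateTimeSerializer.class);
        env.fromElements(DateTime.now()).print();
        env.execute("joda-time-example");
    }
}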

Best,
tison.


Alexander Borgschulze  wrote on Fri, Apr 17, 2020 at 9:21 PM:

> Can I use Joda-Time instead of Java-Time and set it up in the
> StreamExecutionEnvironment?
>


Can I use Joda-Time in Flink?

2020-04-17 Thread Alexander Borgschulze
Can I use Joda-Time instead of Java-Time and set it up in the StreamExecutionEnvironment?


Flink 1.10 Out of memory

2020-04-17 Thread Lasse Nedergaard
Hi.

We have migrated to Flink 1.10 and are facing out-of-memory exceptions;
hopefully someone can point us in the right direction.

We have a job that uses broadcast state, and we sometimes run out of memory
when it creates a savepoint. See the stack trace below.
We have assigned 2.2 GB per task manager and
configured taskmanager.memory.process.size: 2200m
In Flink 1.9 our container was terminated because of OOM, so 1.10 does a better
job, but it is still not working: the task manager leaks memory on each
OOM and is finally killed by Mesos.


Any idea what we can do to figure out which settings we need to change?

Thanks in advance

Lasse Nedergaard


WARN o.a.flink.runtime.state.filesystem.FsCheckpointStreamFactory - Could
not close the state stream for
s3://flinkstate/dcos-prod/checkpoints/fc9318cc236d09f0bfd994f138896d6c/chk-3509/cf0714dc-ad7c-4946-b44c-96d4a131a4fa.
java.io.IOException: Cannot allocate memory at
java.io.FileOutputStream.writeBytes(Native Method) at
java.io.FileOutputStream.write(FileOutputStream.java:326) at
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at
java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140) at
java.io.FilterOutputStream.flush(FilterOutputStream.java:140) at
java.io.FilterOutputStream.close(FilterOutputStream.java:158) at
com.facebook.presto.hive.s3.PrestoS3FileSystem$PrestoS3OutputStream.close(PrestoS3FileSystem.java:995)
at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
at
org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:101)
at
org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
at
org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.close(FsCheckpointStreamFactory.java:277)
at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:263) at
org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:250) at
org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:122)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.closeSnapshotIO(AsyncSnapshotCallable.java:167)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:83)
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Discarding
checkpoint 3509 of job fc9318cc236d09f0bfd994f138896d6c.
org.apache.flink.util.SerializedThrowable: Could not materialize checkpoint
3509 for operator Feature extraction (8/12). at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1238)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1180)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748) Caused by:
org.apache.flink.util.SerializedThrowable: java.io.IOException: Cannot
allocate memory at
java.util.concurrent.FutureTask.report(FutureTask.java:122) at
java.util.concurrent.FutureTask.get(FutureTask.java:192) at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1143)
... 3 common frames omitted Caused by:
org.apache.flink.util.SerializedThrowable: Cannot allocate memory at
java.io.FileOutputStream.writeBytes(Native Method) at
java.io.FileOutputStream.write(FileOutputStream.java:326) at
java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82) at
java.io.BufferedOutputStream.write(BufferedOutputStream.java:95) at
java.io.FilterOutputStream.write(FilterOutputStream.java:77) at
java.io.FilterOutputStream.write(FilterOutputStream.java:125) at
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:57)
at java.io.DataOutputStream.write(DataOutputStream.java:107) at
org.apache.flink.fs.s3presto.common.HadoopDataOutputStream.write(HadoopDataOutputStream.java:47)
at
org.apache.flink.core.fs.FSDataOutputStreamWrapper.write(FSDataOutputStreamWrapper.java:66)
at

Re: Flink upgrade to 1.10: function

2020-04-17 Thread Till Rohrmann
Hi,

thanks for reporting these problems. I'm pulling in Timo and Jark who are
working on the SQL component. They might be able to help you with your
problem.

Cheers,
Till

On Thu, Apr 16, 2020 at 11:10 AM seeksst  wrote:

> Hi, All
>
>
> Recently, I tried to upgrade Flink from 1.8.2 to 1.10, but I ran into a
> problem with functions. In 1.8.2 there are just built-in functions and
> user-defined functions, but in 1.10 there are 4 categories of functions.
>
> I defined a function named JSON_VALUE in my system; it doesn't exist
> in 1.8.2, but it is present in 1.10.0. Of course I use it in SQL, something like
>  'select JSON_VALUE(string, string) from table1', with no category or database prefix.
> The problem is that in 1.10.0 my function is recognized as the built-in
> SqlJsonValueFunction, whose arguments don't match, so my SQL becomes invalid.
>
> I read the documentation about Ambiguous Function Reference. In my
> understanding, my function should be registered as a temporary system function,
> and it should be chosen first, shouldn't it? I tried to debug it and found some
> information:
>
> First, the SQL is parsed by ParseImpl, and JSON_VALUE is parsed as a
> SqlBasicCall whose operator is SqlJsonValueFunction; it belongs to the SYSTEM
> catalog and its kind is OTHER_FUNCTION. Then SqlUtil.lookupRoutine does
> not find this SqlFunction, because it is not in BasicOperatorTable. My
> function is in the FunctionCatalog, but SqlJsonValueFunction belongs to SYSTEM,
> not to USER_DEFINED, so the program never searches for it in the
> FunctionCatalog.
>
> How can I solve this problem without modifying the SQL and the function name? My
> program can choose the Flink version and has many SQL jobs, so I don't wish to
> modify the SQL and function names.
>
> Thanks.
>
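For reference, a minimal sketch of how a scalar UDF like the JSON_VALUE above is registered through the 1.10 Table API; the JsonValue class body and the planner/mode settings are illustrative assumptions, not the poster's actual code:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public class RegisterJsonValueUdf {

    // Hypothetical UDF: the real JSON parsing logic is omitted here.
    public static class JsonValue extends ScalarFunction {
        public String eval(String json, String path) {
            // ... extract the value at 'path' from 'json' ...
            return null;
        }
    }

    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Registers the function in the function catalog; the issue described
        // above is that the parser resolves JSON_VALUE to the built-in
        // SqlJsonValueFunction before this catalog entry is consulted.
        tEnv.registerFunction("JSON_VALUE", new JsonValue());

        tEnv.sqlQuery("SELECT JSON_VALUE(col1, col2) FROM table1");
    }
}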


Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread tison
Hi Yang,

I agree that these two of works would benefit from single assignee. My
concern is as below

1. Both share libs & remote flink dist/libs are remote ship files. I don't
think we have to implement multiple codepath/configuration.
2. So, for concept clarification, there are
  (1) an option to disable shipping local libs
  (2) flink-dist supports multiple schema at least we said "hdfs://"
  (3) an option for registering remote shipfiles with path & visibility. I
think new configuration system helps.

The reason we have to handle (2) specially instead of including it in (3)
is that when shipping flink-dist to the TM container, we specifically
detect flink-dist. Of course we could merge it into the general ship files and
validate that the ship files finally contain flink-dist, which is an alternative.

The *most important* gap is between (1) and (3): we don't yet have an
option for shipping only remote libs. Does this clarification satisfy your proposal?

Best,
tison.


Till Rohrmann  wrote on Fri, Apr 17, 2020 at 7:49 PM:

> Hi Yang,
>
> from what I understand it sounds reasonable to me. Could you sync with
> Tison on FLINK-14964 on how to proceed. I'm not super deep into these
> issues but they seem to be somewhat related and Tison already did some
> implementation work.
>
> I'd say it be awesome if we could include this kind of improvement into
> the release.
>
> Cheers,
> Till
>
> On Thu, Apr 16, 2020 at 4:43 AM Yang Wang  wrote:
>
>> Hi All, thanks a lot for reviving this discussion.
>>
>> I think we could unify the FLINK-13938 and FLINK-14964 since they have
>> the similar
>> purpose, avoid unnecessary uploading and downloading jars in YARN
>> deployment.
>> The difference is that FLINK-13938 aims to support the flink system lib
>> directory only, while
>> FLINK-14964 is trying to support arbitrary pre-uploaded jars (including
>> user and system jars).
>>
>>
>> So i suggest to do this feature as following.
>> 1. Upload the flink lib directory or users to hdfs, e.g.
>> "hdfs://hdpdev/flink/release/flink-1.x"
>> "hdfs://hdpdev/user/someone/mylib"
>> 2. Use the -ypl argument to specify the shared lib, multiple directories
>> could be specified
>> 3. YarnClusterDescriptor will use the pre-uploaded jars to avoid
>> unnecessary uploading,
>> both for system and user jars
>> 4. YarnClusterDescriptor needs to set the system jars to public
>> visibility so that the distributed
>> cache in the YARN nodemanager could be reused by multiple applications.
>> This is to avoid
>> unnecessary downloading, especially for the "flink-dist-*.jar". For the
>> user shared lib, the
>> visibility is still set to "APPLICATION" level.
>>
>>
>> For our past internal use case, the shared lib could help with
>> accelerating the submission a lot.
>> Also it helps to reduce the pressure of HDFS when we want to launch many
>> applications together.
>>
>> @tison @Till Rohrmann  @Hailu, Andreas
>>  If you guys think the suggestion makes sense, I
>> will try to find some time to work on this and hope it can catch up
>> with the release-1.11 cycle.
>>
>>
>> Best,
>> Yang
>>
>> Hailu, Andreas [Engineering]  于2020年4月16日周四
>> 上午8:47写道:
>>
>>> Okay, I’ll continue to watch the JIRAs. Thanks for the update, Till.
>>>
>>>
>>>
>>> *// *ah
>>>
>>>
>>>
>>> *From:* Till Rohrmann 
>>> *Sent:* Wednesday, April 15, 2020 10:51 AM
>>> *To:* Hailu, Andreas [Engineering] 
>>> *Cc:* Yang Wang ; tison ;
>>> user@flink.apache.org
>>> *Subject:* Re: Flink Conf "yarn.flink-dist-jar" Question
>>>
>>>
>>>
>>> Hi Andreas,
>>>
>>>
>>>
>>> it looks as if FLINK-13938 and FLINK-14964 won't make it into the 1.10.1
>>> release because the community is about to start the release process. Since
>>> FLINK-13938 is a new feature it will be shipped with a major release. There
>>> is still a bit of time until the 1.11 feature freeze and if Yang Wang has
>>> time to finish this PR, then we could ship it.
>>>
>>>
>>>
>>> Cheers,
>>>
>>> Till
>>>
>>>
>>>
>>> On Wed, Apr 15, 2020 at 3:23 PM Hailu, Andreas [Engineering] <
>>> andreas.ha...@gs.com> wrote:
>>>
>>> Yang, Tison,
>>>
>>>
>>>
>>> Do we know when some solution for 13938 and 14964 will arrive? Do you
>>> think it will be in a 1.10.x version?
>>>
>>>
>>>
>>> *// *ah
>>>
>>>
>>>
>>> *From:* Hailu, Andreas [Engineering]
>>> *Sent:* Friday, March 20, 2020 9:19 AM
>>> *To:* 'Yang Wang' 
>>> *Cc:* tison ; user@flink.apache.org
>>> *Subject:* RE: Flink Conf "yarn.flink-dist-jar" Question
>>>
>>>
>>>
>>> Hi Yang,
>>>
>>>
>>>
>>> This is good to know. As a stopgap measure until a solution between
>>> 13938 and 14964 arrives, we can automate the application staging directory
>>> cleanup from our client should the process fail. It’s not ideal, but will
>>> at least begin to manage our users’ quota. I’ll continue to watch the two
>>> tickets. Thank you.
>>>
>>>
>>>
>>> *// *ah
>>>
>>>
>>>
>>> *From:* Yang Wang 
>>> *Sent:* Monday, March 16, 2020 9:37 PM
>>> *To:* Hailu, Andreas [Engineering] 
>>> *Cc:* tison ; user@flink.apache.org
>>> *Subject:* Re: Flink Conf 

Re: Flink job didn't restart when a task failed

2020-04-17 Thread Till Rohrmann
Keep us posted once you caught the problem in the act. This would help to
debug/understand this problem tremendously.

Cheers,
Till

On Wed, Apr 15, 2020 at 8:44 AM Zhu Zhu  wrote:

> Sorry I made a mistake. Even if it's the case I had guessed, you will not
> get a log "Task {} is already in state FAILED." because that task was
> already unregistered before trying to update the state to JM. Unfortunately
> currently we have no log which can be used to prove it.
> Just to confirm that the line "FOG_PREDICTION_FUNCTION (15/20) (
> 3086efd0e57612710d0ea74138c01090) switched from RUNNING to FAILED" does
> not appear in the JM log, right? This might be an issue that the message
> was lost on network, which should be a rare case. Do you encounter it often?
>
> Thanks,
> Zhu Zhu
>
> Hanson, Bruce  于2020年4月15日周三 上午9:16写道:
>
>> Hi Zhu Zhu (and Till),
>>
>>
>>
>> Thanks for your thoughts on this problem. I do not see a message like the
>> one you mention "Task {} is already in state FAILED." I have attached a
>> file with all the task manager logs that we received at the time this
>> happened. As you see, there aren’t many. We turned on debug logging for
>> “org.apache.flink” on this job this afternoon so maybe we’ll find something
>> interesting if/when the issue happens again. I do hope we can catch it in
>> the act.
>>
>>
>>
>> -Bruce
>>
>>
>>
>> --
>>
>>
>>
>>
>>
>> *From: *Zhu Zhu 
>> *Date: *Monday, April 13, 2020 at 9:29 PM
>> *To: *Till Rohrmann 
>> *Cc: *Aljoscha Krettek , user ,
>> Gary Yao 
>> *Subject: *Re: Flink job didn't restart when a task failed
>>
>>
>>
>> Sorry for not following this ML earlier.
>>
>>
>>
>> I think the cause might be that the final state ('FAILED') update message
>> to JM is lost. TaskExecutor will simply fail the task (which does not take
>> effect in this case since the task is already FAILED) and will not update
>> the task state again in this case.
>>
>> @Bruce would you take a look at the TM log? If the guess is right, in
>> task manager logs there will be one line "Task {} is already in state
>> FAILED."
>>
>>
>>
>> Thanks,
>>
>> Zhu Zhu
>>
>>
>>
>> Till Rohrmann  于2020年4月10日周五 上午12:51写道:
>>
>> For future reference, here is the issue to track the reconciliation logic
>> [1].
>>
>>
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17075
>> 
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Thu, Apr 9, 2020 at 6:47 PM Till Rohrmann 
>> wrote:
>>
>> Hi Bruce,
>>
>>
>>
>> what you are describing sounds indeed quite bad. Quite hard to say
>> whether we fixed such an issue in 1.10. It is definitely worth a try to
>> upgrade, though.
>>
>>
>>
>> In order to further debug the problem, it would be really great if you
>> could provide us with the log files of the JobMaster and the TaskExecutor.
>> Ideally on debug log level if you have them.
>>
>>
>>
>> One thing which we wanted to add is sending the current task statuses as
>> part of the heartbeat from the TM to the JM. Having this information would
>> allow us to reconcile a situation like you are describing.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Thu, Apr 9, 2020 at 1:57 PM Aljoscha Krettek 
>> wrote:
>>
>> Hi,
>>
>> this indeed seems very strange!
>>
>> @Gary Could you maybe have a look at this since you work/worked quite a
>> bit on the scheduler?
>>
>> Best,
>> Aljoscha
>>
>> On 09.04.20 05:46, Hanson, Bruce wrote:
>> > Hello Flink folks:
>> >
>> > We had a problem with a Flink job the other day that I haven’t seen
>> before. One task encountered a failure and switched to FAILED (see the full
>> exception below). After the failure, the task said it was notifying the Job
>> Manager:
>> >
>> > 2020-04-06 08:21:04.329 [flink-akka.actor.default-dispatcher-55283]
>> level=INFO org.apache.flink.runtime.taskexecutor.TaskExecutor -
>> Un-registering task and sending final execution state FAILED to JobManager
>> for task FOG_PREDICTION_FUNCTION 3086efd0e57612710d0ea74138c01090.
>> >
>> > But I see no evidence that the Job Manager got the message. I would
>> expect with this type of failure that the Job Manager would restart the
>> job. In this case, the job carried on, hobbled, until the it stopped
>> processing data and our user had to manually restart the job. The job also
>> started experiencing checkpoint timeouts on every checkpoint due to this
>> operator stopping.
>> >
>> > Had the job restarted when this happened, I believe everything would
>> have been ok as the job had an appropriate restart strategy in place. The
>> Task Manager that this task was running on remained healthy and was
>> actively processing other tasks.
>> >
>> > It seems like this is some kind of a bug. Is this something anyone has
>> seen before? Could it be something that has been fixed if we went to 

Re: Flink Conf "yarn.flink-dist-jar" Question

2020-04-17 Thread Till Rohrmann
Hi Yang,

from what I understand it sounds reasonable to me. Could you sync with
Tison on FLINK-14964 on how to proceed. I'm not super deep into these
issues but they seem to be somewhat related and Tison already did some
implementation work.

I'd say it be awesome if we could include this kind of improvement into the
release.

Cheers,
Till

On Thu, Apr 16, 2020 at 4:43 AM Yang Wang  wrote:

> Hi All, thanks a lot for reviving this discussion.
>
> I think we could unify the FLINK-13938 and FLINK-14964 since they have the
> similar
> purpose, avoid unnecessary uploading and downloading jars in YARN
> deployment.
> The difference is that FLINK-13938 aims to support the flink system lib
> directory only, while
> FLINK-14964 is trying to support arbitrary pre-uploaded jars (including user
> and system jars).
>
>
> So i suggest to do this feature as following.
> 1. Upload the flink lib directory or users to hdfs, e.g.
> "hdfs://hdpdev/flink/release/flink-1.x"
> "hdfs://hdpdev/user/someone/mylib"
> 2. Use the -ypl argument to specify the shared lib, multiple directories
> could be specified
> 3. YarnClusterDescriptor will use the pre-uploaded jars to avoid
> unnecessary uploading,
> both for system and user jars
> 4. YarnClusterDescriptor needs to set the system jars to public visibility
> so that the distributed
> cache in the YARN nodemanager could be reused by multiple applications.
> This is to avoid
> unnecessary downloading, especially for the "flink-dist-*.jar". For the
> user shared lib, the
> visibility is still set to "APPLICATION" level.
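To make step 4 a bit more concrete, below is a rough, hypothetical sketch of registering a pre-uploaded jar as a PUBLIC YARN local resource so the NodeManager cache can be shared across applications; the class, method and path handling are illustrative and not the actual YarnClusterDescriptor code:

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class SharedLibResource {

    // Registers a pre-uploaded jar so the NodeManager can cache it across applications.
    public static LocalResource publicJar(FileSystem fs, Path remoteJar) throws Exception {
        FileStatus status = fs.getFileStatus(remoteJar);
        return LocalResource.newInstance(
                ConverterUtils.getYarnUrlFromPath(remoteJar),
                LocalResourceType.FILE,
                LocalResourceVisibility.PUBLIC,   // shared NodeManager cache, step 4 above
                status.getLen(),
                status.getModificationTime());
    }
}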
>
>
> For our past internal use case, the shared lib could help with
> accelerating the submission a lot.
> Also it helps to reduce the pressure of HDFS when we want to launch many
> applications together.
>
> @tison @Till Rohrmann  @Hailu, Andreas
>  If you guys think the suggestion makes sense, I
> will try to find some time to work on this and hope it can catch up with
> the release-1.11 cycle.
>
>
> Best,
> Yang
>
> Hailu, Andreas [Engineering]  于2020年4月16日周四
> 上午8:47写道:
>
>> Okay, I’ll continue to watch the JIRAs. Thanks for the update, Till.
>>
>>
>>
>> *// *ah
>>
>>
>>
>> *From:* Till Rohrmann 
>> *Sent:* Wednesday, April 15, 2020 10:51 AM
>> *To:* Hailu, Andreas [Engineering] 
>> *Cc:* Yang Wang ; tison ;
>> user@flink.apache.org
>> *Subject:* Re: Flink Conf "yarn.flink-dist-jar" Question
>>
>>
>>
>> Hi Andreas,
>>
>>
>>
>> it looks as if FLINK-13938 and FLINK-14964 won't make it into the 1.10.1
>> release because the community is about to start the release process. Since
>> FLINK-13938 is a new feature it will be shipped with a major release. There
>> is still a bit of time until the 1.11 feature freeze and if Yang Wang has
>> time to finish this PR, then we could ship it.
>>
>>
>>
>> Cheers,
>>
>> Till
>>
>>
>>
>> On Wed, Apr 15, 2020 at 3:23 PM Hailu, Andreas [Engineering] <
>> andreas.ha...@gs.com> wrote:
>>
>> Yang, Tison,
>>
>>
>>
>> Do we know when some solution for 13938 and 14964 will arrive? Do you
>> think it will be in a 1.10.x version?
>>
>>
>>
>> *// *ah
>>
>>
>>
>> *From:* Hailu, Andreas [Engineering]
>> *Sent:* Friday, March 20, 2020 9:19 AM
>> *To:* 'Yang Wang' 
>> *Cc:* tison ; user@flink.apache.org
>> *Subject:* RE: Flink Conf "yarn.flink-dist-jar" Question
>>
>>
>>
>> Hi Yang,
>>
>>
>>
>> This is good to know. As a stopgap measure until a solution between 13938
>> and 14964 arrives, we can automate the application staging directory
>> cleanup from our client should the process fail. It’s not ideal, but will
>> at least begin to manage our users’ quota. I’ll continue to watch the two
>> tickets. Thank you.
>>
>>
>>
>> *// *ah
>>
>>
>>
>> *From:* Yang Wang 
>> *Sent:* Monday, March 16, 2020 9:37 PM
>> *To:* Hailu, Andreas [Engineering] 
>> *Cc:* tison ; user@flink.apache.org
>> *Subject:* Re: Flink Conf "yarn.flink-dist-jar" Question
>>
>>
>>
>> Hi Hailu,
>>
>>
>>
>> Sorry for the late response. If the Flink cluster(e.g. Yarn application)
>> is stopped directly
>>
>> by `yarn application -kill`, then the staging directory will be left
>> behind. Since the jobmanager
>>
>> do not have any change to clean up the staging directly. Also it may
>> happen when the
>>
>> jobmanager crashed and reached the attempts limit of Yarn.
>>
>>
>>
>> For FLINK-13938, yes, it is trying to use the Yarn public cache to
>> accelerate the container
>>
>> launch.
>>
>>
>>
>>
>>
>> Best,
>>
>> Yang
>>
>>
>>
>> Hailu, Andreas  于2020年3月10日周二 上午4:38写道:
>>
>> Also may I ask what causes these application ID directories to be left
>> behind? Is it a job failure, or can they persist even if the application
>> succeeds? I’d like to know so that I can implement my own cleanup in the
>> interim to prevent exceeding user disk space quotas.
>>
>>
>>
>> *// *ah
>>
>>
>>
>> *From:* Hailu, Andreas [Engineering]
>> *Sent:* Monday, March 9, 2020 1:20 PM
>> *To:* 'Yang Wang' 
>> *Cc:* tison ; user@flink.apache.org
>> *Subject:* RE: Flink Conf "yarn.flink-dist-jar" Question
>>
>>
>>
>> Hi 

Re: AvroParquetWriter issues writing to S3

2020-04-17 Thread Arvid Heise
Hi Diogo,

I saw similar issues already. The root cause is always users actually not
using any Flink specific stuff, but going to the Parquet Writer of Hadoop
directly. As you can see in your stacktrace, there is not one reference to
any Flink class.

The solution usually is to use the respective Flink sink instead of
bypassing them [1].
If you opt to implement it manually nonetheless, it's probably easier to
bundle Hadoop from a non-Flink dependency.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/streamfile_sink.html
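For reference, a minimal sketch of the StreamingFileSink route from [1]; the bucket path, the Event POJO and the choice of the reflection-based writer are assumptions for illustration:

import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class ParquetS3Sink {

    // Plain POJO written via Avro reflection; replace with your own record type.
    public static class Event {
        public String id;
        public long timestamp;
    }

    public static StreamingFileSink<Event> build() {
        return StreamingFileSink
                .forBulkFormat(
                        new Path("s3://my-bucket/output"),               // assumed bucket
                        ParquetAvroWriters.forReflectRecord(Event.class))
                .build();
    }

    public static void attach(DataStream<Event> stream) {
        // Bulk formats roll files on checkpoints, so checkpointing must be enabled on the job.
        stream.addSink(build());
    }
}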

On Thu, Apr 16, 2020 at 5:36 PM Diogo Santos 
wrote:

> Hi Till,
>
> definitely seems to be a strange issue. The first time the job is loaded
> (with a clean instance of the Cluster) the job goes well, but if it is
> canceled or started again the issue came.
>
> I built an example here https://github.com/congd123/flink-s3-example
>
> You can generate the artifact of the Flink Job and start the cluster with
> the configuration on the docker-compose.
>
> Thanks for helping
>
>
>
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: Schema with TypeInformation or DataType

2020-04-17 Thread tison
Thanks for your inputs and sorry that I said Schema doesn't support
DataType to register a field because I was looking into Flink 1.9 codes...

Best,
tison.


Jark Wu  wrote on Fri, Apr 17, 2020 at 2:42 PM:

> Hi Tison,
>
> Migration from TypeInformation to DataType is a large piece of work and will span
> many releases. As far as I can tell, we will finalize the work in 1.11.
> As godfrey said above, Flink SQL & Table API should always use DataType,
> DataStream uses TypeInformation.
>
> Schema already supports DataType to register a field, and the method
> using TypeInformation to register a field has been deprecated since 1.10.
>
> Best,
> Jark
>
> On Fri, 17 Apr 2020 at 14:14, tison  wrote:
>
>> Hi,
>>
>> I notice that our type system has two branches. One  is TypeInformation
>> while the other is
>> DataType. It is said that Table API will use DataType but there are
>> several questions about
>> this statement:
>>
>> 1. Will TypeInformation be deprecated and we use DataType as type system
>> everywhere?
>> 2. Schema in Table API currently support only TypeInformation to register
>> a field, shall we support
>> the DataType way as well?
>>
>> Best,
>> tison.
>>
>


Re: How watermark is generated in sql DDL statement

2020-04-17 Thread Benchao Li
Actually, BoundedOutOfOrderWatermarkGenerator is only used in tests,
the real WatermarkGenerator is code generated in
WatermarkGeneratorCodeGenerator

lec ssmi  wrote on Fri, Apr 17, 2020 at 5:19 PM:

> I think you are all right. I checked the source code of
> WatermarkAssignerOperator, and I found that the WatermarkGenerator passed
> into WatermarkAssignerOperator is the WatermarkGenerator interface, and
> BoundedOutOfOrderWatermarkGenerator is the only implementation class of
> WatermarkGenerator. By the way, the interval is based on processing time.
>
> Benchao Li  于2020年4月17日周五 下午5:06写道:
>
>> WatermarkAssignerOperator is an inner mechanism for generating watermarks.
>>
>> The "Bounded Out of Orderness" is just one kind of the watermark
>> expressions, which
>> is most commonly used.
>>
>> The main logic of WatermarkAssignerOperator is:
>> - keep currentWatermark and lastWatermark
>> - when each element comes in
>>   - get watermark from this element, using the *watermark expression*
>>   - if the watermark > currentWatermark, then currentWatermark is updated
>>   - if currentWatermark - lastWatermark > watermarkInterval
>> - emit watermark to downstream, and update lastWatermark
>>
>> lec ssmi  于2020年4月17日周五 下午4:50写道:
>>
>>> Maybe you are all right. I was  more confused  .
>>> As the cwiki said, flink could use BoundedOutOfOrderTimestamps ,
>>> [image: image.png]
>>>
>>> but I have heard about WatermarkAssignerOperator from Blink developers.
>>>
>>> Benchao Li  于2020年4月17日周五 下午4:33写道:
>>>
 Hi lec ssmi,

 It's a good question. In blink planner, we use code gen to handle
 watermark expression.
 And in `WatermarkAssignerOperator` we calculate current watermark when
 each element comes in.
 If the watermark - lastEmitedWatermark > watermark interval, we will
 emit the new watermark.

 So it's neither `PeriodicWatermark` nor `PunctuatedWatermark`.

 lec ssmi  于2020年4月17日周五 下午3:12写道:

> Hi:
>In sql API , the declaration of watermark is realized by ddl
> statement . But which way is it implemented?
>   * PeriodicWatermark *  or   *PunctuatedWatermark*?
>   There seems to be  no explanation on the official website.
>
>   Thanks.
>


 --

 Benchao Li
 School of Electronics Engineering and Computer Science, Peking University
 Tel:+86-15650713730
 Email: libenc...@gmail.com; libenc...@pku.edu.cn


>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: How watermark is generated in sql DDL statement

2020-04-17 Thread lec ssmi
I think you are all right. I checked the source code of
WatermarkAssignerOperator, and I found that the WatermarkGenerator passed
into WatermarkAssignerOperator is the WatermarkGenerator interface, and
BoundedOutOfOrderWatermarkGenerator is the only implementation class of
WatermarkGenerator. By the way, the interval is based on processing time.

Benchao Li  wrote on Fri, Apr 17, 2020 at 5:06 PM:

> WatermarkAssignerOperator is an inner mechanism for generating watermarks.
>
> The "Bounded Out of Orderness" is just one kind of the watermark
> expressions, which
> is most commonly used.
>
> The main logic of WatermarkAssignerOperator is:
> - keep currentWatermark and lastWatermark
> - when each element comes in
>   - get watermark from this element, using the *watermark expression*
>   - if the watermark > currentWatermark, then currentWatermark is updated
>   - if currentWatermark - lastWatermark > watermarkInterval
> - emit watermark to downstream, and update lastWatermark
>
> lec ssmi  于2020年4月17日周五 下午4:50写道:
>
>> Maybe you are all right. I was  more confused  .
>> As the cwiki said, flink could use BoundedOutOfOrderTimestamps ,
>> [image: image.png]
>>
>> but I have heard about WatermarkAssignerOperator from Blink developers.
>>
>> Benchao Li  于2020年4月17日周五 下午4:33写道:
>>
>>> Hi lec ssmi,
>>>
>>> It's a good question. In blink planner, we use code gen to handle
>>> watermark expression.
>>> And in `WatermarkAssignerOperator` we calculate current watermark when
>>> each element comes in.
>>> If the watermark - lastEmitedWatermark > watermark interval, we will
>>> emit the new watermark.
>>>
>>> So it's neither `PeriodicWatermark` nor `PunctuatedWatermark`.
>>>
>>> lec ssmi  于2020年4月17日周五 下午3:12写道:
>>>
 Hi:
In sql API , the declaration of watermark is realized by ddl
 statement . But which way is it implemented?
   * PeriodicWatermark *  or   *PunctuatedWatermark*?
   There seems to be  no explanation on the official website.

   Thanks.

>>>
>>>
>>> --
>>>
>>> Benchao Li
>>> School of Electronics Engineering and Computer Science, Peking University
>>> Tel:+86-15650713730
>>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>>
>>>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: SQL CLI: setting parameters for building a streaming application

2020-04-17 Thread godfrey he
First question: set execution.parallelism=10;
This command sets the default parallelism of the job. Some operators have their own parallelism logic and are not affected by the default parallelism (for example the Hive source, whose parallelism is derived from the number of partitions). You can call the set command before submitting a job to change that job's default parallelism.
Second question: once FLINK-16822 [1] is fixed in 1.11, you will be able to set checkpointing via configuration options [2]. For example:
set execution.checkpointing.mode=EXACTLY_ONCE.

[1] https://issues.apache.org/jira/browse/FLINK-16822
[2]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#checkpointing
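As a programmatic counterpart to the `set` commands above, a small sketch of configuring the blink planner option table.exec.resource.default-parallelism that is also mentioned further down in this thread; the value and environment settings are illustrative:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DefaultParallelismConfig {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // The option referenced above; operators that compute their own parallelism
        // (e.g. the Hive source) are not affected by it.
        tEnv.getConfig().getConfiguration()
                .setInteger("table.exec.resource.default-parallelism", 10);
    }
}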

Best,
Godfrey

Even <452232...@qq.com> wrote on Fri, Apr 17, 2020 at 3:44 PM:

> Hi! Nabble does not seem to allow registration, so I cannot reply on the mailing list.
> About Wednesday's questions, two points are still unclear:
> 1. When the parallelism is set dynamically in the CLI via the set command, e.g. set
> execution.parallelism=10, does that parallelism apply to all jobs created by the current CLI?
> 2. The SQL CLI does not yet support checkpoint settings; will this be supported later? If so, will it be released in the next version?
> Many thanks!
>
>
> 
> Hi Even,
>
> 1. Currently the SQL CLI supports setting parallelism
> and max-parallelism in sql-client-default.yaml to control the default parallelism of CLI jobs, or setting it dynamically via the set command, e.g. set
> execution.parallelism=10;. In addition, when using the blink
> planner you can configure the default parallelism with table.exec.resource.default-parallelism. [1]
> The SQL CLI does not support checkpoint settings yet.
> 2. Currently the SQL CLI uses an in-memory catalog by default, which lives inside each SQL CLI process and is not shared. If the SQL
> CLI dies, the in-memory catalog disappears with it. You can configure your catalog to be a hive catalog [1];
> the tables you create are then persisted into
> the hive catalog, and multiple SQL CLIs using the same hive catalog give you the sharing you expect.
>
> [1]
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#environment-files
>
> Best,
> Godfrey
>
> Even <[hidden email] wrote on Wed, Apr 15, 2020 at 3:35 PM:
>
>  Hi!
>  Two questions, please:
>  1. When building a streaming application with the Flink SQL CLI in plain-text mode, how do I set parameters such as checkpointing and parallelism in the DDL statements?
>  2. The tables created by a streaming application built with the Flink SQL CLI in plain-text mode cannot be found in another CLI session. Why is that? If the job dies, how should it be restarted, or does it have to be rebuilt from scratch?


Re: How watermark is generated in sql DDL statement

2020-04-17 Thread Benchao Li
WatermarkAssignerOperator is an inner mechanism for generating watermarks.

The "Bounded Out of Orderness" is just one kind of the watermark
expressions, which
is most commonly used.

The main logic of WatermarkAssignerOperator is:
- keep currentWatermark and lastWatermark
- when each element comes in
  - get watermark from this element, using the *watermark expression*
  - if the watermark > currentWatermark, then currentWatermark is updated
  - if currentWatermark - lastWatermark > watermarkInterval
- emit watermark to downstream, and update lastWatermark
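A simplified, standalone sketch of the bookkeeping described above (not the actual operator, which is code-generated together with the watermark expression; names and types here are illustrative):

public class WatermarkAssignerSketch {

    private final long watermarkInterval;   // e.g. the configured auto-watermark interval
    private long currentWatermark = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    public WatermarkAssignerSketch(long watermarkInterval) {
        this.watermarkInterval = watermarkInterval;
    }

    /** Called per element; returns a watermark to emit downstream, or null if none. */
    public Long onElement(long watermarkFromExpression) {
        // 1. the watermark expression has been evaluated against the element (passed in here)
        if (watermarkFromExpression > currentWatermark) {
            currentWatermark = watermarkFromExpression;       // 2. advance current watermark
        }
        // 3. emit only when it advanced far enough beyond the last emitted watermark
        if (currentWatermark - lastEmittedWatermark > watermarkInterval) {
            lastEmittedWatermark = currentWatermark;
            return currentWatermark;
        }
        return null;
    }
}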

lec ssmi  wrote on Fri, Apr 17, 2020 at 4:50 PM:

> Maybe you are all right. I was  more confused  .
> As the cwiki said, flink could use BoundedOutOfOrderTimestamps ,
> [image: image.png]
>
> but I have heard about WatermarkAssignerOperator from Blink developers.
>
> Benchao Li  于2020年4月17日周五 下午4:33写道:
>
>> Hi lec ssmi,
>>
>> It's a good question. In blink planner, we use code gen to handle
>> watermark expression.
>> And in `WatermarkAssignerOperator` we calculate current watermark when
>> each element comes in.
>> If the watermark - lastEmitedWatermark > watermark interval, we will emit
>> the new watermark.
>>
>> So it's neither `PeriodicWatermark` nor `PunctuatedWatermark`.
>>
>> lec ssmi  于2020年4月17日周五 下午3:12写道:
>>
>>> Hi:
>>>In sql API , the declaration of watermark is realized by ddl
>>> statement . But which way is it implemented?
>>>   * PeriodicWatermark *  or   *PunctuatedWatermark*?
>>>   There seems to be  no explanation on the official website.
>>>
>>>   Thanks.
>>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: How watermark is generated in sql DDL statement

2020-04-17 Thread lec ssmi
Maybe you are all right. I was  more confused  .
As the cwiki said, flink could use BoundedOutOfOrderTimestamps ,
[image: image.png]

but I have heard about WatermarkAssignerOperator from Blink developers.

Benchao Li  wrote on Fri, Apr 17, 2020 at 4:33 PM:

> Hi lec ssmi,
>
> It's a good question. In blink planner, we use code gen to handle
> watermark expression.
> And in `WatermarkAssignerOperator` we calculate current watermark when
> each element comes in.
> If the watermark - lastEmitedWatermark > watermark interval, we will emit
> the new watermark.
>
> So it's neither `PeriodicWatermark` nor `PunctuatedWatermark`.
>
> lec ssmi  于2020年4月17日周五 下午3:12写道:
>
>> Hi:
>>In sql API , the declaration of watermark is realized by ddl statement
>> . But which way is it implemented?
>>   * PeriodicWatermark *  or   *PunctuatedWatermark*?
>>   There seems to be  no explanation on the official website.
>>
>>   Thanks.
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>


Re: Jars replication

2020-04-17 Thread Chesnay Schepler
My apologies, I remembered wrong. The jar endpoints all require working 
against the leading job master unfortunately.
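For anyone reproducing this, a small hypothetical check that asks each JobManager's REST endpoint for its uploaded jars; the addresses and port are placeholders:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListUploadedJars {
    public static void main(String[] args) throws Exception {
        // Only the leader actually stores uploaded jars, so the two answers can differ.
        for (String jm : new String[]{"http://jobmanager-1:8081", "http://jobmanager-2:8081"}) {
            HttpURLConnection conn = (HttpURLConnection) new URL(jm + "/jars").openConnection();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
                System.out.println(jm + " -> " + in.readLine());
            } finally {
                conn.disconnect();
            }
        }
    }
}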


On 17/04/2020 10:18, Andrés Garagiola wrote:

Thank Chesnay,

I'm invoking the "/jars" endpoint in both JMs and only one of them 
answers with the uploaded jar. If I try to send a run request to the 
one that doesn't have the jar, it responds with a "jar not found" 
error. I didn't find documentation about this, so I don't know if it 
is the expected behavior or something wrong in my cluster configuration.


Regards

On Fri, Apr 17, 2020 at 10:05 AM Chesnay Schepler > wrote:


Jars are not replicated to all JobManagers, this is currently
expected,
but generally undesirable for the use-case you out-lined.

IIRC the important part is that the upload goes directly against the
leader, the run request can be sent anywhere and it will be
redirected
internally to the leader.

On 17/04/2020 09:59, Andrés Garagiola wrote:
> Hi,
>
> I'm configuring a Flink cluster with high availability based on
> ZooKeeper and two Job Managers.
>
> When I upload a jar using the /jars/upload REST API, I don't get
the
> jar replicated in both JMs. Is this the expected behavior?
>
> I want to configure the cluster in such a way that once the jar is
> uploaded, I'm able to schedule multiple jobs based on the same
jar id,
> independently of what is the leader JM at the scheduling time.
>
> Also, I'm using a K8 service to reach the job managers, this
service
> choose one of the JM pods in every request, should I invoke
always the
> leader one?
>
> Thanks






Re: How watermark is generated in sql DDL statement

2020-04-17 Thread Benchao Li
Hi lec ssmi,

It's a good question. In blink planner, we use code gen to handle watermark
expression.
And in `WatermarkAssignerOperator` we calculate current watermark when each
element comes in.
If the watermark - lastEmitedWatermark > watermark interval, we will emit
the new watermark.

So it's neither `PeriodicWatermark` nor `PunctuatedWatermark`.

lec ssmi  wrote on Fri, Apr 17, 2020 at 3:12 PM:

> Hi:
>In sql API , the declaration of watermark is realized by ddl statement
> . But which way is it implemented?
>   * PeriodicWatermark *  or   *PunctuatedWatermark*?
>   There seems to be  no explanation on the official website.
>
>   Thanks.
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Jars replication

2020-04-17 Thread Chesnay Schepler
Jars are not replicated to all JobManagers, this is currently expected, 
but generally undesirable for the use-case you out-lined.


IIRC the important part is that the upload goes directly against the 
leader, the run request can be sent anywhere and it will be redirected 
internally to the leader.


On 17/04/2020 09:59, Andrés Garagiola wrote:

Hi,

I'm configuring a Flink cluster with high availability based on 
ZooKeeper and two Job Managers.


When I upload a jar using the /jars/upload REST API, I don't get the 
jar replicated in both JMs. Is this the expected behavior?


I want to configure the cluster in such a way that once the jar is 
uploaded, I'm able to schedule multiple jobs based on the same jar id, 
independently of what is the leader JM at the scheduling time.


Also, I'm using a K8 service to reach the job managers, this service 
choose one of the JM pods in every request, should I invoke always the 
leader one?


Thanks





Jars replication

2020-04-17 Thread Andrés Garagiola
Hi,

I'm configuring a Flink cluster with high availability based on ZooKeeper
and two Job Managers.

When I upload a jar using the /jars/upload REST API, I don't get the jar
replicated in both JMs. Is this the expected behavior?

I want to configure the cluster in such a way that once the jar is
uploaded, I'm able to schedule multiple jobs based on the same jar id,
independently of what is the leader JM at the scheduling time.

Also, I'm using a K8s service to reach the job managers; this service chooses
one of the JM pods on every request. Should I always invoke the leader one?

Thanks


SQL CLI: setting parameters for building a streaming application

2020-04-17 Thread Even
Hi! Nabble does not seem to allow registration, so I cannot reply on the mailing list.
About Wednesday's questions, two points are still unclear:
1. When the parallelism is set dynamically in the CLI via the set command, e.g. set execution.parallelism=10, does that parallelism apply to all jobs created by the current CLI?
2. The SQL CLI does not yet support checkpoint settings; will this be supported later? If so, will it be released in the next version?
Many thanks!


----------------------------------------
Hi Even,

1. Currently the SQL CLI supports setting parallelism and max-parallelism in sql-client-default.yaml to control the default parallelism of CLI jobs, or setting it dynamically via the set command, e.g. set execution.parallelism=10;. In addition, when using the blink planner you can configure the default parallelism with table.exec.resource.default-parallelism. [1]
The SQL CLI does not support checkpoint settings yet.
2. Currently the SQL CLI uses an in-memory catalog by default, which lives inside each SQL CLI process and is not shared. If the SQL CLI dies, the in-memory catalog disappears with it. You can configure your catalog to be a hive catalog [1]; the tables you create are then persisted into the hive catalog, and multiple SQL CLIs using the same hive catalog give you the sharing you expect.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sqlClient.html#environment-files

Best,
Godfrey

Even <[hidden email] wrote on Wed, Apr 15, 2020 at 3:35 PM:

 Hi!
 Two questions, please:
 1. When building a streaming application with the Flink SQL CLI in plain-text mode, how do I set parameters such as checkpointing and parallelism in the DDL statements?
 2. The tables created by a streaming application built with the Flink SQL CLI in plain-text mode cannot be found in another CLI session. Why is that? If the job dies, how should it be restarted, or does it have to be rebuilt from scratch?

Re: About state TTL

2020-04-17 Thread 酷酷的浑蛋
OK, thank you very much. I will go change the code as you described. Thanks a lot.

On Apr 17, 2020 at 15:17, Benchao Li wrote:
Right, this requires modifying the Flink source code; it cannot be done from application-level configuration alone. The change is to the underlying implementation of Flink SQL.

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 3:09 PM:

I'm using the SQL API here. Can I make the change you described that way? As far as I can see, the official docs only offer
tableConfig.setIdleStateRetentionTime(Time.minutes(1),
Time.minutes(6)) as the way to set the TTL for SQL.

On Apr 17, 2020 at 14:54, Benchao Li wrote:
Right, the blink planner and the legacy planner differ somewhat in their implementation.
If it is convenient for you to change the code, you should be able to align this behavior; it is actually very simple, just two lines in JoinRecordStateViews:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
if (stateCleaningEnabled) {
checkArgument(retentionTime > 0);
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInBackground() // added this line
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
.build();
} else {
return StateTtlConfig.DISABLED;
}
}


酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:47 PM:

I just tested it: without blink, the join state simply expires and can no longer be accessed, while with blink it is read one more time before it expires.

On Apr 17, 2020 at 14:16, Benchao Li wrote:
These are two separate problems:

- State that is accessed only once may never be cleaned up.
  Early 1.9 versions have this problem, because enableCleanupInBackground did not exist back then; it will be fixed in 1.9.3. Since 1.10 the cleanup strategy is enabled by default, so this problem no longer occurs.
- State that has already expired can still be read.
  This still exists, but it will be fixed along the way in this issue [1].

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:06 PM:

Actually the problem I am running into is that most state is accessed only once and never again, so it is never cleaned up automatically; the state keeps growing until the job eventually stops.

On Apr 17, 2020 at 13:07, Benchao Li wrote:
I found that the state previously used the `ReturnExpiredIfNotCleanedUp` strategy rather than `NeverReturnExpired`,
so this problem cannot be completely avoided at the moment.
I have created a JIRA [1] to track and improve this.

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 12:51 PM:

I upgraded to Flink 1.10.0 and the problem showed up again. What should I do?
On Apr 16, 2020 at 15:28, 酷酷的浑蛋 wrote:
OK, thanks. I'm on 1.9.1 here; I'll first try upgrading to see whether that solves it.

On Apr 15, 2020 at 18:04, Benchao Li wrote:
Hi,

Which version are you using?
Version 1.9 does have an issue here: cleanup in background is not enabled by default
[1]; I'm not sure whether that is what causes your problem. It has been fixed in 1.9.3.

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋  wrote on Wed, Apr 15, 2020 at 5:40 PM:

In Flink SQL I set
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a

When the retention time is exceeded, data that has already expired is still joined once more before its state is cleared. Is there a way to have expired data cleaned up without waiting for it to be read (joined) once more? Outside of SQL this can be set with StateTtlConfig ttlConfig = StateTtlConfig...; how can it be set in SQL? The official website gives no explanation.

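For completeness, a minimal sketch of the application-level knob discussed in this thread, the Table/SQL idle state retention, as opposed to the engine-side change above; the times are the ones from the question, the environment settings are assumptions, and the query assumes the two tables are already registered:

import org.apache.flink.api.common.time.Time;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IdleStateRetentionExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Min and max retention for idle keys: join state becomes eligible for
        // cleanup between 1 and 6 minutes of inactivity.
        tEnv.getConfig().setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));

        // Assumes 'test' and 'test2' are registered in the catalog.
        tEnv.sqlQuery("SELECT * FROM test t JOIN test2 t2 ON t.a = t2.a");
    }
}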


--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: About state TTL

2020-04-17 Thread Benchao Li
Right, this requires modifying the Flink source code; it cannot be done from application-level configuration alone. The change is to the underlying implementation of Flink SQL.

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 3:09 PM:

> I'm using the SQL API here. Can I make the change you described that way? As far as I can see, the official docs only offer
>  tableConfig.setIdleStateRetentionTime(Time.minutes(1),
> Time.minutes(6)) as the way to set the TTL for SQL.
>
>
> On Apr 17, 2020 at 14:54, Benchao Li wrote:
> Right, the blink planner and the legacy planner differ somewhat in their implementation.
> If it is convenient for you to change the code, you should be able to align this behavior; it is actually very simple, just two lines in JoinRecordStateViews:
>
> static StateTtlConfig createTtlConfig(long retentionTime, boolean
> stateCleaningEnabled) {
> if (stateCleaningEnabled) {
> checkArgument(retentionTime > 0);
> return StateTtlConfig
> .newBuilder(Time.milliseconds(retentionTime))
> .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
> .cleanupInBackground() // added this line
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
> // changed this line
> .build();
> } else {
> return StateTtlConfig.DISABLED;
> }
> }
>
>
> 酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:47 PM:
>
> I just tested it: without blink, the join state simply expires and can no longer be accessed, while with blink it is read one more time before it expires.
>
> On Apr 17, 2020 at 14:16, Benchao Li wrote:
> These are two separate problems:
>
> - State that is accessed only once may never be cleaned up.
>   Early 1.9 versions have this problem, because enableCleanupInBackground did not exist back then; it will be fixed in 1.9.3. Since 1.10 the cleanup strategy is enabled by default, so this problem no longer occurs.
> - State that has already expired can still be read.
>   This still exists, but it will be fixed along the way in this issue [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-16581
>
> 酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:06 PM:
>
> Actually the problem I am running into is that most state is accessed only once and never again, so it is never cleaned up automatically; the state keeps growing until the job eventually stops.
>
> On Apr 17, 2020 at 13:07, Benchao Li wrote:
> I found that the state previously used the `ReturnExpiredIfNotCleanedUp` strategy rather than `NeverReturnExpired`,
> so this problem cannot be completely avoided at the moment.
> I have created a JIRA [1] to track and improve this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-17199
>
> 酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 12:51 PM:
>
> I upgraded to Flink 1.10.0 and the problem showed up again. What should I do?
> On Apr 16, 2020 at 15:28, 酷酷的浑蛋 wrote:
> OK, thanks. I'm on 1.9.1 here; I'll first try upgrading to see whether that solves it.
>
> On Apr 15, 2020 at 18:04, Benchao Li wrote:
> Hi,
>
> Which version are you using?
> Version 1.9 does have an issue here: cleanup in background is not enabled by default
> [1]; I'm not sure whether that is what causes your problem. It has been fixed in 1.9.3.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋  wrote on Wed, Apr 15, 2020 at 5:40 PM:
>
> In Flink SQL I set
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
> When the retention time is exceeded, data that has already expired is still joined once more before its state is cleared. Is there a way to have expired data cleaned up without waiting for it to be read (joined) once more? Outside of SQL this can be set with StateTtlConfig ttlConfig = StateTtlConfig...; how can it be set in SQL? The official website gives no explanation.
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


How watermark is generated in sql DDL statement

2020-04-17 Thread lec ssmi
Hi:
   In the SQL API, the watermark declaration is realized by a DDL statement.
But which way is it implemented:
  *PeriodicWatermark* or *PunctuatedWatermark*?
  There seems to be no explanation on the official website.

  Thanks.
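For illustration, a minimal example of what such a DDL-declared watermark looks like; the table name, columns, connector and the 5-second bound are made up:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class WatermarkDdlExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());

        // Bounded-out-of-orderness style watermark declared directly in the DDL.
        tEnv.sqlUpdate(
                "CREATE TABLE orders (" +
                "  order_id STRING," +
                "  order_time TIMESTAMP(3)," +
                "  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND" +
                ") WITH (" +
                "  'connector.type' = 'kafka'" +   // remaining connector options omitted
                ")");
    }
}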


Re: About state TTL

2020-04-17 Thread 酷酷的浑蛋
I'm using the SQL API here. Can I make the change you described that way? As far as I can see, the official docs only offer
tableConfig.setIdleStateRetentionTime(Time.minutes(1),
Time.minutes(6)) as the way to set the TTL for SQL.

On Apr 17, 2020 at 14:54, Benchao Li wrote:
Right, the blink planner and the legacy planner differ somewhat in their implementation.
If it is convenient for you to change the code, you should be able to align this behavior; it is actually very simple, just two lines in JoinRecordStateViews:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
if (stateCleaningEnabled) {
checkArgument(retentionTime > 0);
return StateTtlConfig
.newBuilder(Time.milliseconds(retentionTime))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.cleanupInBackground() // added this line
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
.build();
} else {
return StateTtlConfig.DISABLED;
}
}


酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:47 PM:

I just tested it: without blink, the join state simply expires and can no longer be accessed, while with blink it is read one more time before it expires.

On Apr 17, 2020 at 14:16, Benchao Li wrote:
These are two separate problems:

- State that is accessed only once may never be cleaned up.
  Early 1.9 versions have this problem, because enableCleanupInBackground did not exist back then; it will be fixed in 1.9.3. Since 1.10 the cleanup strategy is enabled by default, so this problem no longer occurs.
- State that has already expired can still be read.
  This still exists, but it will be fixed along the way in this issue [1].

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:06 PM:

Actually the problem I am running into is that most state is accessed only once and never again, so it is never cleaned up automatically; the state keeps growing until the job eventually stops.

On Apr 17, 2020 at 13:07, Benchao Li wrote:
I found that the state previously used the `ReturnExpiredIfNotCleanedUp` strategy rather than `NeverReturnExpired`,
so this problem cannot be completely avoided at the moment.
I have created a JIRA [1] to track and improve this.

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 12:51 PM:

I upgraded to Flink 1.10.0 and the problem showed up again. What should I do?
On Apr 16, 2020 at 15:28, 酷酷的浑蛋 wrote:
OK, thanks. I'm on 1.9.1 here; I'll first try upgrading to see whether that solves it.

On Apr 15, 2020 at 18:04, Benchao Li wrote:
Hi,

Which version are you using?
Version 1.9 does have an issue here: cleanup in background is not enabled by default
[1]; I'm not sure whether that is what causes your problem. It has been fixed in 1.9.3.

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋  wrote on Wed, Apr 15, 2020 at 5:40 PM:

In Flink SQL I set
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a

When the retention time is exceeded, data that has already expired is still joined once more before its state is cleared. Is there a way to have expired data cleaned up without waiting for it to be read (joined) once more? Outside of SQL this can be set with StateTtlConfig ttlConfig = StateTtlConfig...; how can it be set in SQL? The official website gives no explanation.



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: About state TTL

2020-04-17 Thread Benchao Li
Also, https://issues.apache.org/jira/browse/FLINK-15938 and
https://issues.apache.org/jira/browse/FLINK-16581
have both been merged now, so you can also cherry-pick them.

Benchao Li  wrote on Fri, Apr 17, 2020 at 2:54 PM:

> Right, the blink planner and the legacy planner differ somewhat in their implementation.
> If it is convenient for you to change the code, you should be able to align this behavior; it is actually very simple, just two lines in JoinRecordStateViews:
>
> static StateTtlConfig createTtlConfig(long retentionTime, boolean 
> stateCleaningEnabled) {
>if (stateCleaningEnabled) {
>   checkArgument(retentionTime > 0);
>   return StateTtlConfig
>  .newBuilder(Time.milliseconds(retentionTime))
>  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>  .cleanupInBackground() // added this line
>  
> .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 
> changed this line
>  .build();
>} else {
>   return StateTtlConfig.DISABLED;
>}
> }
>
>
> 酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:47 PM:
>
>> I just tested it: without blink, the join state simply expires and can no longer be accessed, while with blink it is read one more time before it expires.
>>
>>
>> On Apr 17, 2020 at 14:16, Benchao Li wrote:
>> These are two separate problems:
>>
>> - State that is accessed only once may never be cleaned up.
>>   Early 1.9 versions have this problem, because enableCleanupInBackground did not exist back then; it will be fixed in 1.9.3. Since 1.10 the cleanup strategy is enabled by default, so this problem no longer occurs.
>> - State that has already expired can still be read.
>>   This still exists, but it will be fixed along the way in this issue [1].
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-16581
>>
>> 酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:06 PM:
>>
>> Actually the problem I am running into is that most state is accessed only once and never again, so it is never cleaned up automatically; the state keeps growing until the job eventually stops.
>>
>> On Apr 17, 2020 at 13:07, Benchao Li wrote:
>> I found that the state previously used the `ReturnExpiredIfNotCleanedUp` strategy rather than `NeverReturnExpired`,
>> so this problem cannot be completely avoided at the moment.
>> I have created a JIRA [1] to track and improve this.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-17199
>>
>> 酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 12:51 PM:
>>
>> I upgraded to Flink 1.10.0 and the problem showed up again. What should I do?
>> On Apr 16, 2020 at 15:28, 酷酷的浑蛋 wrote:
>> OK, thanks. I'm on 1.9.1 here; I'll first try upgrading to see whether that solves it.
>>
>> On Apr 15, 2020 at 18:04, Benchao Li wrote:
>> Hi,
>>
>> Which version are you using?
>> Version 1.9 does have an issue here: cleanup in background is not enabled by default
>> [1]; I'm not sure whether that is what causes your problem. It has been fixed in 1.9.3.
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-15938
>>
>> 酷酷的浑蛋  wrote on Wed, Apr 15, 2020 at 5:40 PM:
>>
>> In Flink SQL I set
>> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
>> sql: select * from test t join test2 t2 on t.a=t2.a
>>
>> When the retention time is exceeded, data that has already expired is still joined once more before its state is cleared. Is there a way to have expired data cleaned up without waiting for it to be read (joined) once more? Outside of SQL this can be set with StateTtlConfig ttlConfig = StateTtlConfig...; how can it be set in SQL? The official website gives no explanation.
>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>>
>>
>> --
>>
>> Benchao Li
>> School of Electronics Engineering and Computer Science, Peking University
>> Tel:+86-15650713730
>> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>

-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: About state TTL

2020-04-17 Thread Benchao Li
Right, the blink planner and the legacy planner differ somewhat in their implementation.
If it is convenient for you to change the code, you should be able to align this behavior; it is actually very simple, just two lines in JoinRecordStateViews:

static StateTtlConfig createTtlConfig(long retentionTime, boolean
stateCleaningEnabled) {
   if (stateCleaningEnabled) {
  checkArgument(retentionTime > 0);
  return StateTtlConfig
 .newBuilder(Time.milliseconds(retentionTime))
 .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
 .cleanupInBackground() // added this line
 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
// changed this line
 .build();
   } else {
  return StateTtlConfig.DISABLED;
   }
}


酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:47 PM:

> I just tested it: without blink, the join state simply expires and can no longer be accessed, while with blink it is read one more time before it expires.
>
> On Apr 17, 2020 at 14:16, Benchao Li wrote:
> These are two separate problems:
>
> - State that is accessed only once may never be cleaned up.
>   Early 1.9 versions have this problem, because enableCleanupInBackground did not exist back then; it will be fixed in 1.9.3. Since 1.10 the cleanup strategy is enabled by default, so this problem no longer occurs.
> - State that has already expired can still be read.
>   This still exists, but it will be fixed along the way in this issue [1].
>
> [1] https://issues.apache.org/jira/browse/FLINK-16581
>
> 酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:06 PM:
>
> Actually the problem I am running into is that most state is accessed only once and never again, so it is never cleaned up automatically; the state keeps growing until the job eventually stops.
>
> On Apr 17, 2020 at 13:07, Benchao Li wrote:
> I found that the state previously used the `ReturnExpiredIfNotCleanedUp` strategy rather than `NeverReturnExpired`,
> so this problem cannot be completely avoided at the moment.
> I have created a JIRA [1] to track and improve this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-17199
>
> 酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 12:51 PM:
>
> I upgraded to Flink 1.10.0 and the problem showed up again. What should I do?
> On Apr 16, 2020 at 15:28, 酷酷的浑蛋 wrote:
> OK, thanks. I'm on 1.9.1 here; I'll first try upgrading to see whether that solves it.
>
> On Apr 15, 2020 at 18:04, Benchao Li wrote:
> Hi,
>
> Which version are you using?
> Version 1.9 does have an issue here: cleanup in background is not enabled by default
> [1]; I'm not sure whether that is what causes your problem. It has been fixed in 1.9.3.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋  wrote on Wed, Apr 15, 2020 at 5:40 PM:
>
> In Flink SQL I set
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
> When the retention time is exceeded, data that has already expired is still joined once more before its state is cleared. Is there a way to have expired data cleaned up without waiting for it to be read (joined) once more? Outside of SQL this can be set with StateTtlConfig ttlConfig = StateTtlConfig...; how can it be set in SQL? The official website gives no explanation.
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: About state TTL

2020-04-17 Thread 酷酷的浑蛋
I just tested it: without blink, the join state simply expires and can no longer be accessed, while with blink it is read one more time before it expires.

On Apr 17, 2020 at 14:16, Benchao Li wrote:
These are two separate problems:

- State that is accessed only once may never be cleaned up.
  Early 1.9 versions have this problem, because enableCleanupInBackground did not exist back then; it will be fixed in 1.9.3. Since 1.10 the cleanup strategy is enabled by default, so this problem no longer occurs.
- State that has already expired can still be read.
  This still exists, but it will be fixed along the way in this issue [1].

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:06 PM:

Actually the problem I am running into is that most state is accessed only once and never again, so it is never cleaned up automatically; the state keeps growing until the job eventually stops.

On Apr 17, 2020 at 13:07, Benchao Li wrote:
I found that the state previously used the `ReturnExpiredIfNotCleanedUp` strategy rather than `NeverReturnExpired`,
so this problem cannot be completely avoided at the moment.
I have created a JIRA [1] to track and improve this.

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 12:51 PM:

I upgraded to Flink 1.10.0 and the problem showed up again. What should I do?
On Apr 16, 2020 at 15:28, 酷酷的浑蛋 wrote:
OK, thanks. I'm on 1.9.1 here; I'll first try upgrading to see whether that solves it.

On Apr 15, 2020 at 18:04, Benchao Li wrote:
Hi,

Which version are you using?
Version 1.9 does have an issue here: cleanup in background is not enabled by default
[1]; I'm not sure whether that is what causes your problem. It has been fixed in 1.9.3.

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋  wrote on Wed, Apr 15, 2020 at 5:40 PM:

In Flink SQL I set
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a

When the retention time is exceeded, data that has already expired is still joined once more before its state is cleared. Is there a way to have expired data cleaned up without waiting for it to be read (joined) once more? Outside of SQL this can be set with StateTtlConfig ttlConfig = StateTtlConfig...; how can it be set in SQL? The official website gives no explanation.



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Re: Assigning timestamps to messages without reassigning watermarks

2020-04-17 Thread taowang
My test result: the original watermark logic is discarded and the newly declared logic is used entirely.


 Original message
From: lec ssmi
To: flink-user-cn
Sent: Friday, April 17, 2020 14:43
Subject: Re: Assigning timestamps to messages without reassigning watermarks


When a watermark is regenerated, is the new watermark added to the watermark queue so that an extreme value is then selected, or is the original watermark simply discarded and replaced by the new one?
taowang  wrote on Fri, Apr 17, 2020 at 10:46 AM: >
Yes, it does take effect. I haven't seen anyone else use it this way; in my scenario this achieves the logic I need, but I haven't yet found out whether it causes other problems. > > > Original message > From:
lec ssmi > To:
flink-user-cn > Sent: Friday, April 17, 2020 09:25 > Subject: Re:
Assigning timestamps to messages without reassigning watermarks > > > May I ask, does re-declaring the time attribute and watermark on a DataStream take effect? taowang
 > wrote on Thu, Apr 16, 2020 at 10:49 PM: > OK, many thanks anyway. > >
Then I'll set this question aside for now; as for the issue and our own implementation, I'll file one and add our implementation when I have time. > > > Sorry for the trouble, and best wishes! > > >
> Original message > From: tison > To: user-zh< >
user-zh@flink.apache.org> > Sent: Thursday, April 16, 2020 22:39 > Subject: Re: >
Assigning timestamps to messages without reassigning watermarks > > > Loading original message…

Re: Assigning timestamps to messages without reassigning watermarks

2020-04-17 Thread lec ssmi
When a watermark is regenerated, is the new watermark added to the watermark queue so that an extreme value is then selected, or is the original watermark simply discarded and replaced by the new one?

taowang  wrote on Fri, Apr 17, 2020 at 10:46 AM:

> Yes, it does take effect. I haven't seen anyone else use it this way; in my scenario this achieves the logic I need, but I haven't yet found out whether it causes other problems.
>
>
>  Original message
> From: lec ssmi
> To: flink-user-cn
> Sent: Friday, April 17, 2020 09:25
> Subject: Re: Assigning timestamps to messages without reassigning watermarks
>
>
> May I ask, does re-declaring the time attribute and watermark on a DataStream take effect? taowang
> wrote on Thu, Apr 16, 2020 at 10:49 PM: > OK, many thanks anyway. >
> Then I'll set this question aside for now; as for the issue and our own implementation, I'll file one and add our implementation when I have time. > > > Sorry for the trouble, and best wishes! >
> > > Original message > From: tison > To: user-zh<
> user-zh@flink.apache.org> > Sent: Thursday, April 16, 2020 22:39 > Subject: Re:
> Assigning timestamps to messages without reassigning watermarks > > > Loading original message…


Re: Schema with TypeInformation or DataType

2020-04-17 Thread Jark Wu
Hi Tison,

Migration from TypeInformation to DataType is a large piece of work and will span
many releases. As far as I can tell, we will finalize the work in 1.11.
As godfrey said above, Flink SQL & Table API should always use DataType,
DataStream uses TypeInformation.

Schema already supports DataType to register a field, and the method
using TypeInformation to register a field has been deprecated since 1.10.

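A small sketch of the two registration styles on the descriptor Schema, with the DataType-based variant being the current one; field names are arbitrary:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.descriptors.Schema;

public class SchemaFieldStyles {
    public static void main(String[] args) {
        // Preferred since 1.10: DataType-based field registration.
        Schema schema = new Schema()
                .field("user_id", DataTypes.BIGINT())
                .field("name", DataTypes.STRING());

        // Deprecated: TypeInformation-based registration.
        Schema legacy = new Schema()
                .field("user_id", Types.LONG)
                .field("name", Types.STRING);
    }
}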
Best,
Jark

On Fri, 17 Apr 2020 at 14:14, tison  wrote:

> Hi,
>
> I notice that our type system has two branches. One  is TypeInformation
> while the other is
> DataType. It is said that Table API will use DataType but there are
> several questions about
> this statement:
>
> 1. Will TypeInformation be deprecated and we use DataType as type system
> everywhere?
> 2. Schema in Table API currently support only TypeInformation to register
> a field, shall we support
> the DataType way as well?
>
> Best,
> tison.
>


Re: Schema with TypeInformation or DataType

2020-04-17 Thread godfrey he
Hi tison,

>1. Will TypeInformation be deprecated and we use DataType as type system
everywhere?
AFAIK, the runtime will still support TypeInformation, while the table module
supports DataType.

> 2. Schema in Table API currently support only TypeInformation to register
a field, shall we support
the DataType way as well?
Schema also supports DataType since FLINK-14645[1]

[1] https://issues.apache.org/jira/browse/FLINK-14645

Best,
Godfrey

tison  wrote on Fri, Apr 17, 2020 at 2:14 PM:

> Hi,
>
> I notice that our type system has two branches. One  is TypeInformation
> while the other is
> DataType. It is said that Table API will use DataType but there are
> several questions about
> this statement:
>
> 1. Will TypeInformation be deprecated and we use DataType as type system
> everywhere?
> 2. Schema in Table API currently support only TypeInformation to register
> a field, shall we support
> the DataType way as well?
>
> Best,
> tison.
>


Re: About state TTL

2020-04-17 Thread Benchao Li
These are two separate problems:

- State that is accessed only once may never be cleaned up.
  Early 1.9 versions have this problem, because enableCleanupInBackground did not exist back then; it will be fixed in 1.9.3. Since 1.10 the cleanup strategy is enabled by default, so this problem no longer occurs.
- State that has already expired can still be read.
  This still exists, but it will be fixed along the way in this issue [1].

[1] https://issues.apache.org/jira/browse/FLINK-16581

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 2:06 PM:

> Actually the problem I am running into is that most state is accessed only once and never again, so it is never cleaned up automatically; the state keeps growing until the job eventually stops.
>
> On Apr 17, 2020 at 13:07, Benchao Li wrote:
> I found that the state previously used the `ReturnExpiredIfNotCleanedUp` strategy rather than `NeverReturnExpired`,
> so this problem cannot be completely avoided at the moment.
> I have created a JIRA [1] to track and improve this.
>
> [1] https://issues.apache.org/jira/browse/FLINK-17199
>
> 酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 12:51 PM:
>
> I upgraded to Flink 1.10.0 and the problem showed up again. What should I do?
> On Apr 16, 2020 at 15:28, 酷酷的浑蛋 wrote:
> OK, thanks. I'm on 1.9.1 here; I'll first try upgrading to see whether that solves it.
>
> On Apr 15, 2020 at 18:04, Benchao Li wrote:
> Hi,
>
> Which version are you using?
> Version 1.9 does have an issue here: cleanup in background is not enabled by default
> [1]; I'm not sure whether that is what causes your problem. It has been fixed in 1.9.3.
>
> [1] https://issues.apache.org/jira/browse/FLINK-15938
>
> 酷酷的浑蛋  wrote on Wed, Apr 15, 2020 at 5:40 PM:
>
> In Flink SQL I set
> tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
> sql: select * from test t join test2 t2 on t.a=t2.a
>
> When the retention time is exceeded, data that has already expired is still joined once more before its state is cleared. Is there a way to have expired data cleaned up without waiting for it to be read (joined) once more? Outside of SQL this can be set with StateTtlConfig ttlConfig = StateTtlConfig...; how can it be set in SQL? The official website gives no explanation.
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>
>
>
> --
>
> Benchao Li
> School of Electronics Engineering and Computer Science, Peking University
> Tel:+86-15650713730
> Email: libenc...@gmail.com; libenc...@pku.edu.cn
>


-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn


Schema with TypeInformation or DataType

2020-04-17 Thread tison
Hi,

I notice that our type system has two branches. One  is TypeInformation
while the other is
DataType. It is said that Table API will use DataType but there are several
questions about
this statement:

1. Will TypeInformation be deprecated and we use DataType as type system
everywhere?
2. Schema in Table API currently support only TypeInformation to register a
field, shall we support
the DataType way as well?

Best,
tison.


Re: About state TTL

2020-04-17 Thread 酷酷的浑蛋
Actually the problem I am running into is that most state is accessed only once and never again, so it is never cleaned up automatically; the state keeps growing until the job eventually stops.

On Apr 17, 2020 at 13:07, Benchao Li wrote:
I found that the state previously used the `ReturnExpiredIfNotCleanedUp` strategy rather than `NeverReturnExpired`,
so this problem cannot be completely avoided at the moment.
I have created a JIRA [1] to track and improve this.

[1] https://issues.apache.org/jira/browse/FLINK-17199

酷酷的浑蛋  wrote on Fri, Apr 17, 2020 at 12:51 PM:

I upgraded to Flink 1.10.0 and the problem showed up again. What should I do?
On Apr 16, 2020 at 15:28, 酷酷的浑蛋 wrote:
OK, thanks. I'm on 1.9.1 here; I'll first try upgrading to see whether that solves it.

On Apr 15, 2020 at 18:04, Benchao Li wrote:
Hi,

Which version are you using?
Version 1.9 does have an issue here: cleanup in background is not enabled by default
[1]; I'm not sure whether that is what causes your problem. It has been fixed in 1.9.3.

[1] https://issues.apache.org/jira/browse/FLINK-15938

酷酷的浑蛋  wrote on Wed, Apr 15, 2020 at 5:40 PM:

In Flink SQL I set
tableConfig.setIdleStateRetentionTime(Time.minutes(1), Time.minutes(6));
sql: select * from test t join test2 t2 on t.a=t2.a

When the retention time is exceeded, data that has already expired is still joined once more before its state is cleared. Is there a way to have expired data cleaned up without waiting for it to be read (joined) once more? Outside of SQL this can be set with StateTtlConfig ttlConfig = StateTtlConfig...; how can it be set in SQL? The official website gives no explanation.



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn



--

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn