Re: Help: possible duplicate computation in a stream-join scenario

2020-01-14 Thread Yuan,Youjun
It depends on the specific scenario. A few options come to mind:
1. Group by both student_id and student_name instead of only
student_id. The precondition, of course, is that a name change does not push a message into stream 1.
2. Filter out the update messages.
3. Aggregate over a time window: for the student table, emit each distinct student_id once every n seconds, and only then join with the score stream (a sketch follows below).
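A minimal sketch of deduplicating the student stream before the join, so that a later name update does not re-trigger the score aggregation. Assumptions (not stated in the thread): Flink 1.9+ with the blink planner, where the ROW_NUMBER-based deduplication pattern is supported, a registered "student" table with a processing-time attribute named proctime, and the table/column names from the simplified example below.

```java
// Sketch only: dedup the student side on student_id, then join with the score stream.
// All table/column names and the proctime attribute are assumptions for illustration.
public class DedupBeforeJoin {
    public static final String DEDUP_JOIN_SQL =
        "SELECT SUM(t2.score) AS total_score " +
        "FROM ( " +
        "  SELECT student_id FROM ( " +
        "    SELECT student_id, " +
        "           ROW_NUMBER() OVER (PARTITION BY student_id ORDER BY proctime ASC) AS rn " +
        "    FROM student) " +
        "  WHERE rn = 1 " +            // keep only the first record per student_id
        ") t1 " +
        "JOIN score t2 ON t1.student_id = t2.std_id " +
        "WHERE t1.student_id = 11";

    // Usage (assuming an existing StreamTableEnvironment named tEnv):
    //   Table total = tEnv.sqlQuery(DEDUP_JOIN_SQL);
}
```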

-----Original Message-----
From: xin Destiny  
Sent: Tuesday, January 14, 2020 6:39 PM
To: user-zh@flink.apache.org
Subject: Re: Help: possible duplicate computation in a stream-join scenario

Hi,
What if two update messages are inserted instead, one with a score of -97 and one with 97?




Ren Xie  wrote on Tue, Jan 14, 2020 at 6:20 PM:

> The real scenario is a bit more complicated; I simplified it like this to make it easier to understand. For the simplified version there is no actual code, sorry.
>
> Roughly written out, it looks like this:
> ```sql
> select sum(score)
> from
> student t1 inner join score t2 on t1.student_id = t2.std_id where
> t1.student_id = 11
> ```
> 然后
>
> ```Java
> String sql = ...; // the SQL above
> Table t = tEnv.sqlQuery(sql);
> DataStream stream1 = tEnv.toAppendStream(t, Integer.class); 
> stream1.keyBy("").sum("");
> ```
>
> With this SQL, after inserting one row into the student table and two rows into the score table, one computation is executed and produces the result 97 + 98.
>
> After updating the student's name, a new event enters the student stream and triggers the computation again, producing 97 + 98 once more.
>
> Because new scores may still arrive, a sum is applied to stream1, so 97 and 98 each end up being counted twice.
>
>
> Caizhi Weng  wrote on Tue, Jan 14, 2020 at 5:49 PM:
>
> > Hi,
> >
> > If possible, could you share the code?
> >
> > Ren Xie  wrote on Tue, Jan 14, 2020 at 5:38 PM:
> >
> > > Student
> > > student_id name
> > > 11 foo
> > >
> > > Subject scores
> > > id name score std_id
> > > 100 math 97 11
> > > 101 english 98 11
> > >
> > > The scenario is as follows (assume there is only one student):
> > >
> > > Changes to these two tables are detected via binlog, and the student's total score is computed with a Table/SQL API join.
> > >
> > > Suppose that some time after the rows above are inserted, all of the data has entered Flink and the student's total score is computed as 97 + 98 = 195.
> > >
> > > But then the student's name turns out to have been registered incorrectly and is corrected.
> > > As a result the student stream in Flink now contains 2 events (the insert plus the update) and the score stream contains 2 events, so the computed total becomes 2 *
> > > (97
> +
> > > 98) = 390
> > >
> > > Q: In this scenario, what can be used to solve this and compute the correct result, 97 + 98 = 195?
> > >
> > > I am new to Flink and do not know it very well; any hints would be appreciated, thanks!!
> > >
> >
>


Re: How to continuously query, with Flink SQL, the number of users who logged in during the past 30 minutes?

2019-12-18 Thread Yuan,Youjun
Sorry, I did not see this question earlier.
Darwin-amd64 is simply the executable format for macOS. Trust it and it can be run directly.

-----Original Message-----
From: 陈帅  
Sent: Saturday, December 7, 2019 10:48 PM
To: user-zh@flink.apache.org
Subject: Re: How to continuously query, with Flink SQL, the number of users who logged in during the past 30 minutes?

How is the executable (creek) generated by this platform implemented? What is the format of the file downloaded for the Darwin-amd64 environment?

Yuan,Youjun  wrote on Sat, Dec 7, 2019 at 8:32 PM:

> Yes, it exposes exactly the same SQL interface as Flink; it is a streaming framework designed for edge computing.
>
>
> -----Original Message-----
> From: 陈帅 
> Sent: Saturday, December 7, 2019 11:36 AM
> To: user-zh@flink.apache.org
> Subject: Re: How to continuously query, with Flink SQL, the number of users who logged in during the past 30 minutes?
>
> Your platform is quite handy for quick verification; is it an extension of Flink SQL?
> It did not fully solve my problem, but thank you anyway.
>
Yuan,Youjun  wrote on Thu, Dec 5, 2019 at 10:41 AM:
>
> > You can handle this with a 30-minute RANGE OVER window, but the two 0-value outputs you mentioned probably cannot be produced: no data, no output.
> > Assuming your input has two fields, ts and userid (the timestamp and the user id), the SQL would look like this:
> > INSERT INTO mysink
> > SELECT
> >ts, userid,
> >COUNT(userid)
> >OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN 
> > INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc
> >
> > Taking the following input as an example:
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> > "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> > it produces the following results:
> > {"cnt":1,"ts":157554732,"userid":"user1"}
> > {"cnt":2,"ts":157554798,"userid":"user1"}
> > {"cnt":3,"ts":157554810,"userid":"user1"}
> > {"cnt":4,"ts":157554906,"userid":"user1"}
> > {"cnt":4,"ts":157554960,"userid":"user1"}
> > {"cnt":4,"ts":157554990,"userid":"user1"}
> >
> > To verify the SQL above, you can paste the following job into the job definition box at http://creek.baidubce.com/,
> > click to generate an executable, run the downloaded executable, and you will see the results:
> > {
> > "sources": [{
> > "schema": {
> > "format": "CSV",
> > "fields": [{
> > "name": "ts",
> > "type": "SQL_TIMESTAMP"
> > },
> > {
> > "name": "userid",
> > "type": "STRING"
> > }]
> > },
> > "watermark": 0,
> > "name": "mysrc",
> > "eventTime": "ts",
> > "type": "COLLECTION",
> > "attr": {
> > "input":[
> > "2019-12-05 12:02:00,user1",
> > "2019-12-05 12:13:00,user1",
> > "2019-12-05 12:15:00,user1",
> > "2019-12-05 12:31:00,user1",
> > "2019-12-05 12:40:00,user1",
> > "2019-12-05 12:45:00,user1"
> >   ]
> >   }
> > }],
> > "sink": {
> > "schema": {
> > "format": "JSON"
> > },
> > "name": "mysink",
> > "type": "STDOUT"
> > },
> > "name": "demojob",
> > "timeType": "EVENTTIME",
> > "sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid) 
> > OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30'
> > MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
> > }
> >
> >
> > The example above uses event time, but processing time works as well. To verify, change source.type from COLLECTION to STDIN and
> > timeType from EVENTTIME to PROCESSTIME, regenerate and run, and then enter the data from the command line.
> >
> > 袁尤军
> >
> > -----Original Message-----
> > From: 陈帅 
> > Sent: Wednesday, December 4, 2019 11:40 PM
> > To: user-zh@flink.apache.org
> > Subject: How to continuously query, with Flink SQL, the number of users who logged in during the past 30 minutes?
> >
> > For example, a user logs in at the following times: none, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, none.
> > Then at the following times (the actual query may happen at any time) I expect to get these counts:
> > 12:01 (0),  12:03 (1),  12:14 (2),  12:16 (3), 12:30 (4), 12:35 (4),
> > 12:41 (5), 12:46 (4), 13:16 (0)
> > That is, every incoming element gets a 30-minute expiration, and the window state maintains the set of elements that have not yet expired.
> >
> > With a sliding window the slide would have to be 1 second, so the number of windows would blow up, while I actually only need one of them and the rest are wasted. I also considered an
> > OVER window, but I am not sure whether it supports processing time, because my scenario needs the result to change as processing time advances. I tried the stream
> > API, using timerService to set an expiration per element, but I found that elements expire more slowly than they arrive, so the state keeps growing.
> >
> > So I would like to ask:
> > 1. Is there a standard approach for this case? Does SQL support it?
> > 2. How can the timerService performance problem be solved? Is timerService implemented as a single thread processing a priority queue?
> >
> > Thanks!
> > 陈帅
> >
>


Re: Continuously querying, with Flink, the number of users who logged in during the past 30 minutes

2019-12-11 Thread Yuan,Youjun
First, use a user-defined table function to turn every input message into 3 messages. Assuming the input message's time is ts, the three produced messages are (ts-1, 0),
(ts+1, 1), (ts+31, 0).
Then apply a 30-minute RANGE OVER window to SUM the numeric part (0 or 1); a sketch follows below.
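A hedged sketch of the table function described above. Assumptions not spelled out in the reply: Flink 1.9+, ts stored as epoch milliseconds in a BIGINT column, "+/-1" interpreted as one second around the login, and "+31" as 31 minutes later. Note that the expanded timestamp still has to be turned back into a rowtime attribute (e.g. via a DataStream round trip) before the OVER window can use it; the reply glosses over that step.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableFunction;

// Each login row is expanded into three weighted rows so that a 30-minute
// RANGE OVER SUM of the weight rises on login and drops back 30 minutes later.
public class ExpandLogin extends TableFunction<Tuple2<Long, Integer>> {
    private static final long SECOND = 1_000L;
    private static final long MINUTE = 60_000L;

    public void eval(long ts) {
        collect(Tuple2.of(ts - SECOND, 0));       // baseline just before the login
        collect(Tuple2.of(ts + SECOND, 1));       // the login itself, weight 1
        collect(Tuple2.of(ts + 31 * MINUTE, 0));  // forces re-evaluation after 30 minutes
    }
}

// Registration and intended usage (table name "logins" is an assumption):
//   tEnv.registerFunction("expand_login", new ExpandLogin());
//   SELECT userid, SUM(w) OVER (PARTITION BY userid ORDER BY rowtime
//     RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt
//   FROM logins, LATERAL TABLE(expand_login(ts)) AS T(new_ts, w)
```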

袁尤军

-----Original Message-----
From: 陈帅  
Sent: Wednesday, December 11, 2019 9:31 PM
To: user-zh@flink.apache.org
Subject: Continuously querying, with Flink, the number of users who logged in during the past 30 minutes

For example, a user logs in at the following times: none, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, none.

Then at the following times (the actual query may happen at any time) I expect to get these counts:
12:01 (0),  12:03 (1),  12:14 (2),  12:16 (3), 12:30 (4), 12:35 (4), 12:41 (5),
12:46 (4), 13:16 (0)

That is, every incoming element gets a 30-minute expiration, and the window state is the set of elements that have not yet expired.

How should this be implemented with the Flink stream API and with Flink SQL, respectively? If timerService is used, will expiring so many elements cause performance problems?


Re: Window deduplication

2019-12-10 Thread Yuan,Youjun
For the first case, use an aggregate function such as firstvalue; for the second case, use the min aggregate and then group by id. Is that the result you want? (Sketches follow below.)
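Hedged sketches of the two suggestions above, written against an assumed table events(ts TIMESTAMP(3) event-time attribute, id BIGINT) and a 5-minute tumbling window; FIRST_VALUE is a blink-planner (Flink 1.9+) built-in aggregate, and its "first seen" ordering depends on arrival order.

```java
// Sketch only: the table and column names are assumptions for illustration.
public class WindowDedupQueries {
    // Case 1: keep the first record seen per id within each 5-minute window.
    public static final String FIRST_RECORD_PER_ID =
        "SELECT id, FIRST_VALUE(ts) AS first_seen_ts " +
        "FROM events " +
        "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), id";

    // Case 2: keep the earliest timestamp per id within each 5-minute window.
    public static final String EARLIEST_TS_PER_ID =
        "SELECT id, MIN(ts) AS earliest_ts " +
        "FROM events " +
        "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), id";
}
```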

-----Original Message-----
From: Jimmy Wong  
Sent: Tuesday, December 10, 2019 4:40 PM
To: user-zh@flink.apache.org
Subject: Window deduplication

Hi, all:
A question about a real-time scenario I am working on: the data within every 5 minutes needs to be deduplicated and then sent to a sink.
For example, the data:
{ts: 2019-12-10 16:24:00 id: 1}
{ts: 2019-12-10 16:22:00 id: 1}
{ts: 2019-12-10 16:23:00 id: 2}
{ts: 2019-12-10 16:21:00 id: 1}
{ts: 2019-12-10 16:29:00 id: 2}
{ts: 2019-12-10 16:27:00 id: 3}
{ts: 2019-12-10 16:26:00 id: 2}


Case 1, deduplicating without considering time, the result is:
{ts: 2019-12-10 16:24:00 id: 1}
{ts: 2019-12-10 16:23:00 id: 2}
{ts: 2019-12-10 16:29:00 id: 2}
{ts: 2019-12-10 16:27:00 id: 3}


Case 2, deduplicating with time taken into account, the result is:
{ts: 2019-12-10 16:21:00 id: 1}
{ts: 2019-12-10 16:23:00 id: 2}
{ts: 2019-12-10 16:26:00 id: 2}
{ts: 2019-12-10 16:27:00 id: 3}


For the two cases above, is there an efficient real-time solution for each? Thanks. I think a 5-minute window with a ProcessWindowFunction could solve it, but a
ProcessWindowFunction has to buffer the full 5 minutes of window data, and that adds latency.




Jimmy Wong
wangzmk...@163.com



Re: How to continuously query, with Flink SQL, the number of users who logged in during the past 30 minutes?

2019-12-07 Thread Yuan,Youjun
Yes, it exposes exactly the same SQL interface as Flink; it is a streaming framework designed for edge computing.


-----Original Message-----
From: 陈帅  
Sent: Saturday, December 7, 2019 11:36 AM
To: user-zh@flink.apache.org
Subject: Re: How to continuously query, with Flink SQL, the number of users who logged in during the past 30 minutes?

Your platform is quite handy for quick verification; is it an extension of Flink SQL?
It did not fully solve my problem, but thank you anyway.

Yuan,Youjun  wrote on Thu, Dec 5, 2019 at 10:41 AM:

> You can handle this with a 30-minute RANGE OVER window, but the two 0-value outputs you mentioned probably cannot be produced: no data, no output.
> Assuming your input has two fields, ts and userid (the timestamp and the user id), the SQL would look like this:
> INSERT INTO mysink
> SELECT
>ts, userid,
>COUNT(userid)
>OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN 
> INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc
>
> Taking the following input as an example:
> "2019-12-05 12:02:00,user1",
> "2019-12-05 12:13:00,user1",
> "2019-12-05 12:15:00,user1",
> "2019-12-05 12:31:00,user1",
> "2019-12-05 12:40:00,user1",
> "2019-12-05 12:45:00,user1"
> it produces the following results:
> {"cnt":1,"ts":157554732,"userid":"user1"}
> {"cnt":2,"ts":157554798,"userid":"user1"}
> {"cnt":3,"ts":157554810,"userid":"user1"}
> {"cnt":4,"ts":157554906,"userid":"user1"}
> {"cnt":4,"ts":157554960,"userid":"user1"}
> {"cnt":4,"ts":157554990,"userid":"user1"}
>
> To verify the SQL above, you can paste the following job into the job definition box at http://creek.baidubce.com/,
> click to generate an executable, run the downloaded executable, and you will see the results:
> {
> "sources": [{
> "schema": {
> "format": "CSV",
> "fields": [{
> "name": "ts",
> "type": "SQL_TIMESTAMP"
> },
> {
> "name": "userid",
> "type": "STRING"
> }]
> },
> "watermark": 0,
> "name": "mysrc",
> "eventTime": "ts",
> "type": "COLLECTION",
> "attr": {
> "input":[
> "2019-12-05 12:02:00,user1",
> "2019-12-05 12:13:00,user1",
> "2019-12-05 12:15:00,user1",
> "2019-12-05 12:31:00,user1",
> "2019-12-05 12:40:00,user1",
> "2019-12-05 12:45:00,user1"
>   ]
>   }
> }],
> "sink": {
> "schema": {
> "format": "JSON"
> },
> "name": "mysink",
> "type": "STDOUT"
> },
> "name": "demojob",
> "timeType": "EVENTTIME",
> "sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid)  
> OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' 
> MINUTE PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
> }
>
>
> The example above uses event time, but processing time works as well. To verify, change source.type from COLLECTION to STDIN and timeType
> from EVENTTIME to PROCESSTIME, regenerate and run, and then enter the data from the command line.
>
> 袁尤军
>
> -----Original Message-----
> From: 陈帅 
> Sent: Wednesday, December 4, 2019 11:40 PM
> To: user-zh@flink.apache.org
> Subject: How to continuously query, with Flink SQL, the number of users who logged in during the past 30 minutes?
>
> For example, a user logs in at the following times: none, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, none.
> Then at the following times (the actual query may happen at any time) I expect to get these counts:
> 12:01 (0),  12:03 (1),  12:14 (2),  12:16 (3), 12:30 (4), 12:35 (4),
> 12:41 (5), 12:46 (4), 13:16 (0)
> That is, every incoming element gets a 30-minute expiration, and the window state maintains the set of elements that have not yet expired.
>
> With a sliding window the slide would have to be 1 second, so the number of windows would blow up, while I actually only need one of them and the rest are wasted. I also considered an
> OVER window, but I am not sure whether it supports processing time, because my scenario needs the result to change as processing time advances. I tried the stream
> API, using timerService to set an expiration per element, but I found that elements expire more slowly than they arrive, so the state keeps growing.
>
> So I would like to ask:
> 1. Is there a standard approach for this case? Does SQL support it?
> 2. How can the timerService performance problem be solved? Is timerService implemented as a single thread processing a priority queue?
>
> Thanks!
> 陈帅
>


Re: Re: How to express device online/offline monitoring in SQL

2019-12-04 Thread Yuan,Youjun
Thanks for your reply.
This approach is quite interesting; it just still cannot distinguish the count=1 message produced by a device's first heartbeat (coming online) from the count=1 message produced by its last heartbeat (going offline). (A sketch of the quoted sliding-window query follows below.)
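A minimal sketch of the sliding-window liveness check described in the quoted suggestion below. Assumptions: heartbeats arrive in a table heartbeats(device_id STRING, ts TIMESTAMP(3) event-time attribute) and the heartbeat period n is 1 minute, so the window size is 2 minutes (>= 2n) and the slide 1 minute (>= n); a count of at least 2 within a window means the device is online. Table, column names, and intervals are illustrative only.

```java
// Sketch only: sliding (HOP) window heartbeat count per device.
public class DeviceLivenessQuery {
    public static final String ALIVE_CHECK_SQL =
        "SELECT device_id, " +
        "       HOP_END(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE) AS window_end, " +
        "       COUNT(*) AS heartbeats_in_window " +   // >= 2 means the device is online
        "FROM heartbeats " +
        "GROUP BY HOP(ts, INTERVAL '1' MINUTE, INTERVAL '2' MINUTE), device_id";
}
```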

-----Original Message-----
From: 1193216154 <1193216...@qq.com> 
Sent: Wednesday, December 4, 2019 9:39 PM
To: user-zh 
Subject: Re: How to express device online/offline monitoring in SQL

Set up a sliding window with a window size of at least 2n and a slide of at least n. If a window evaluation yields a count of at least 2, the device is online; otherwise it is offline.

----- Original Message -----
From: "Yuan,Youjun"

Re: How to continuously query, with Flink SQL, the number of users who logged in during the past 30 minutes?

2019-12-04 Thread Yuan,Youjun
You can handle this with a 30-minute RANGE OVER window, but the two 0-value outputs you mentioned probably cannot be produced: no data, no output.
Assuming your input has two fields, ts and userid (the timestamp and the user id), the SQL would look like this:
INSERT INTO mysink 
SELECT 
   ts, userid,  
   COUNT(userid)  
   OVER (PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' 
MINUTE PRECEDING AND CURRENT ROW) AS cnt 
FROM mysrc

Taking the following input as an example:
"2019-12-05 12:02:00,user1",
"2019-12-05 12:13:00,user1",
"2019-12-05 12:15:00,user1",
"2019-12-05 12:31:00,user1",
"2019-12-05 12:40:00,user1",
"2019-12-05 12:45:00,user1"
it produces the following results:
{"cnt":1,"ts":157554732,"userid":"user1"}
{"cnt":2,"ts":157554798,"userid":"user1"}
{"cnt":3,"ts":157554810,"userid":"user1"}
{"cnt":4,"ts":157554906,"userid":"user1"}
{"cnt":4,"ts":157554960,"userid":"user1"}
{"cnt":4,"ts":157554990,"userid":"user1"}

To verify the SQL above, you can paste the following job into the job definition box at http://creek.baidubce.com/,
click to generate an executable, run the downloaded executable, and you will see the results:
{
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "userid",
"type": "STRING"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input":[
"2019-12-05 12:02:00,user1",
"2019-12-05 12:13:00,user1",
"2019-12-05 12:15:00,user1",
"2019-12-05 12:31:00,user1",
"2019-12-05 12:40:00,user1",
"2019-12-05 12:45:00,user1"
  ]
  }
}],
"sink": {
"schema": {
"format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT rowtime, userid, COUNT(userid)  OVER 
(PARTITION BY userid ORDER BY rowtime RANGE BETWEEN INTERVAL '30' MINUTE 
PRECEDING AND CURRENT ROW) AS cnt FROM mysrc"
}

The example above uses event time, but processing time works as well. To verify, change source.type from COLLECTION to STDIN and timeType from EVENTTIME to PROCESSTIME, regenerate and run, and then enter the data from the command line.

袁尤军

-----Original Message-----
From: 陈帅  
Sent: Wednesday, December 4, 2019 11:40 PM
To: user-zh@flink.apache.org
Subject: How to continuously query, with Flink SQL, the number of users who logged in during the past 30 minutes?

For example, a user logs in at the following times: none, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, none.
Then at the following times (the actual query may happen at any time) I expect to get these counts:
12:01 (0),  12:03 (1),  12:14 (2),  12:16 (3), 12:30 (4), 12:35 (4), 12:41 (5),
12:46 (4), 13:16 (0)
That is, every incoming element gets a 30-minute expiration, and the window state maintains the set of elements that have not yet expired.

With a sliding window the slide would have to be 1 second, so the number of windows would blow up, while I actually only need one of them and the rest are wasted. I also considered an
OVER window, but I am not sure whether it supports processing time, because my scenario needs the result to change as processing time advances. I tried the stream
API, using timerService to set an expiration per element, but I found that elements expire more slowly than they arrive, so the state keeps growing.

So I would like to ask:
1. Is there a standard approach for this case? Does SQL support it?
2. How can the timerService performance problem be solved? Is timerService implemented as a single thread processing a priority queue?

Thanks!
陈帅


Re: Flink SQL: nesting a window query over a state table

2019-11-14 Thread Yuan,Youjun
If the inner query is not a time-windowed query, I am afraid this is not supported.
If the inner query is time-windowed, e.g. a tumbling window, you can use TUMBLE_ROWTIME to pass the rowtime attribute through to the outer query (a sketch follows below).
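A sketch of the TUMBLE_ROWTIME pattern mentioned above: the inner query aggregates per id in a smaller tumbling window and exposes TUMBLE_ROWTIME as wtime, which is still a time attribute, so the outer query can window on it again. The table name t_waybill, the 1-hour inner window, and LAST_VALUE (a blink-planner built-in) are assumptions; they stand in for the state-table logic in the original question.

```java
// Sketch only: propagate the rowtime attribute with TUMBLE_ROWTIME.
public class NestedWindowQuery {
    public static final String NESTED_WINDOW_SQL =
        "SELECT waybill_status, COUNT(id) AS waybill_status_count, " +
        "       TUMBLE_START(wtime, INTERVAL '1' DAY) AS wStart " +
        "FROM ( " +
        "  SELECT id, " +
        "         LAST_VALUE(status) AS waybill_status, " +
        "         TUMBLE_ROWTIME(ctime, INTERVAL '1' HOUR) AS wtime " +  // keeps the time attribute
        "  FROM t_waybill " +
        "  GROUP BY TUMBLE(ctime, INTERVAL '1' HOUR), id " +
        ") " +
        "GROUP BY TUMBLE(wtime, INTERVAL '1' DAY), waybill_status";
}
```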

-----Original Message-----
From: 金圣哲  
Sent: Friday, November 15, 2019 1:21 PM
To: user-zh@flink.apache.org
Subject: Flink SQL: nesting a window query over a state table

Hi all:

"selectrider_id, waybill_status as waybill_status ,count(id) as 
waybill_status_count,   TUMBLE_START(ctime, INTERVAL '1' DAY) as wStart from  
(select   id, min(ctime)  as rowtime ,latest(status, utime) as waybill_status, 
latest(rider_id, utime) as rider_id from  user group by id) group by 
TUMBLE(ctime, INTERVAL '1' DAY),waybill_status,rider_id"

My question: I want to use a state table as the inner subquery and nest a window query outside it. How can this be implemented? After the state table's GROUP BY, ctime loses its time attribute. Does anyone know?


Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

2019-11-14 Thread Yuan,Youjun
SQL has no way to express this "earliest minute" logic.
If you insert a temperature=0 message at the very beginning of your stream, the first output you get will be diff_temperature=0; I am not sure whether that is acceptable. (A sketch of seeding the stream follows below.)
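A minimal sketch of the workaround described above: prepend a synthetic temperature=0 record per device so that the first per-minute diff evaluates to 0 rather than the raw temperature. The event shape (tsMillis, deviceId, temperature) and the device id "dev1" are assumptions; in a real job the synthetic timestamp must be earlier than all real data for event time to behave as intended.

```java
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: union a synthetic zero-temperature record in front of the real stream.
public class SeedWithZero {
    public static DataStream<Tuple3<Long, String, Double>> seed(
            StreamExecutionEnvironment env,
            DataStream<Tuple3<Long, String, Double>> real) {
        DataStream<Tuple3<Long, String, Double>> synthetic =
                env.fromElements(Tuple3.of(0L, "dev1", 0.0)); // timestamp 0, temperature 0
        return synthetic.union(real);
    }
}
```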

From: Chennet Steven 
Sent: Thursday, November 14, 2019 5:32 PM
To: user-zh@flink.apache.org; Yuan,Youjun 
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

Yuan, thanks a lot for the reply and the approach. I tried it in code and it does work. However, when computing the earliest minute's
diff_temperature, since there is no earlier minute of data, diff_temperature comes out as that first minute's temperature. Is there a way to set it to null instead?

Running it produces the following results:
{"deviceid":"dev1","diff_temperature":1.3,"ts":5}  (can the 1.3 for this minute be set to null?)
{"deviceid":"dev1","diff_temperature":0.3,"ts":11}
{"deviceid":"dev1","diff_temperature":0.1,"ts":17}


From stevenchen
 webchat 38798579


From: Yuan,Youjun 
Sent: Wednesday, November 13, 2019 11:34:53 PM
To: user-zh@flink.apache.org 
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

This scenario should be expressible with standard SQL. The rough idea:
1. The inner query computes each device's per-minute maximum temperature: max(temp) AS max_temperature plus a tumble window.
2. The outer query uses a ROW OVER window to get the current minute's max_temperature together with the sum of the two adjacent minutes' maxima, i.e. SUM(max_temperature) AS
sum_temperature.
3. The outermost query then simply selects 2 * max_temperature - sum_temperature, which is exactly the difference between the two adjacent minutes' maximum temperatures.

Assume the input message has three fields:
Ts: timestamp
Deviceid: device id
Temp: device temperature

The complete SQL:
INSERT INTO mysink
SELECT ts, deviceid,  2 * max_temperature - sum_temperature AS diff_temperature
FROM (
SELECT  deviceid, ts, max_temperature, SUM(max_temperature) OVER 
(PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS 
sum_temperature
FROM (
SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, 
deviceid, max(temp) AS max_temperature  from mysrc group by TUMBLE(rowtime, 
INTERVAL '60' SECOND), deviceid
)
)

With the following test data:
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
running it produces the following results:
{"deviceid":"dev1","diff_temperature":1.3,"ts":5}
{"deviceid":"dev1","diff_temperature":0.3,"ts":11}
{"deviceid":"dev1","diff_temperature":0.1,"ts":17}

If you want to verify my approach end to end, you can:
1. Go to http://creek.baidubce.com/
2. Paste the job definition (JSON) at the end of this email into the job input box
3. Click to generate an executable; in the dialog, choose your computer's OS and CPU arch and confirm
Wait a few seconds and the system will generate a complete executable; run it and you will see the results in the console. If you need to verify more data, change the source type to STDIN so that you can enter data from the command line.

Job definition (JSON):
{
"注释":{
"说明": "内层query计算每个设备每分钟的最大温度,max+tumble窗口;外层query计算同一设备前后2分钟最大温度差, Row 
over窗口;计算温度差的计算方法为:当前窗口最大温度 x 2 - 前后2个窗口最大温度的和。本例采用预先配置的输入数据,即source 
type=COLLECTION,如果需要尝试更多的输入,可以将type改为STDIN,即从标准输入读入数据。",
"输入示例": "1000,dev1,2.3",
"输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":5}
},
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "deviceid",
"type": "STRING"
},
{
"name": "temp",
"type": "DOUBLE"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input": [
"1,dev1,1.1",
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
]
}
}],
"sink": {
"schema": {
"format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT ts, deviceid,  2 * max_temperature - 
sum_temperature AS diff_temperature FROM ( SELECT  deviceid, ts, 
max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts 
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT 
TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS 
max_temperature  from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), 
deviceid)) "
}



-----Original Message-----
From: Chenn

Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

2019-11-13 Thread Yuan,Youjun
This scenario should be expressible with standard SQL. The rough idea:
1. The inner query computes each device's per-minute maximum temperature: max(temp) AS max_temperature plus a tumble window.
2. The outer query uses a ROW OVER window to get the current minute's max_temperature together with the sum of the two adjacent minutes' maxima, i.e. SUM(max_temperature) AS
sum_temperature.
3. The outermost query then simply selects 2 * max_temperature - sum_temperature, which is exactly the difference between the two adjacent minutes' maximum temperatures.

Assume the input message has three fields:
Ts: timestamp
Deviceid: device id
Temp: device temperature

The complete SQL:
INSERT INTO mysink 
SELECT ts, deviceid,  2 * max_temperature - sum_temperature AS diff_temperature 
FROM ( 
SELECT  deviceid, ts, max_temperature, SUM(max_temperature) OVER 
(PARTITION BY deviceid ORDER BY ts ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS 
sum_temperature 
FROM (
SELECT TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, 
deviceid, max(temp) AS max_temperature  from mysrc group by TUMBLE(rowtime, 
INTERVAL '60' SECOND), deviceid
)
)

With the following test data:
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
running it produces the following results:
{"deviceid":"dev1","diff_temperature":1.3,"ts":5}
{"deviceid":"dev1","diff_temperature":0.3,"ts":11}
{"deviceid":"dev1","diff_temperature":0.1,"ts":17}

If you want to verify my approach end to end, you can:
1. Go to http://creek.baidubce.com/
2. Paste the job definition (JSON) at the end of this email into the job input box
3. Click to generate an executable; in the dialog, choose your computer's OS and CPU arch and confirm
Wait a few seconds and the system will generate a complete executable; run it and you will see the results in the console. If you need to verify more data, change the source type to STDIN so that you can enter data from the command line.

Job definition (JSON):
{
"注释":{
"说明": "内层query计算每个设备每分钟的最大温度,max+tumble窗口;外层query计算同一设备前后2分钟最大温度差, Row 
over窗口;计算温度差的计算方法为:当前窗口最大温度 x 2 - 前后2个窗口最大温度的和。本例采用预先配置的输入数据,即source 
type=COLLECTION,如果需要尝试更多的输入,可以将type改为STDIN,即从标准输入读入数据。",
"输入示例": "1000,dev1,2.3",
"输出示例": {"deviceid":"dev1","diff_temperature":1.3,"ts":5}
},
"sources": [{
"schema": {
"format": "CSV",
"fields": [{
"name": "ts",
"type": "SQL_TIMESTAMP"
},
{
"name": "deviceid",
"type": "STRING"
},
{
"name": "temp",
"type": "DOUBLE"
}]
},
"watermark": 0,
"name": "mysrc",
"eventTime": "ts",
"type": "COLLECTION",
"attr": {
"input": [
"1,dev1,1.1",
"2,dev1,1.2",
"5,dev1,1.3",
"6,dev1,1.4",
"10,dev1,1.5",
"11,dev1,1.6",
"12,dev1,1.7"
]
}
}],
"sink": {
"schema": {
"format": "JSON"
},
"name": "mysink",
"type": "STDOUT"
},
"name": "demojob",
"timeType": "EVENTTIME",
"sql": "INSERT INTO mysink SELECT ts, deviceid,  2 * max_temperature - 
sum_temperature AS diff_temperature FROM ( SELECT  deviceid, ts, 
max_temperature, SUM(max_temperature) OVER (PARTITION BY deviceid ORDER BY ts 
ROWS BETWEEN 1 PRECEDING AND CURRENT ROW) AS sum_temperature FROM (SELECT 
TUMBLE_ROWTIME(rowtime, INTERVAL '60' SECOND) as ts, deviceid, max(temp) AS 
max_temperature  from mysrc group by TUMBLE(rowtime, INTERVAL '60' SECOND), 
deviceid)) "
}



-----Original Message-----
From: Chennet Steven  
Sent: Wednesday, November 13, 2019 3:36 PM
To: user-zh@flink.apache.org
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

The scenario: using one-minute windows, compute each sensor's maximum temperature per minute with Flink SQL, and at the same time compute the difference between the current minute's and the previous minute's maximum. I wanted to write a custom Table UDAF and store the previous minute's maximum in state inside it, but found that the RuntimeContext inside the FunctionContext of the UDAF's open() method is private and cannot be used. Also, a DataView is
a field of the UDAF's accumulator (ACC), and a new ACC is created for every window, so the previous window's result cannot be carried over to the next window via ACC/DataView. Is my understanding correct?
What is the recommended way to implement "the difference of aggregate values between two windows" in Flink SQL?

From stevenchen
 webchat 38798579

From: Dian Fu
Sent: Thursday, November 7, 2019 19:41
To: user-zh@flink.apache.org
Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function

You can refer to an existing example in the Flink code base: https://github.com/apache/flink/blob/c601cfd662c2839f8ebc81b80879ecce55a8cbaf/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/MaxWithRetractAggFunction.java
 

> On Nov 7, 2019, at 7:06 PM, Chennet Steven  wrote:
>
> In flink-table-common of Flink 1.9 I found the DataView interface and its subclasses ListView and MapView,
> but I could not figure out how to use them in a user-defined function.
> Could you share an example or a link to test code?
>
> From stevenchen
> webchat 38798579
>
> 
> From: wenlong.lwl 
> Sent: Thursday, November 7, 2019 2:13:43 PM
> To: user-zh@flink.apache.org 
> Subject: Re: Flink 1.7.2: how to use custom state in a Table API user-defined aggregate function
>
> You can try 1.9, which introduces the DataView mechanism, so state can now be used in the accumulator.
>
> On Thu, 7 Nov 2019 at 09:22, Chennet Steven  wrote:
>
>> I tried to use state in a Flink user-defined aggregate function and found that RuntimeContext cannot be
>> obtained through FunctionContext in the open() method.
>> How can state be used in an aggregate function?
>>
>>
>> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo,
>> 

Re: Flink DataStream KeyedStream and AggregateFunction

2019-11-09 Thread Yuan,Youjun
1. Yes.
2. There is no standard answer; could you pre-aggregate locally first?
3. The AggregateFunction specifies which kind of aggregation to perform: sum, avg, or count. If you did not specify it, how would Flink know what to compute? (A sketch follows below.)
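A minimal count AggregateFunction, sketched here to complement the answer to question 3: createAccumulator/add/getResult define what is aggregated, while merge() is only needed when Flink has to combine partial accumulators, most notably for merging (session) windows; for a plain keyed tumbling window it is normally not invoked. The class is generic and illustrative only.

```java
import org.apache.flink.api.common.functions.AggregateFunction;

// Counts elements per key and window; merge() combines two partial counts.
public class CountAgg<T> implements AggregateFunction<T, Long, Long> {
    @Override public Long createAccumulator()      { return 0L; }
    @Override public Long add(T value, Long acc)   { return acc + 1; }
    @Override public Long getResult(Long acc)      { return acc; }
    @Override public Long merge(Long a, Long b)    { return a + b; }
}
```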

-----Original Message-----
From: 王佩  
Sent: Saturday, November 9, 2019 11:45 AM
To: user-zh 
Subject: Flink DataStream KeyedStream and AggregateFunction

A few questions:

1. After a DataStream is keyed by user ID, will all data for the same user ID end up in the same partition?

2. Assuming 1 holds, this leads to data skew. How should that be solved?

3. Assuming 1 holds, for example: DataStream
   .keyBy(userID)
   .timeWindow()
   .aggregate(new
AggregateFunction(...)): why does the AggregateFunction here
still need merge()? Since data for the same key is only computed within the same partition, merge() does not seem necessary.

These three questions have me a bit confused; could someone take a look?
Thanks!


Re: Flink periodic watermark generation: where is the 200 ms interval controlled?

2019-09-03 Thread Yuan,Youjun
For the source code, see PeriodicWatermarkEmitter. (A short sketch of overriding the interval follows below.)
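A short sketch showing where the 200 ms comes from in practice and how to override it: setStreamTimeCharacteristic(EventTime) installs the 200 ms default, and setAutoWatermarkInterval changes it (value in milliseconds). The 1-second value is only an example.

```java
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkIntervalExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // installs the 200 ms default
        env.getConfig().setAutoWatermarkInterval(1000L);               // override: emit every 1 s
    }
}
```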


-----Original Message-----
From: Dino Zhang  
Sent: Tuesday, September 3, 2019 3:14 PM
To: user-zh@flink.apache.org
Subject: Re: Flink periodic watermark generation: where is the 200 ms interval controlled?

hi venn,


The event-time watermark interval defaults to 200 ms and can be set via ExecutionConfig#setAutoWatermarkInterval; see StreamExecutionEnvironment:

  /**
   * Sets the time characteristic for all streams create from this environment, 
e.g., processing
   * time, event time, or ingestion time.
   *
   * If you set the characteristic to IngestionTime of EventTime this will 
set a default
   * watermark update interval of 200 ms. If this is not applicable for your 
application
   * you should change it using {@link
ExecutionConfig#setAutoWatermarkInterval(long)}.
   *
   * @param characteristic The time characteristic.
   */
  @PublicEvolving
  public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
  getConfig().setAutoWatermarkInterval(0);
} else {
  getConfig().setAutoWatermarkInterval(200);
}
  }



On Tue, Sep 3, 2019 at 2:39 PM venn  wrote:

> Hi all, while reading the Flink source code for assigning timestamps and watermarks today, I found that periodic
> watermark generation is indeed periodic: from the times printed to the console it runs roughly every 200 ms.
> Where is this 200 ms controlled? I cannot find it (the source location) in the debug call stack.
>
>
>
> The periodic watermark creation is set up as follows:
>
> .assignAscendingTimestamps(element =>
> sdf.parse(element.createTime).getTime)
>
> .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[Event](Time.milliseconds(50))
>
>
>
>
>
> The method that assigns the timestamp,
>
> in the TimestampsAndPeriodicWatermarksOperator class:
>
>
>
>
> @Override
> public void processElement(StreamRecord element) throws Exception {
>final long newTimestamp =
> userFunction.extractTimestamp(element.getValue(),
>  element.hasTimestamp() ? element.getTimestamp() : 
> Long.MIN_VALUE);
>
>output.collect(element.replace(element.getValue(), newTimestamp)); 
> }
>
>
>
>
>
> The method that generates the watermark,
>
> in the TimestampsAndPeriodicWatermarksOperator class:
>
>
> @Override
> public void onProcessingTime(long timestamp) throws Exception {
> // as can be seen here, this prints once every 200 ms
>System.out.println("timestamp : " + timestamp + ", system.current : 
> " + System.currentTimeMillis());
>// register next timer
>Watermark newWatermark = userFunction.getCurrentWatermark();
>if (newWatermark != null && newWatermark.getTimestamp() >
> currentWatermark) {
>   currentWatermark = newWatermark.getTimestamp();
>   // emit watermark
>   output.emitWatermark(newWatermark);
>}
>
>long now = getProcessingTimeService().getCurrentProcessingTime();
>getProcessingTimeService().registerTimer(now + watermarkInterval, 
> this); }
>
>
>
>
>
>
>
> Thanks, everyone.
>
>


回复: [Discuss] What should the "Data Source" be translated into Chinese

2019-08-18 Thread Yuan,Youjun
Sink -> "数据目的地" (data destination) is what our team basically calls it. Just for reference.

-----Original Message-----
From: zhisheng  
Sent: Saturday, August 17, 2019 3:49 PM
To: user-zh 
Subject: Re: [Discuss] What should the "Data Source" be translated into Chinese

hi,
It still feels a bit awkward. If no suitable Chinese term can be found, I suggest keeping "Data Sink" as-is, which may be friendlier.

Kurt Young  wrote on Tue, Aug 13, 2019 at 4:16 PM:

> cc user-zh mailing list, since there are lots of chinese speaking people.
> Best,
> Kurt
>
>
> On Tue, Aug 13, 2019 at 4:02 PM WangHengwei  wrote:
>
> > Hi all,
> >
> >
> > I'm working on [FLINK-13405] Translate "Basic API Concepts" page 
> > into Chinese. I have a problem.
> >
> > Usually we translate "Data Source" into "数据源" but there is no 
> > agreed translation for "Data Sink". Since it often appears in 
> > documents, I think we'd better to have a unified translation. I have some 
> > alternatives, e.g.
> > "数据沉淀","数据归" or "数据终".
> >
> >  Committer Xingcan Cui has a good suggestion for "数据汇" which 
> > corresponds to source ("数据源"). I asked Committer Jark Wu, he is also 
> > fine with it. I think "数据汇" is a good representation of the flow
> > characteristics, so I would like to use it.
> >
> >
> > I want to hear more thoughts from the community whether we 
> > should translate it and what it should be translated into.
> >
> >
> > Thanks,
> > WangHW
>


Re: Re: Flink 1.8.1: time windows not closing and messages being lost

2019-08-11 Thread Yuan,Youjun
It is not that every message triggers a watermark; watermarks are emitted at a fixed interval, 200 ms by default. (A sketch of a per-record alternative follows below.)
When your data arrives in concentrated bursts, it frequently happens that the newest message's timestamp is already past the window end but the window has not fired yet.
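Sketch only: if a watermark really is wanted after every record (instead of the periodic 200 ms emission described above), a punctuated assigner can be used. The 5-second out-of-orderness bound is an assumption, and per-record watermarks add noticeable overhead on high-throughput streams; this is an illustration, not a recommendation for this particular job.

```java
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Emits a watermark together with every record (pre-1.11 watermark API).
public abstract class PerRecordWatermarks<T> implements AssignerWithPunctuatedWatermarks<T> {
    private final long maxOutOfOrdernessMs = 5_000L; // assumed bound, adjust as needed

    // Subclasses return the event-time field of the record.
    protected abstract long timestampOf(T element);

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        return timestampOf(element);
    }

    @Override
    public Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp) {
        return new Watermark(extractedTimestamp - maxOutOfOrdernessMs);
    }
}
```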


-----Original Message-----
From: Ever <439674...@qq.com> 
Sent: Sunday, July 14, 2019 5:00 PM
To: user-zh 
Subject: Re: Flink 1.8.1: time windows not closing and messages being lost

The fourth record's timestamp is 03:17:55, so the watermark at that point should be 03:17:50.
Whether it is the close time of the large window (for the first record (03:15:48), the large window closes at 03:16:50) or the close times of the small sliding windows, they have all passed, so they should all have been closed.




------ Original Message ------
From: "Hequn Cheng";
Sent: Sunday, July 14, 2019, 11:55 AM
To: "user-zh";

Subject: Re: Flink 1.8.1: time windows not closing and messages being lost



Hi,

Most likely the watermark has not reached the window's end time, so the window has not fired. See [1] for details on watermarks. You can also check the job's current watermark value on the job's web page [2].

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#watermarks-in-parallel-streams
[2]
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/debugging_event_time.html#monitoring-current-event-time

On Fri, Jul 12, 2019 at 4:05 PM Ever <439674...@qq.com> wrote:

> I have an event-time streaming job that computes statistics over the past one minute, every 10 seconds.
> Data arrives in a batch roughly every 10 seconds.
> The code is as follows:
> ```
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> env.getConfig.setAutoWatermarkInterval(watermarkGenInterval)
>
>
> env.setParallelism(parallel)
>
>
> env.addSource(source)
>   .map(json => {
>   new InvokingInfoWrapper(xxx)
> })
>   .assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor[InvokingInfoWrapper](Time.seco
> nds(5))
> {
> override def extractTimestamp(invoking: InvokingInfoWrapper): 
> Long = {
>   invoking.timestamp
> }
>   })
>   .keyBy(invokingInfo => {
> s"${invokingInfo.caller}_${invokingInfo.callee}"
>   })
>   .timeWindow(Time.seconds(60), Time.seconds(10))
>   .reduce(innerReducer).map(invokingInfo => { // ##2map
>   //some mapping code
>   invokingInfo
>   })
>   .addSink(new
> WebSocketSink[InvokingInfoWrapper](wsHost)).name("Pangolin-websocket-s
> ink")
>
> ```
>
>
>
>
> Since this went live in a pre-release environment, the traffic is small, and I observed the following:
> 1. The first record's timestamp is 03:15:48
> 2. The second record's timestamp is 03:15:59, which triggers the reduce operation (5 times, meaning there are 5 sliding windows)
> 3. The third record's timestamp is 03:16:06, which also triggers the reduce operation (5 times as well)
> 4. The fourth record's timestamp is 03:17:55,
>  which should trigger the closing of the windows containing the first three records (at least some of the 5 sliding windows) and reach the map step marked ##2map above; however, it did not.
> 5. The fifth record's timestamp is 03:18:01, which triggered the reduce operation together with the fourth record.
>
>
> It feels like the first three records were swallowed.
>
>
> Why is that?


Re: Conversion to relational algebra failed to preserve datatypes

2018-09-14 Thread Yuan,Youjun
Hi Timo,

I believe it has something to do with the number of fields in the source. I can
reproduce the issue with a very simple SQL statement, like:
INSERT INTO IVC_ALARM_PUBLIC_OUT SELECT rowtime as ts FROM IVC_ALARM_PUBLIC_IN
Here rowtime actually represents proctime (I registered it as rowtime.proctime).

If I remove a few fields in the source, then everything goes fine.

Thanks
Youjun

From: Timo Walther 
Sent: Friday, September 14, 2018 9:17 PM
To: user@flink.apache.org
Subject: Re: Conversion to relational algebra failed to preserve datatypes

Hi,

could you maybe post the query that caused the exception? I guess the exception
is related to a time attribute [1]. For the optimizer, time attributes and
timestamps make no difference; however, they have a slightly different data type
that might have caused the error. I think this is a bug that should be fixed, once
we have more context.

Regards,
Timo


[1] 
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/streaming.html#time-attributes


Am 14.09.18 um 10:49 schrieb Yuan,Youjun:
Hi,

I am getting the following error while submitting a job to a cluster; it seems it
failed to compare 2 RelDataTypes, though they seem identical (from the error
message), and everything is OK if I run it locally.
I guess calcite failed to compare the first field named ts, of type 
TIMESTAMP(3), because:

  *   If I don’t select ts, then everything goes fine
  *   If I cast ts to other type, like SELECT cast(ts AS TIMESTAMP), then 
everything is fine
  *   If I switch to EventTime, the issue also goes away. Currently it’s 
ProcessTime

I am using Flink 1.4, and submitting job to a standalone cluster.

Below are the error:

Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
program caused an error:
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at 
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
 ... 9 more
Caused by: java.lang.AssertionError: Conversion to relational algebra failed to 
preserve datatypes:
validated type:
RecordType(TIMESTAMP(3) NOT NULL ts, VARCHAR(65536) CHARACTER SET "UTF-16LE" 
COLLATE "ISO-8859-1$en_US$primary" userId, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" batchId, VARCHAR(65536) CHARACTER 
SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" projectId, VARCHAR(65536) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" vehicleId, CHAR(5) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL field0, 
VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" 
rule0, DOUBLE threshold0, DOUBLE field_value0, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" alarmId) NOT NULL
converted type:
RecordType(TIMESTAMP(3) NOT NULL ts, VARCHAR(65536) CHARACTER SET "UTF-16LE" 
COLLATE "ISO-8859-1$en_US$primary" userId, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" batchId, VARCHAR(65536) CHARACTER 
SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" projectId, VARCHAR(65536) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" vehicleId, CHAR(5) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL field0, 
VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" 
rule0, DOUBLE threshold0, DOUBLE field_value0, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" alarmId) NOT NULL
...
at 
org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:451)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:567)
at 
org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:106)


thanks in advance,
youjun




Conversion to relational algebra failed to preserve datatypes

2018-09-14 Thread Yuan,Youjun
Hi,

I am getting the following error while submitting a job to a cluster; it seems it
failed to compare 2 RelDataTypes, though they seem identical (from the error
message), and everything is OK if I run it locally.
I guess calcite failed to compare the first field named ts, of type 
TIMESTAMP(3), because:

  *   If I don't select ts, then everything goes fine
  *   If I cast ts to other type, like SELECT cast(ts AS TIMESTAMP), then 
everything is fine
  *   If I switch to EventTime, the issue also goes away. Currently it's 
ProcessTime

I am using Flink 1.4, and submitting job to a standalone cluster.

Below are the error:

Caused by: org.apache.flink.client.program.ProgramInvocationException: The 
program caused an error:
at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at 
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:76)
at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
 ... 9 more
Caused by: java.lang.AssertionError: Conversion to relational algebra failed to 
preserve datatypes:
validated type:
RecordType(TIMESTAMP(3) NOT NULL ts, VARCHAR(65536) CHARACTER SET "UTF-16LE" 
COLLATE "ISO-8859-1$en_US$primary" userId, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" batchId, VARCHAR(65536) CHARACTER 
SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" projectId, VARCHAR(65536) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" vehicleId, CHAR(5) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL field0, 
VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" 
rule0, DOUBLE threshold0, DOUBLE field_value0, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" alarmId) NOT NULL
converted type:
RecordType(TIMESTAMP(3) NOT NULL ts, VARCHAR(65536) CHARACTER SET "UTF-16LE" 
COLLATE "ISO-8859-1$en_US$primary" userId, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" batchId, VARCHAR(65536) CHARACTER 
SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" projectId, VARCHAR(65536) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" vehicleId, CHAR(5) 
CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" NOT NULL field0, 
VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" 
rule0, DOUBLE threshold0, DOUBLE field_value0, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" alarmId) NOT NULL
...
at 
org.apache.calcite.sql2rel.SqlToRelConverter.checkConvertedType(SqlToRelConverter.java:451)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:567)
at 
org.apache.flink.table.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:106)


thanks in advance,
youjun


Re: jobmanager holds too many CLOSE_WAIT connection to datanode

2018-08-24 Thread Yuan,Youjun
One more safer approach is to execute cancel with savepoint on all jobs first
>> this sounds great!

Thanks
Youjun

From: vino yang 
Sent: Friday, August 24, 2018 2:43 PM
To: Yuan,Youjun ; user 
Subject: Re: jobmanager holds too many CLOSE_WAIT connection to datanode

Hi Youjun,

You can see if there is any real data transfer between these connections.
I guess there may be some connection leaks here, and if so, it's a bug.
On the other hand, the 1.4 version is a bit old, can you compare the 1.5 or 1.6 
whether the same problem exists?
I suggest you create an issue on JIRA and maybe get more feedback.

Questions about how to force these connections to be closed.
If you have configured HA mode and checkpoints are enabled for the job, you
can try taking down the JM leader, then let ZK conduct the leader election and
the JM switch over.

But please be cautious about this process. One more safer approach is to 
execute cancel with savepoint on all jobs first. Then switch JM.

Thanks, vino.

Yuan,Youjun  wrote on Fri, Aug 24, 2018 at 1:06 PM:
Hi vino,

My jobs are running for months now, on a standalone cluster, using flink 1.4.0.
The connections were accumulated over time, not in a short period of time. 
There is no timeout error in Jobmanager log.

So there are two questions:
1, how to force close those connections, ideally without restarting the running 
jobs.
2, in the future, how to avoid jobmanager holing so many, apparently not 
necessary, TCP connections?

Thanks
Youjun

From: vino yang 
Sent: Friday, August 24, 2018 10:26 AM
To: Yuan,Youjun 
Cc: user 
Subject: Re: jobmanager holds too many CLOSE_WAIT connection to datanode

Hi Youjun,

How long has your job been running for a long time?
As far as I know, if in a short time, for checkpoint, jobmanager will not 
generate so many connections to HDFS.
What is your Flink cluster environment? Standalone or Flink on YARN?
In addition, does JM's log show any timeout information? Has Checkpoint timed 
out?
If you can provide more information, it will help locate the problem.

Thanks, vino.

Yuan,Youjun  wrote on Thu, Aug 23, 2018 at 10:53 PM:
Hi,

After running for a while , my job manager holds thousands of CLOSE_WAIT TCP 
connection to HDFS datanode, the number is growing up slowly, and it’s likely 
will hit the max open file limit. My jobs checkpoint to HDFS every minute.
If I run lsof -i -a -p $JMPID, I can get a tons of following output:
java9433  iot  408u  IPv4 4060901898  0t0  TCP 
jmHost:17922->datanode:50010 (CLOSE_WAIT)
java9433  iot  409u  IPv4 4061478455  0t0  TCP 
jmHost:52854->datanode:50010 (CLOSE_WAIT)
java9433  iot  410r  IPv4 4063170767  0t0  TCP 
jmHost:49384->datanode:50010 (CLOSE_WAIT)
java9433  iot  411w  IPv4 4063188376  0t0  TCP 
jmHost:50516->datanode:50010 (CLOSE_WAIT)
java9433  iot  412u  IPv4 4061459881  0t0  TCP 
jmHost:51651->datanode:50010 (CLOSE_WAIT)
java9433  iot  413u  IPv4 4063737603  0t0  TCP 
jmHost:31318->datanode:50010 (CLOSE_WAIT)
java9433  iot  414w  IPv4 4062030625  0t0  TCP 
jmHost:34033->datanode:50010 (CLOSE_WAIT)
java9433  iot  415u  IPv4 4062049134  0t0  TCP 
jmHost:35156->datanode:50010 (CLOSE_WAIT)
java9433  iot  416u  IPv4 4062615550  0t0  TCP 
jmHost:16962->datanode:50010 (CLOSE_WAIT)
java9433  iot  417r  IPv4 4063757056  0t0  TCP 
jmHost:32553->datanode:50010 (CLOSE_WAIT)
java9433  iot  418w  IPv4 4064304789  0t0  TCP 
jmHost:13375->datanode:50010 (CLOSE_WAIT)
java9433  iot  419u  IPv4 4062599328  0t0  TCP 
jmHost:15915->datanode:50010 (CLOSE_WAIT)
java9433  iot  420w  IPv4 4065462963  0t0  TCP 
jmHost:30432->datanode:50010 (CLOSE_WAIT)
java9433  iot  421u  IPv4 4067178257  0t0  TCP 
jmHost:28334->datanode:50010 (CLOSE_WAIT)
java9433  iot  422u  IPv4 4066022066  0t0  TCP 
jmHost:11843->datanode:50010 (CLOSE_WAIT)


I know restarting the job manager should cleanup those connections, but I 
wonder if there is any better solution?
Btw, I am using flink 1.4.0, and running a standalone cluster.

Thanks
Youjun


jobmanager holds too many CLOSE_WAIT connection to datanode

2018-08-23 Thread Yuan,Youjun
Hi,

After running for a while , my job manager holds thousands of CLOSE_WAIT TCP 
connection to HDFS datanode, the number is growing up slowly, and it's likely 
will hit the max open file limit. My jobs checkpoint to HDFS every minute.
If I run lsof -i -a -p $JMPID, I can get a tons of following output:
java9433  iot  408u  IPv4 4060901898  0t0  TCP 
jmHost:17922->datanode:50010 (CLOSE_WAIT)
java9433  iot  409u  IPv4 4061478455  0t0  TCP 
jmHost:52854->datanode:50010 (CLOSE_WAIT)
java9433  iot  410r  IPv4 4063170767  0t0  TCP 
jmHost:49384->datanode:50010 (CLOSE_WAIT)
java9433  iot  411w  IPv4 4063188376  0t0  TCP 
jmHost:50516->datanode:50010 (CLOSE_WAIT)
java9433  iot  412u  IPv4 4061459881  0t0  TCP 
jmHost:51651->datanode:50010 (CLOSE_WAIT)
java9433  iot  413u  IPv4 4063737603  0t0  TCP 
jmHost:31318->datanode:50010 (CLOSE_WAIT)
java9433  iot  414w  IPv4 4062030625  0t0  TCP 
jmHost:34033->datanode:50010 (CLOSE_WAIT)
java9433  iot  415u  IPv4 4062049134  0t0  TCP 
jmHost:35156->datanode:50010 (CLOSE_WAIT)
java9433  iot  416u  IPv4 4062615550  0t0  TCP 
jmHost:16962->datanode:50010 (CLOSE_WAIT)
java9433  iot  417r  IPv4 4063757056  0t0  TCP 
jmHost:32553->datanode:50010 (CLOSE_WAIT)
java9433  iot  418w  IPv4 4064304789  0t0  TCP 
jmHost:13375->datanode:50010 (CLOSE_WAIT)
java9433  iot  419u  IPv4 4062599328  0t0  TCP 
jmHost:15915->datanode:50010 (CLOSE_WAIT)
java9433  iot  420w  IPv4 4065462963  0t0  TCP 
jmHost:30432->datanode:50010 (CLOSE_WAIT)
java9433  iot  421u  IPv4 4067178257  0t0  TCP 
jmHost:28334->datanode:50010 (CLOSE_WAIT)
java9433  iot  422u  IPv4 4066022066  0t0  TCP 
jmHost:11843->datanode:50010 (CLOSE_WAIT)


I know restarting the job manager should cleanup those connections, but I 
wonder if there is any better solution?
Btw, I am using flink 1.4.0, and running a standalone cluster.

Thanks
Youjun


Re: Best way to find the current alive jobmanager with HA mode zookeeper

2018-07-25 Thread Yuan,Youjun
Thanks for the information. I forgot to mention that I am using Flink 1.4; its
RestClusterClient does not seem to have the ability to retrieve the leader address.
I did notice there is a webMonitorRetrievalService member in Flink 1.5.

I wonder if I can use RestClusterClient@v1.5 on my client side to retrieve the
leader JM of a Flink v1.4 cluster.

Thanks
Youjun

From: vino yang 
Sent: Wednesday, July 25, 2018 7:11 PM
To: Martin Eden 
Cc: Yuan,Youjun ; user@flink.apache.org
Subject: Re: Best way to find the current alive jobmanager with HA mode zookeeper

Hi Martin,


For a standalone cluster which exists multiple JM instances, If you do not use 
Rest API, but use Flink provided Cluster client. The client can perceive which 
one this the JM leader from multiple JM instances.

For example, you can use CLI to submit flink job in a non-Leader node.

But I did not verify this case for Flink on Mesos.

Thanks, vino.

2018-07-25 17:22 GMT+08:00 Martin Eden :
Hi,

This is actually very relevant to us as well.

We want to deploy Flink 1.3.2 on a 3 node DCOS cluster. In the case of 
Mesos/DCOS, Flink HA runs only one JobManager which gets restarted on another 
node by Marathon in case of failure and re-load it's state from Zookeeper.

Yuan I am guessing you are using Flink in standalone mode and there it is 
actually running 3 instances of the Job Manager, 1 active and 2 stand-bys.

Either way, in both cases there is the need to "discover" the hostname and port 
of the Job Manager at runtime. This is needed when you want to use the cli to 
submit jobs for instance. Is there an elegant mode to submit jobs other than 
say just trying out all the possible nodes in your cluster?

Grateful if anyone could clarify any of the above, thanks,
M

On Wed, Jul 25, 2018 at 11:37 AM, Yuan,Youjun  wrote:
Hi all,

I have a standalone cluster with 3 jobmanagers, and set high-availability to 
zookeeper. Our client submits job by REST API(POST /jars/:jarid/run), which 
means we need to know the host of the any of the current alive jobmanagers. The 
problem is that, how can we know which job manager is alive, or the host of 
current leader?  We don’t want to access a dead JM.

Thanks.
Youjun Yuan




Best way to find the current alive jobmanager with HA mode zookeeper

2018-07-24 Thread Yuan,Youjun
Hi all,

I have a standalone cluster with 3 jobmanagers, and high-availability is set to
zookeeper. Our client submits jobs via the REST API (POST /jars/:jarid/run), which
means we need to know the host of any of the currently alive jobmanagers. The
problem is: how can we know which job manager is alive, or the host of the
current leader? We don't want to access a dead JM.

Thanks.
Youjun Yuan


Re: Re: Re: TumblingProcessingTimeWindow emits extra results for a same window

2018-07-15 Thread Yuan,Youjun
Hi Hequn,

To my understanding, a processing-time window is fired at the last millisecond of
the window (maxTimestamp). Then what happens if more elements arrive at the
last millisecond, but AFTER the window has fired?

Thanks,
Youjun
From: Hequn Cheng 
Sent: Friday, July 13, 2018 9:44 PM
To: Yuan,Youjun 
Cc: Timo Walther ; user@flink.apache.org
Subject: Re: Re: Re: TumblingProcessingTimeWindow emits extra results for a same
window

Hi Youjun,

The rowtime value in udf:EXTRACT(EPOCH FROM rowtime) is different from the 
rowtime value of window. Sql will be parsed and translated into some nodes, 
Source -> Calc -> Window -> Sink. The Calc is the input node of Window and the 
udf is part of Calc instead of Window. So the max_ts and min_ts is actually the 
time before entering the window, i.e, not the time in window.

However, I still can't find anything valuable to solve the problem. It seems 
the window has been triggered many times for the same key. I will think more 
about it.

Best, Hequn.

On Fri, Jul 13, 2018 at 11:53 AM, Yuan,Youjun  wrote:
Hi Hequn,

I am using Flink 1.4. The job was running with  1 parallelism.

I don’t think the extra records are caused by different keys, because:

  1.  I ran 2 jobs consuming the same source, jobA with a 2-minute window and
jobB with a 4-minute window. If there were weird keys, then jobA would get no more
records than jobB for the same period. But that is not true: jobA got 17 records
while jobB got 11. Relevant results can be found below.
  2.  For each window, I output the min and max timestamp, and found that those 
extra records always start at the last few milliseconds of the 2 or 4-minte 
windows, just before window got closed. I also noticed the windows did not have 
a clear cut between minutes, as we can see in jobA’s output, ts 1531448399978 
appears in 18 result records, either as start, or end, or both.

jobA(2-minute window) output
{"timestamp":153144804,"cnt":1668052,"userId":"user01","min_ts":1531448040003,"max_ts":1531448159985}
{"timestamp":153144816,"cnt":1613188,"userId":"user01","min_ts":1531448159985,"max_ts":1531448279979}
{"timestamp":153144828,"cnt":1664652,"userId":"user01","min_ts":1531448280004,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":4,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_

Re: Re: TumblingProcessingTimeWindow emits extra results for a same window

2018-07-12 Thread Yuan,Youjun
Hi Hequn,

I am using Flink 1.4. The job was running with  1 parallelism.

I don’t think the extra records are caused by different keys, because:

  1.  I ran 2 jobs consuming the same source, jobA with a 2-minute window and
jobB with a 4-minute window. If there were weird keys, then jobA would get no more
records than jobB for the same period. But that is not true: jobA got 17 records
while jobB got 11. Relevant results can be found below.
  2.  For each window, I output the min and max timestamp, and found that those 
extra records always start at the last few milliseconds of the 2 or 4-minte 
windows, just before window got closed. I also noticed the windows did not have 
a clear cut between minutes, as we can see in jobA’s output, ts 1531448399978 
appears in 18 result records, either as start, or end, or both.

jobA(2-minute window) output
{"timestamp":153144804,"cnt":1668052,"userId":"user01","min_ts":1531448040003,"max_ts":1531448159985}
{"timestamp":153144816,"cnt":1613188,"userId":"user01","min_ts":1531448159985,"max_ts":1531448279979}
{"timestamp":153144828,"cnt":1664652,"userId":"user01","min_ts":1531448280004,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":4,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":2,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144828,"cnt":3,"userId":"user01","min_ts":1531448399978,"max_ts":1531448399978}
{"timestamp":153144840,"cnt":1593435,"userId":"user01","min_ts":1531448399978,"max_ts":1531448519978}

jobB(4-minute window) output
{"timestamp":153144792,"cnt":3306838,"userId":"user01","min_ts":1531447919981,"max_ts":1531448159975}
{"timestamp":1531448160000,"cnt":3278178,"userId":"user01","min_ts":1531448159098,"max_ts":1531448399977}
{"timestamp":153144816,"cnt":4,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
{"timestamp":153144816,"cnt":5,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399977}
{"timestamp":153144816,"cnt":8,"userId":"user01","min_ts":1531448399977,"max_ts":1531448399978}
{"timestamp":153144816,"cnt":7,"userId":&quo

Re: TumblingProcessingTimeWindow emits extra results for a same window

2018-07-12 Thread Yuan,Youjun
Hi Timo,

This problem happens 4-5 times a day on our online server, with ~15k events per 
second load, and it is using PROCESSING time. So I don’t think I can stably 
reproduce the issue on my local machine.
The user ids are actually the same; I have double-checked that.

Now I am wondering: could it be possible that, after a window fires, some last
events arrive that still fall into the time range of the just-fired window, hence new
windows are assigned and fired later? That would explain why the extra
records always contain only a few events (cnt is small).

To verify that, I just modified the SQL to also output the MIN timestamp of
each window, and I found that the MIN timestamp of the extra records always
points to the LAST second of the window.
Here is the output I just got; note that 1531395119 is the last second of the 2-minute
window starting at 1531395000.

{"timestamp":153139476,"cnt":1536013,"userId":"user01","min_sec":1531394760}
{"timestamp":153139488,"cnt":1459623,"userId":"user01","min_sec":1531394879}
{"timestamp":153139500,"cnt":1446010,"userId":"user01","min_sec":1531395000}
{"timestamp":153139500,"cnt":7,"userId":"user01","min_sec":1531395119}
{"timestamp":153139500,"cnt":3,"userId":"user01","min_sec":1531395119}
{"timestamp":153139500,"cnt":3,"userId":"user01","min_sec":1531395119}
{"timestamp":153139500,"cnt":6,"userId":"user01","min_sec":1531395119}
{"timestamp":153139500,"cnt":3,"userId":"user01","min_sec":1531395119}
{"timestamp":153139500,"cnt":2,"userId":"user01","min_sec":1531395119}
{"timestamp":153139500,"cnt":2,"userId":"user01","min_sec":1531395119}
{"timestamp":153139500,"cnt":2,"userId":"user01","min_sec":1531395119}

The modified SQL:
INSERT INTO sink
SELECT
TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
count(vehicleId) AS cnt, userId,
MIN(EXTRACT(EPOCH FROM rowtime)) AS min_sec
FROM source
GROUP BY
TUMBLE(rowtime, INTERVAL '2' MINUTE),
userId

thanks
Youjun

From: Timo Walther 
Sent: Thursday, July 12, 2018 5:02 PM
To: user@flink.apache.org
Subject: Re: TumblingProcessingTimeWindow emits extra results for a same window

Hi Yuan,

this sounds indeed weird. The SQL API uses regular DataStream API windows
underneath, so this problem should have come up earlier if this were a problem in
the implementation. Is this behavior reproducible on your local machine?

One thing that comes to my mind is that the "userId"s might not be 100% 
identical (same hashCode/equals method) because otherwise they would be 
properly grouped.

Regards,
Timo

On 12.07.2018 at 09:35, Yuan,Youjun wrote:
Hi community,

I have a job which counts event number every 2 minutes, with TumblingWindow in 
ProcessingTime. However, it occasionally produces extra DUPLICATED records. For 
instance, for timestamp 153136848 below, it emits a normal result 
(cnt=1641161), and then followed by a few more records with very small result 
(2, 3, etc).

Can anyone shed some light on the possible reason, or how to fix it?

Below are the sample output.
---
{"timestamp":153136824,"cnt":1537821,"userId":"user01"}
{"timestamp":153136836,"cnt":1521464,"userId":"user01"}
{"timestamp":153136848,"cnt":1641161,"userId":"user01"}
{"timestamp":153136848,"cnt":2,"userId":"user01"}
{"timestamp":153136848,"cnt":3,"userId":"user01"}
{"timestamp":153136848,"cnt":3,"userId":"user01"}

And here is the job SQL:
---
INSERT INTO sink
SELECT
TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
count(vehicleId) AS cnt,
userId
FROM source
GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE),
userId

Thanks,
Youjun Yuan




TumblingProcessingTimeWindow emits extra results for a same window

2018-07-12 Thread Yuan,Youjun
Hi community,

I have a job which counts event number every 2 minutes, with TumblingWindow in 
ProcessingTime. However, it occasionally produces extra DUPLICATED records. For 
instance, for timestamp 153136848 below, it emits a normal result 
(cnt=1641161), and then followed by a few more records with very small result 
(2, 3, etc).

Can anyone shed some light on the possible reason, or how to fix it?

Below are the sample output.
---
{"timestamp":153136824,"cnt":1537821,"userId":"user01"}
{"timestamp":153136836,"cnt":1521464,"userId":"user01"}
{"timestamp":153136848,"cnt":1641161,"userId":"user01"}
{"timestamp":153136848,"cnt":2,"userId":"user01"}
{"timestamp":153136848,"cnt":3,"userId":"user01"}
{"timestamp":153136848,"cnt":3,"userId":"user01"}

And here is the job SQL:
---
INSERT INTO sink
SELECT
TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS `timestamp`,
count(vehicleId) AS cnt,
userId
FROM source
GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE),
userId

Thanks,
Youjun Yuan