?????? flink-1.12.0 ?????? ???? lag????

2021-09-21 文章 kcz
tks??




----
??: 
   "user-zh"

https://issues.apache.org/jira/browse/FLINK-19449

kcz <573693...@qq.com.invalid> wrote on 2021-09-22 at 11:41:

 
 behavior,next_bv 


 ??
 {
   "user_id": 1,
   "item_id": 1,
   "behavior":"pv1"
 }
 {
   "user_id": 1,
   "item_id": 1,
   "behavior":"pv2"
 }






 -- Source table over the Kafka topic `user_behavior`; each record is decoded as JSON
 -- into the three declared columns.
 CREATE TABLE KafkaTable (
   `user_id` BIGINT,
   `item_id` BIGINT,
   `behavior` STRING,
   -- Computed processing-time column; the LAG query in this thread orders its
   -- OVER window by it.
   proctime AS PROCTIME()
 ) WITH (
   'connector' = 'kafka',
   'topic' = 'user_behavior',
   -- NOTE(review): the broker list is empty as posted (presumably redacted);
   -- it must be filled in before this statement can be used.
   'properties.bootstrap.servers' = '',
   'properties.group.id' = 'testGroup',
   -- Read the topic from its earliest available offset.
   'scan.startup.mode' = 'earliest-offset',
   'format' = 'json'
 );



 -- For every event, emit the row together with the `behavior` value of the
 -- preceding row of the same user (rows ordered by processing time).
 -- LAG(behavior, 1) looks one row BACK, so despite the alias `next_bv` the column
 -- holds the previous behavior; by standard SQL semantics it is NULL when no
 -- preceding row exists in the partition.
 -- NOTE(review): the reply in this thread points to FLINK-19449 for LAG on
 -- flink-1.12.0; check that issue before relying on this result.
 SELECT
   user_id,
   item_id,
   behavior,
   next_bv
 FROM (
   -- Explicit column list instead of `*`: only these columns are consumed above.
   SELECT
     user_id,
     item_id,
     behavior,
     LAG(behavior, 1) OVER (PARTITION BY user_id ORDER BY proctime) AS next_bv
   FROM KafkaTable
 ) t;



-- 

Best,
Benchao Li

flink-1.12.0 ?????? ???? lag????

2021-09-21 文章 kcz

behavior,next_bv 


??
{
"user_id":1,
"item_id":1,
"behavior":"pv1"
}
{
"user_id":1,
"item_id":1,
"behavior":"pv2"
}






-- Kafka-backed source table for the `user_behavior` topic, with JSON-encoded records.
CREATE TABLE KafkaTable (
    `user_id`  BIGINT,
    `item_id`  BIGINT,
    `behavior` STRING,
    -- Computed column exposing processing time; serves as the ordering key
    -- of the OVER window in the query that follows.
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    -- NOTE(review): empty broker list as posted; presumably redacted.
    'properties.bootstrap.servers' = '',
    'properties.group.id' = 'testGroup',
    -- Start consuming from the earliest available offset.
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);



-- Pair each event with the `behavior` of the same user's preceding event,
-- ordered by processing time. LAG reads one row back, so `next_bv` actually
-- carries the previous behavior value.
-- NOTE(review): the reply in this thread links FLINK-19449 regarding LAG on
-- flink-1.12.0; confirm against that issue.
SELECT
    user_id,
    item_id,
    behavior,
    next_bv
FROM (
    SELECT
        user_id,
        item_id,
        behavior,
        LAG(behavior, 1) OVER (
            PARTITION BY user_id
            ORDER BY proctime
        ) AS next_bv
    FROM KafkaTable
) t;