tks??
----
??:
"user-zh"
https://issues.apache.org/jira/browse/FLINK-19449
kcz <573693...@qq.com.invalid> wrote on 2021-09-22 11:41:
behavior,next_bv
??
{
  "user_id": 1,
  "item_id": 1,
  "behavior": "pv1"
}
{
  "user_id": 1,
  "item_id": 1,
  "behavior": "pv2"
}
-- Source table over the 'user_behavior' Kafka topic, decoded as JSON.
-- Each record carries user_id, item_id and behavior; proctime is a computed
-- processing-time attribute added by the table itself (not read from the
-- record).
CREATE TABLE KafkaTable (
    `user_id` BIGINT,
    `item_id` BIGINT,
    `behavior` STRING,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'user_behavior',
    -- NOTE(review): broker list is empty in the original post (presumably
    -- redacted); fill in before running.
    'properties.bootstrap.servers' = '',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);
-- For every record, emit its behavior next to the value LAG(behavior, 1)
-- yields within the same user_id partition, ordered by processing time.
-- NOTE(review): LAG looks one row *back*, so the alias next_bv actually
-- holds the previous behavior; the alias is kept as posted so the output
-- column name does not change.
SELECT
    user_id,
    item_id,
    behavior,
    next_bv
FROM (
    SELECT
        user_id,
        item_id,
        behavior,
        LAG(behavior, 1) OVER (
            PARTITION BY user_id
            ORDER BY proctime
        ) AS next_bv
    FROM KafkaTable
) t;
--
Best,
Benchao Li