[ https://issues.apache.org/jira/browse/FLINK-33657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zlzhang0122 updated FLINK-33657:
--------------------------------
    Summary: Insert message in top n without row number didn't consider it's number and may incorrect  (was: Insert message in top n without row number didn't consider it's number and may not correct)

> Insert message in top n without row number didn't consider it's number and may incorrect
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-33657
>                 URL: https://issues.apache.org/jira/browse/FLINK-33657
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.2, 1.17.1
>            Reporter: zlzhang0122
>            Priority: Major
>
> When Top-N runs without emitting the row number, a new insert message does
> not take the record's rank into account: the operator simply calls
> collectInsert() to forward the record to the next operator. The result may
> be incorrect when the next operator collects all the Top-N records and
> aggregates them.
>
> For example:
>
> create table user_info (
>   user_id int,
>   item_id int,
>   app string,
>   dt timestamp
> ) with (
>   'connector' = 'kafka',
>   ...
> );
>
> create table redis_sink (
>   redis_key string,
>   hash_key string,
>   hash_value string
> ) with (
>   'connector' = 'redis',
>   'command' = 'hmset',
>   'nodes' = 'xxx',
>   'additional-ttl' = 'xx'
> );
>
> create view base_lastn as
> select * from (
>   select user_id, item_id, app, dt,
>          row_number() over (partition by item_id, app order by dt desc) as rn
>   from user_info
> ) t where rn <= 5;
>
> insert into redis_sink
> select concat('id', item_id) as redis_key, app as hash_key, user_id as hash_value
> from base_lastn where rn = 1;
>
> insert into redis_sink
> select concat('list_', item_id) as redis_key, app as hash_key,
>        listagg(user_id, ',') as hash_value
> from base_lastn group by item_id, app;
>
> In some scenarios the value written by the top-1 query appears as neither
> the first nor the last element of the aggregated top-5 list.
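
The ordering problem in the description can be illustrated with a toy model. The sketch below is not Flink code: the names Row, TopNWithoutRowNumber, ListAgg and run are hypothetical, dt is reduced to an integer, and the downstream aggregate is assumed to concatenate values in arrival order. It only shows that when inserts carry no rank, a record arriving out of order can be the top 1 and still land in the middle of the aggregated list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Row:
    user_id: int
    dt: int  # stand-in for the event timestamp (assumption: larger = newer)


class TopNWithoutRowNumber:
    """Toy Top-N operator (ORDER BY dt DESC) that emits no rank information."""

    def __init__(self, n):
        self.n = n
        self.buffer = []  # kept sorted by dt, newest first

    def process(self, row):
        """Return changelog messages as ('+I' | '-D', row) tuples."""
        self.buffer.append(row)
        self.buffer.sort(key=lambda r: r.dt, reverse=True)
        if len(self.buffer) <= self.n:
            return [("+I", row)]
        evicted = self.buffer.pop()  # oldest row falls out of the top n
        if evicted is row:
            return []  # the new row never entered the top n
        return [("-D", evicted), ("+I", row)]


class ListAgg:
    """Toy downstream aggregate: concatenates values in arrival order."""

    def __init__(self):
        self.values = []

    def apply(self, kind, row):
        if kind == "+I":
            self.values.append(row.user_id)  # position ignores the rank
        else:
            self.values.remove(row.user_id)

    def result(self):
        return ",".join(str(v) for v in self.values)


def run(rows, n=5):
    """Feed rows through both toy operators; return (top-1 user, list)."""
    top_n = TopNWithoutRowNumber(n)
    agg = ListAgg()
    for row in rows:
        for kind, changed in top_n.process(row):
            agg.apply(kind, changed)
    return top_n.buffer[0].user_id, agg.result()


# Rows arrive out of timestamp order: user 2 is the newest record.
top1, listed = run([Row(1, 10), Row(2, 30), Row(3, 20)])
print(top1, listed)  # prints: 2 1,2,3
```

Here user 2 is the top-1 record, yet the aggregated list is 1,2,3, so the top-1 value is neither its first nor its last element.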