[ https://issues.apache.org/jira/browse/FLINK-33657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zlzhang0122 updated FLINK-33657: -------------------------------- Description: The new insert message in top n without row number didn't consider its order and is just emitted via collectInsert() to the next operator; this may not be correct when the next operator collects all the top n records and aggregates them. For example: create table user_info( user_id int, item_id int, app string, dt timestamp ) whith( 'connector'='kafka', ... ); create table redis_sink ( redis_key string, hash_key string, hash_value string ) with ( 'connector' = 'redis', 'command' = 'hmset' 'nodes' = 'xxx', 'additional-ttl' = 'xx' ); create view base_lastn as select * from( select user_id,item_id,app,dt,row_number() over(partition by item_id, app order by dt desc) as rn from user_action )t where rn<=5; insert into redis_sink select concat("id", item_id) as redis_key, app as hash_key,user_id as hash_value from base_lastn where rn=1; insert into redis_sink select concat("list_", item_id) as redis_key, app as hash_key,listagg(user_id,",") as hash_value from base_lastn where group by item_id, app; There can be a scenario in which the value in the top 1 does not appear as the first or last value of the top 5. was: The new insert message in top n without row number didn't consider its order and is just emitted via collectInsert to the next operator; this may not be correct when the next operator collects all the top n records and aggregates them. For example: create table user_info( user_id int, item_id int, app string, dt timestamp ) whith( 'connector'='kafka', ... 
); create table redis_sink ( redis_key string, hash_key string, hash_value string ) with ( 'connector' = 'redis', 'command' = 'hmset' 'nodes' = 'xxx', 'additional-ttl' = 'xx' ); create view base_lastn as select * from( select user_id,item_id,app,dt,row_number() over(partition by item_id, app order by dt desc) as rn from user_action )t where rn<=5; insert into redis_sink select concat("id", item_id) as redis_key, app as hash_key,user_id as hash_value from base_lastn where rn=1; insert into redis_sink select concat("list_", item_id) as redis_key, app as hash_key,listagg(user_id,",") as hash_value from base_lastn where group by item_id, app; There can be a scenario in which the value in the top 1 does not appear as the first or last value of the top 5. > Insert message in top n without row number didn't consider its number and > may not be correct > ------------------------------------------------------------------------------------------ > > Key: FLINK-33657 > URL: https://issues.apache.org/jira/browse/FLINK-33657 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime > Affects Versions: 1.16.2, 1.17.1 > Reporter: zlzhang0122 > Priority: Major > > The new insert message in top n without row number didn't consider its order > and is just emitted via > collectInsert() to the next operator; this may not be correct when the next > operator collects all the top n records and aggregates them. > > For example: > create table user_info( > user_id int, > item_id int, > app string, > dt timestamp > ) whith( > 'connector'='kafka', > ... 
> ); > create table redis_sink ( > redis_key string, > hash_key string, > hash_value string > ) > with ( > 'connector' = 'redis', > 'command' = 'hmset' > 'nodes' = 'xxx', > 'additional-ttl' = 'xx' > ); > create view base_lastn > as select * from( > select user_id,item_id,app,dt,row_number() over(partition by item_id, app > order by dt desc) as rn from user_action > )t where rn<=5; > insert into redis_sink > select > concat("id", item_id) as redis_key, app as hash_key,user_id as hash_value > from base_lastn where rn=1; > insert into redis_sink > select > concat("list_", item_id) as redis_key, app as hash_key,listagg(user_id,",") > as hash_value from base_lastn where group by item_id, app; > > There can be a scenario in which the value in the top 1 does not appear as the > first or last value of the top 5. -- This message was sent by Atlassian Jira (v8.20.10#820010)