[ 
https://issues.apache.org/jira/browse/FLINK-33657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zlzhang0122 updated FLINK-33657:
--------------------------------
    Description: 
In Top-N without row number, a newly inserted record does not take its order
into account: it is simply emitted to the next operator via collectInsert().
This may produce incorrect results when the next operator collects all of the
Top-N records and aggregates them.
 
For example:
create table user_action(

user_id int,

item_id int,
app string,

dt timestamp
) with(
'connector'='kafka',

...
);
create table redis_sink (
redis_key string,
hash_key string,
hash_value string
)
with (
'connector' = 'redis',
'command' = 'hmset',
'nodes' = 'xxx',
'additional-ttl' = 'xx'
);
create view base_lastn

as select * from(
select user_id,item_id,app,dt,row_number() over(partition by item_id, app order 
by dt desc) as rn from user_action
)t where rn<=5;

insert into redis_sink

select
concat('id', item_id) as redis_key, app as hash_key,user_id as hash_value from 
base_lastn where rn=1;
insert into redis_sink

select
concat('list_', item_id) as redis_key, app as hash_key,listagg(user_id,',') as 
hash_value from base_lastn group by item_id, app;
 

As a result, there are cases where the top-1 value does not appear as either
the first or the last element of the aggregated top-5 list.

  was:
The new insert message in top n without row number didn't consider it's order 
and just 
collectInsert to the next operator, this may not correct when the next operator 
collect all the top n records and aggregate it.
 
For example:
create table user_info(

user_id int,

item_id int,
app string,

dt timestamp
) whith(
'connector'='kafka',

...
);
create table redis_sink (
redis_key string,
hash_key string,
hash_value string
)
with (
'connector' = 'redis',
'command' = 'hmset'
'nodes' = 'xxx',
'additional-ttl' = 'xx'
);
create view base_lastn

as select * from(
select user_id,item_id,app,dt,row_number() over(partition by item_id, app order 
by dt desc) as rn from user_action
)t where rn<=5;

insert into redis_sink

select
concat("id", item_id) as redis_key, app as hash_key,user_id as hash_value from 
base_lastn where rn=1;
insert into redis_sink

select
concat("list_", item_id) as redis_key, app as hash_key,listagg(user_id,",") as 
hash_value from base_lastn where group by item_id, app;
 

There will be a scene that the value in the top 1 will not appear in the first 
or last value of the top 5.


> Insert message in Top-N without row number doesn't consider its number and
> may not be correct
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-33657
>                 URL: https://issues.apache.org/jira/browse/FLINK-33657
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.2, 1.17.1
>            Reporter: zlzhang0122
>            Priority: Major
>
> In Top-N without row number, a newly inserted record does not take its order
> into account: it is simply emitted to the next operator via collectInsert().
> This may produce incorrect results when the next operator collects all of the
> Top-N records and aggregates them.
>  
> For example:
> create table user_action(
> user_id int,
> item_id int,
> app string,
> dt timestamp
> ) with(
> 'connector'='kafka',
> ...
> );
> create table redis_sink (
> redis_key string,
> hash_key string,
> hash_value string
> )
> with (
> 'connector' = 'redis',
> 'command' = 'hmset',
> 'nodes' = 'xxx',
> 'additional-ttl' = 'xx'
> );
> create view base_lastn
> as select * from(
> select user_id,item_id,app,dt,row_number() over(partition by item_id, app 
> order by dt desc) as rn from user_action
> )t where rn<=5;
> insert into redis_sink
> select
> concat('id', item_id) as redis_key, app as hash_key,user_id as hash_value 
> from base_lastn where rn=1;
> insert into redis_sink
> select
> concat('list_', item_id) as redis_key, app as hash_key,listagg(user_id,',') 
> as hash_value from base_lastn group by item_id, app;
>  
> As a result, there are cases where the top-1 value does not appear as either
> the first or the last element of the aggregated top-5 list.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to