[ https://issues.apache.org/jira/browse/FLINK-32778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
simenliuxing updated FLINK-32778: --------------------------------- Description: – flink version:1.16.1 – parallelism.default: 1 CREATE TABLE s1( id string, gk bigint, price int )WITH( 'connector' = 'kafka' ,'properties.bootstrap.servers' = 'xx:9092' ,'properties.group.id' = 'xx-xx' ,'scan.startup.mode' = 'earliest-offset' ,'value.format' = 'json' ,'topic' = 'topic1' ); CREATE TABLE s2( id string, name string )WITH( 'connector' = 'kafka' ,'properties.bootstrap.servers' = 'xx:9092' ,'properties.group.id' = 'xx-xx' ,'scan.startup.mode' = 'earliest-offset' ,'value.format' = 'json' ,'topic' = 'topic2' ); create table sink( id string, name string, gk bigint, price int )with( 'connector'='print' ); create view v1 as select id, gk, last_value(price) price from s1 group by id,gk; insert into sink select v1.id, s2.name, v1.gk, v1.price from v1 left join s2 on v1.id=s2.id; 1.Enter two pieces of data into the topic1 topic: {"id":"1","gk":758,"price":100} {"id":"1","gk":1818,"price":200} The output is as follows: +I[1, null, 758, 100] +I[1, null, 1818, 200] 2.Enter two pieces of data into the topic2 topic: {"id":1,"name":"z3"} The output is as follows: -D[1, null, 1818, 200] -D[1, null, 758, 100] +I[1, z3, 1818, 200] +I[1, z3, 758, 100] My doubt is that the output should be in the order of input , like below: -D[1, null, 758, 100] -D[1, null, 1818, 200] +I[1, z3, 758, 100] +I[1, z3, 1818, 200] 3.When I re-run the above sql, the results are output in the order of input +I[1, z3, 758, 100] +I[1, z3, 1818, 200] Is there a way to control this uncertainty? was: -- flink version:1.16.1 -- parallelism.default: 1 CREATE TABLE s1( id string, gk bigint, price int )WITH( 'connector' = 'kafka' ,'properties.bootstrap.servers' = 'xx:9092' ,'properties.group.id' = 'xx-xx' ,'scan.startup.mode' = 'earliest-offset' ,'value.format' = 'json' ,'topic' = 'topic1' ); CREATE TABLE s2( id string, name string )WITH( 'connector' = 'kafka' ,'properties.bootstrap.servers' = 'xx:9092' ,'properties.group.id' = 'xx-xx' ,'scan.startup.mode' = 'earliest-offset' ,'value.format' = 'json' ,'topic' = 'topic2' ); create table sink( id string, name string, gk bigint, price int )with( 'connector'='print' ); create view v1 as select id, gk, last_value(price) price from s1 group by id,gk; insert into sink select v1.id, s2.name, v1.gk, v1.price from v1 left join s2 on v1.id=s2.id; 1.Enter two pieces of data into the topic1 topic: {"id":"1","gk":758,"price":100} {"id":"1","gk":1818,"price":200} The output is as follows: +I[1, null, 758, 100] +I[1, null, 1818, 200] 2.Enter two pieces of data into the topic2 topic: {"id":1,"name":"z3"} The output is as follows: -D[1, null, 1818, 200] -D[1, null, 758, 100] +I[1, z3, 1818, 200] +I[1, z3, 758, 100] My doubt is that the output should be in the order of input , like below: -D[1, null, 758, 100] -D[1, null, 1818, 200] +I[1, z3, 758, 100] +I[1, z3, 1818, 200] 3.When I re-run the above sql, the results are output in the order of input +I[1, z3, 758, 100] +I[1, z3, 1818, 200] Is there a way to control this uncertainty? > Stream join data output sequence is inconsistent with input sequence > -------------------------------------------------------------------- > > Key: FLINK-32778 > URL: https://issues.apache.org/jira/browse/FLINK-32778 > Project: Flink > Issue Type: Improvement > Components: Table SQL / Runtime > Affects Versions: 1.16.1 > Reporter: simenliuxing > Priority: Major > Fix For: 1.7.3 > > > – flink version:1.16.1 > – parallelism.default: 1 > CREATE TABLE s1( > id string, > gk bigint, > price int > )WITH( > 'connector' = 'kafka' > ,'properties.bootstrap.servers' = 'xx:9092' > ,'properties.group.id' = 'xx-xx' > ,'scan.startup.mode' = 'earliest-offset' > ,'value.format' = 'json' > ,'topic' = 'topic1' > ); > CREATE TABLE s2( > id string, > name string > )WITH( > 'connector' = 'kafka' > ,'properties.bootstrap.servers' = 'xx:9092' > ,'properties.group.id' = 'xx-xx' > ,'scan.startup.mode' = 'earliest-offset' > ,'value.format' = 'json' > ,'topic' = 'topic2' > ); > create table sink( > id string, > name string, > gk bigint, > price int > )with( > 'connector'='print' > ); > create view v1 as select > id, > gk, > last_value(price) price > from s1 > group by id,gk; > insert into sink > select > v1.id, > s2.name, > v1.gk, > v1.price > from v1 > left join s2 on v1.id=s2.id; > 1.Enter two pieces of data into the topic1 topic: > {"id":"1","gk":758,"price":100} > {"id":"1","gk":1818,"price":200} > The output is as follows: > +I[1, null, 758, 100] > +I[1, null, 1818, 200] > 2.Enter two pieces of data into the topic2 topic: > {"id":1,"name":"z3"} > The output is as follows: > -D[1, null, 1818, 200] > -D[1, null, 758, 100] > +I[1, z3, 1818, 200] > +I[1, z3, 758, 100] > My doubt is that the output should be in the order of input , like below: > -D[1, null, 758, 100] > -D[1, null, 1818, 200] > +I[1, z3, 758, 100] > +I[1, z3, 1818, 200] > 3.When I re-run the above sql, the results are output in the order of input > +I[1, z3, 758, 100] > +I[1, z3, 1818, 200] > Is there a way to control this uncertainty? > -- This message was sent by Atlassian Jira (v8.20.10#820010)