[ 
https://issues.apache.org/jira/browse/FLINK-32778?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

simenliuxing updated FLINK-32778:
---------------------------------
    Description: 
-- flink version:1.16.1
-- parallelism.default: 1

-- Source table s1: JSON-encoded records (id, gk, price) consumed from
-- Kafka topic 'topic1', starting at the earliest offset.
CREATE TABLE s1 (
    id STRING,
    gk BIGINT,
    price INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic1',
    'properties.bootstrap.servers' = 'xx:9092',
    'properties.group.id' = 'xx-xx',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'json'
);

-- Source table s2: JSON-encoded records (id, name) consumed from
-- Kafka topic 'topic2', starting at the earliest offset.
CREATE TABLE s2 (
    id STRING,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic2',
    'properties.bootstrap.servers' = 'xx:9092',
    'properties.group.id' = 'xx-xx',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'json'
);

-- Result table backed by the 'print' connector.
CREATE TABLE sink (
    id STRING,
    name STRING,
    gk BIGINT,
    price INT
) WITH (
    'connector' = 'print'
);

-- View v1: one row per (id, gk) group of s1, carrying the price
-- returned by LAST_VALUE for that group.
CREATE VIEW v1 AS
SELECT
    id,
    gk,
    LAST_VALUE(price) AS price
FROM s1
GROUP BY
    id,
    gk;

-- Write every v1 row to sink, attaching s2.name where an s2 row has the
-- same id; because this is a LEFT JOIN, name is NULL when nothing matches.
INSERT INTO sink
SELECT
    v1.id,
    s2.name,
    v1.gk,
    v1.price
FROM v1
LEFT JOIN s2
    ON v1.id = s2.id;

1. Send two records to the topic1 topic:

{"id":"1","gk":758,"price":100}

{"id":"1","gk":1818,"price":200}

The output is as follows:
+I[1, null, 758, 100]
+I[1, null, 1818, 200]

2. Send one record to the topic2 topic:

{"id":1,"name":"z3"}

The output is as follows:
-D[1, null, 1818, 200]
-D[1, null, 758, 100]
+I[1, z3, 1818, 200]
+I[1, z3, 758, 100]

I expected the output to follow the order of the input, like below:
-D[1, null, 758, 100]
-D[1, null, 1818, 200]
+I[1, z3, 758, 100]
+I[1, z3, 1818, 200]

3. When I re-run the above SQL, the results are output in the order of the input:
+I[1, z3, 758, 100]
+I[1, z3, 1818, 200]

Is there a way to control this uncertainty?

 

  was:
-- flink version:1.16.1
-- parallelism.default: 1

-- Source table s1: JSON-encoded records (id, gk, price) consumed from
-- Kafka topic 'topic1', starting at the earliest offset.
CREATE TABLE s1 (
    id STRING,
    gk BIGINT,
    price INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic1',
    'properties.bootstrap.servers' = 'xx:9092',
    'properties.group.id' = 'xx-xx',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'json'
);

-- Source table s2: JSON-encoded records (id, name) consumed from
-- Kafka topic 'topic2', starting at the earliest offset.
CREATE TABLE s2 (
    id STRING,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'topic2',
    'properties.bootstrap.servers' = 'xx:9092',
    'properties.group.id' = 'xx-xx',
    'scan.startup.mode' = 'earliest-offset',
    'value.format' = 'json'
);

-- Result table backed by the 'print' connector.
CREATE TABLE sink (
    id STRING,
    name STRING,
    gk BIGINT,
    price INT
) WITH (
    'connector' = 'print'
);

-- View v1: one row per (id, gk) group of s1, carrying the price
-- returned by LAST_VALUE for that group.
CREATE VIEW v1 AS
SELECT
    id,
    gk,
    LAST_VALUE(price) AS price
FROM s1
GROUP BY
    id,
    gk;

-- Write every v1 row to sink, attaching s2.name where an s2 row has the
-- same id; because this is a LEFT JOIN, name is NULL when nothing matches.
INSERT INTO sink
SELECT
    v1.id,
    s2.name,
    v1.gk,
    v1.price
FROM v1
LEFT JOIN s2
    ON v1.id = s2.id;

1.Enter two pieces of data into the topic1 topic:
{"id":"1","gk":758,"price":100}
{"id":"1","gk":1818,"price":200}

The output is as follows:
+I[1, null, 758, 100]
+I[1, null, 1818, 200]

2. Send one record to the topic2 topic:
{"id":1,"name":"z3"}

The output is as follows:
-D[1, null, 1818, 200]
-D[1, null, 758, 100]
+I[1, z3, 1818, 200]
+I[1, z3, 758, 100]

I expected the output to follow the order of the input, like below:
-D[1, null, 758, 100]
-D[1, null, 1818, 200]
+I[1, z3, 758, 100]
+I[1, z3, 1818, 200]

3.When I re-run the above sql, the results are output in the order of input
+I[1, z3, 758, 100]
+I[1, z3, 1818, 200]

Is there a way to control this uncertainty?

 


> Stream join data output sequence is inconsistent with input sequence
> --------------------------------------------------------------------
>
>                 Key: FLINK-32778
>                 URL: https://issues.apache.org/jira/browse/FLINK-32778
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.16.1
>            Reporter: simenliuxing
>            Priority: Major
>             Fix For: 1.7.3
>
>
> -- flink version:1.16.1
> -- parallelism.default: 1
> CREATE TABLE s1(
>     id string,
>     gk bigint,
>     price int
>  )WITH(
>     'connector' = 'kafka'
>     ,'properties.bootstrap.servers' = 'xx:9092'
>     ,'properties.group.id' = 'xx-xx'
>     ,'scan.startup.mode' = 'earliest-offset'
>     ,'value.format' = 'json'
>     ,'topic' = 'topic1'
>  );
> CREATE TABLE s2(
>     id string,
>     name string
>  )WITH(
>     'connector' = 'kafka'
>     ,'properties.bootstrap.servers' = 'xx:9092'
>     ,'properties.group.id' = 'xx-xx'
>     ,'scan.startup.mode' = 'earliest-offset'
>     ,'value.format' = 'json'
>     ,'topic' = 'topic2'
>  );
> create table sink(
>     id string,
>     name string,
>     gk bigint,
>     price int
>  )with(
>     'connector'='print'
>  );
> create view v1 as select
>     id,
>     gk,
>     last_value(price) price
> from s1
> group by id,gk;
> insert into sink
> select
>     v1.id,
>     s2.name,
>     v1.gk,
>     v1.price
> from v1
> left join s2 on v1.id=s2.id;
> 1. Send two records to the topic1 topic:
> {"id":"1","gk":758,"price":100}
> {"id":"1","gk":1818,"price":200}
> The output is as follows:
> +I[1, null, 758, 100]
> +I[1, null, 1818, 200]
> 2. Send one record to the topic2 topic:
> {"id":1,"name":"z3"}
> The output is as follows:
> -D[1, null, 1818, 200]
> -D[1, null, 758, 100]
> +I[1, z3, 1818, 200]
> +I[1, z3, 758, 100]
> I expected the output to follow the order of the input, like below:
> -D[1, null, 758, 100]
> -D[1, null, 1818, 200]
> +I[1, z3, 758, 100]
> +I[1, z3, 1818, 200]
> 3. When I re-run the above SQL, the results are output in the order of the input:
> +I[1, z3, 758, 100]
> +I[1, z3, 1818, 200]
> Is there a way to control this uncertainty?
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to