[ 
https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

zhangbin updated FLINK-27418:
-----------------------------
    Description: 
Running the same Flink SQL TopN query multiple times produces different results: 
sometimes the result is correct and sometimes it is incorrect.

Example:
{code:java}
@Test
    public void flinkSqlJoinRetract() {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv, settings);
        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));

        RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
        RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
        SourceFunction<Row> waybillSourceFunction = 
buildWaybillStreamSource(waybillTableTypeInfo);
        SourceFunction<Row> itemSourceFunction = 
buildItemStreamSource(itemTableTypeInfo);
        String waybillTable = "waybill";
        String itemTable = "item";

        DataStreamSource<Row> waybillStream = streamEnv.addSource(
                waybillSourceFunction,
                waybillTable,
                waybillTableTypeInfo);
        DataStreamSource<Row> itemStream = streamEnv.addSource(
                itemSourceFunction,
                itemTable,
                itemTableTypeInfo);

        Expression[] waybillFields = ExpressionParser
                .parseExpressionList(String.join(",", 
waybillTableTypeInfo.getFieldNames())
                        + ",proctime.proctime").toArray(new Expression[0]);
        Expression[] itemFields = ExpressionParser
                .parseExpressionList(
                        String.join(",", itemTableTypeInfo.getFieldNames()) + 
",proctime.proctime")
                .toArray(new Expression[0]);

        tableEnv.createTemporaryView(waybillTable, waybillStream, 
waybillFields);
        tableEnv.createTemporaryView(itemTable, itemStream, itemFields);

        String sql =
                "select \n"
                + "    city_id, \n"
                + "    count(*) as cnt\n"
                + "from (\n"
                + "    select id,city_id\n"
                + "    from (\n"
                + "        select \n"
                + "            id,\n"
                + "            city_id,\n"
                + "            row_number() over(partition by id order by utime 
desc ) as rno \n"
                + "        from (\n"
                + "            select \n"
                + "                waybill.id as id,\n"
                + "                coalesce(item.city_id, waybill.city_id) as 
city_id,\n"
                + "                waybill.utime as utime \n"
                + "            from waybill left join item \n"
                + "            on waybill.id = item.id \n"
                + "        ) \n"
                + "    )\n"
                + "    where rno =1\n"
                + ")\n"
                + "group by city_id";

        StatementSet statementSet = tableEnv.createStatementSet();
        Table table = tableEnv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, Row>> rowDataStream = 
tableEnv.toRetractStream(table, Row.class);
        rowDataStream.printToErr();
        try {
            streamEnv.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the row type for waybill rows.
     *
     * @return a {@link RowTypeInfo} with the fields {@code id} (INT), {@code city_id}
     *     (STRING), {@code rider_id} (LONG) and {@code utime} (LONG), in that order
     */
    private static RowTypeInfo buildWaybillTableTypeInfo() {
        // Wildcard-typed array rather than a raw TypeInformation[].
        TypeInformation<?>[] types = new TypeInformation<?>[] {
                Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()};
        String[] fields = new String[] {"id", "city_id", "rider_id", "utime"};
        return new RowTypeInfo(types, fields);
    }

    /**
     * Builds the row type for item rows.
     *
     * @return a {@link RowTypeInfo} with the fields {@code id} (INT), {@code city_id}
     *     (STRING) and {@code utime} (LONG), in that order
     */
    private static RowTypeInfo buildItemTableTypeInfo() {
        // Wildcard-typed array rather than a raw TypeInformation[].
        TypeInformation<?>[] types = new TypeInformation<?>[] {
                Types.INT(), Types.STRING(), Types.LONG()};
        String[] fields = new String[] {"id", "city_id", "utime"};
        return new RowTypeInfo(types, fields);
    }

    /**
     * Creates a source that emits one waybill row per entry of its built-in id/city
     * arrays and then stops.
     *
     * <p>Row layout: id, city_id, rider_id, utime. {@code rider_id} is a random value
     * from {@code RandomUtils.nextInt(1000, 2000)} and {@code utime} is the wall-clock
     * time in milliseconds at emission. Every row is also printed via {@code printRow}
     * before it is collected.
     *
     * @param rowTypeInfo row type used only to label the fields when printing
     * @return the source function
     */
    private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) {
        return new SourceFunction<Row>() {
            private volatile boolean stopped = false;
            private int count = 0;
            private final int[] ids = {111, 222, 333, 111};
            private final String[] cityIds = {"A", "A", "B", "A"};

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (!stopped) {
                    int id = ids[count % ids.length];
                    String cityId = cityIds[count % cityIds.length];
                    Row row = new Row(4);
                    row.setField(0, id);
                    row.setField(1, cityId);
                    row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
                    row.setField(3, System.currentTimeMillis());
                    printRow(rowTypeInfo, row);
                    ctx.collect(row);
                    // Stop once every configured id has been emitted exactly once.
                    if (++count >= ids.length) {
                        stopped = true;
                    }
                }
            }

            @Override
            public void cancel() {
                stopped = true;
            }
        };
    }

    /**
     * Creates a source that emits one item row per entry of its built-in id/city arrays
     * and then stops, sleeping for a random {@code RandomUtils.nextInt(1000, 2000)}
     * milliseconds before each row.
     *
     * <p>Row layout: id, city_id, utime. Only id and city_id are populated; utime is
     * never set and therefore stays {@code null}. Every row is also printed via
     * {@code printRow} before it is collected.
     *
     * @param rowTypeInfo row type used only to label the fields when printing
     * @return the source function
     */
    private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) {
        return new SourceFunction<Row>() {
            private volatile boolean stopped = false;
            private int count = 0;
            private final int[] ids = {111, 333};
            private final String[] cityIds = {"C", "D"};

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (!stopped) {
                    Thread.sleep(RandomUtils.nextInt(1000, 2000));
                    int id = ids[count % ids.length];
                    String cityId = cityIds[count % cityIds.length];
                    Row row = new Row(3);
                    row.setField(0, id);
                    row.setField(1, cityId);
                    // Field 2 (utime) is left unset, so it is emitted as null.
                    printRow(rowTypeInfo, row);
                    ctx.collect(row);
                    // Stop once every configured id has been emitted exactly once.
                    if (++count >= ids.length) {
                        stopped = true;
                    }
                }
            }

            @Override
            public void cancel() {
                stopped = true;
            }
        };
    }

    /**
     * Prints a row to standard output as {@code name:value} pairs separated by commas,
     * followed by a line break.
     *
     * <p>The line is assembled first and written with a single {@code println} call
     * rather than one {@code print} call per field, so the pieces of one row are less
     * likely to be interleaved with output written by another thread.
     *
     * @param rowTypeInfo supplies the field names and the number of fields to print
     * @param row the row whose field values are printed
     */
    public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
        String[] fieldNames = rowTypeInfo.getFieldNames();
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
            if (i > 0) {
                line.append(',');
            }
            line.append(fieldNames[i]).append(':').append(row.getField(i));
        }
        System.out.println(line.toString());
    }

{code}
------------------------------------------------------------
||*wrong result*||right result||
|id:111,city_id:A,rider_id:1137,utime:1650979957702
id:222,city_id:A,rider_id:1976,utime:1650979957725
id:333,city_id:B,rider_id:1916,utime:1650979957725
id:111,city_id:A,rider_id:1345,utime:1650979957725
(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
id:111,city_id:C,utime:null
(false,A,2)
(true,A,1)
(true,C,1)
(false,A,1)
(false,C,1)
(true,C,2)
id:333,city_id: D,utime:null
(false,B,1)
(true,D,1)
The final result:
C,2
D,1
is wrong.| 
id:111,city_id:A,rider_id:1155,utime:1650980662019
id:222,city_id:A,rider_id:1875,utime:1650980662042
id:333,city_id:B,rider_id:1430,utime:1650980662042
id:111,city_id:A,rider_id:1308,utime:1650980662042
(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
id:111,city_id:C,utime:null
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
(false,A,2)
(true,A,1)
(true,C,1)
id:333,city_id: D,utime:null
(false,B,1)
(true,D,1)
The final result:
A,1
C,1
D,1
is right.|

 

 

  was:
Flink SQL TopN is executed multiple times with different results, sometimes 
with correct results and sometimes with incorrect results.

Example:
{code:java}
@Test
    public void flinkSqlJoinRetract() {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(1);
        StreamTableEnvironment tableEnv = 
StreamTableEnvironment.create(streamEnv, settings);
        tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));

        RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
        RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
        SourceFunction<Row> waybillSourceFunction = 
buildWaybillStreamSource(waybillTableTypeInfo);
        SourceFunction<Row> itemSourceFunction = 
buildItemStreamSource(itemTableTypeInfo);
        String waybillTable = "waybill";
        String itemTable = "item";

        DataStreamSource<Row> waybillStream = streamEnv.addSource(
                waybillSourceFunction,
                waybillTable,
                waybillTableTypeInfo);
        DataStreamSource<Row> itemStream = streamEnv.addSource(
                itemSourceFunction,
                itemTable,
                itemTableTypeInfo);

        Expression[] waybillFields = ExpressionParser
                .parseExpressionList(String.join(",", 
waybillTableTypeInfo.getFieldNames())
                        + ",proctime.proctime").toArray(new Expression[0]);
        Expression[] itemFields = ExpressionParser
                .parseExpressionList(
                        String.join(",", itemTableTypeInfo.getFieldNames()) + 
",proctime.proctime")
                .toArray(new Expression[0]);

        tableEnv.createTemporaryView(waybillTable, waybillStream, 
waybillFields);
        tableEnv.createTemporaryView(itemTable, itemStream, itemFields);

        String sql =
                "select \n"
                + "    city_id, \n"
                + "    count(*) as cnt\n"
                + "from (\n"
                + "    select id,city_id\n"
                + "    from (\n"
                + "        select \n"
                + "            id,\n"
                + "            city_id,\n"
                + "            row_number() over(partition by id order by utime 
desc ) as rno \n"
                + "        from (\n"
                + "            select \n"
                + "                waybill.id as id,\n"
                + "                coalesce(item.city_id, waybill.city_id) as 
city_id,\n"
                + "                waybill.utime as utime \n"
                + "            from waybill left join item \n"
                + "            on waybill.id = item.id \n"
                + "        ) \n"
                + "    )\n"
                + "    where rno =1\n"
                + ")\n"
                + "group by city_id";

        StatementSet statementSet = tableEnv.createStatementSet();
        Table table = tableEnv.sqlQuery(sql);
        DataStream<Tuple2<Boolean, Row>> rowDataStream = 
tableEnv.toRetractStream(table, Row.class);
        rowDataStream.printToErr();
        try {
            streamEnv.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static RowTypeInfo buildWaybillTableTypeInfo() {
        TypeInformation[] types = new TypeInformation[]{Types.INT(), 
Types.STRING(), Types.LONG(), Types.LONG()};
        String[] fields = new String[]{"id", "city_id", "rider_id", "utime"};
        return new RowTypeInfo(types, fields);
    }

    private static RowTypeInfo buildItemTableTypeInfo() {
        TypeInformation[] types = new TypeInformation[]{Types.INT(), 
Types.STRING(), Types.LONG()};
        String[] fields = new String[]{"id", "city_id", "utime"};
        return new RowTypeInfo(types, fields);
    }

    //id,rider_id,city_id,utime
    private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo 
rowTypeInfo) {
        return new SourceFunction<Row>() {
            private volatile boolean stopped = false;
            int count = 0;
            int[] ids = {111, 222, 333, 111};
            String[] cityIds = {"A", "A", "B", "A"};

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (!stopped) {
                    int id = ids[count % ids.length];
                    String cityId = cityIds[count % cityIds.length];
                    Row row = new Row(4);
                    row.setField(0, id);
                    row.setField(1, cityId);
                    row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
                    row.setField(3, System.currentTimeMillis());
                    printRow(rowTypeInfo, row);
                    ctx.collect(row);
                    if (++count > 3) {
                        stopped = true;
                    }
                }
            }

            @Override
            public void cancel() {
                stopped = true;
            }
        };
    }

    //id,city_id,utime
    private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo 
rowTypeInfo) {
        return new SourceFunction<Row>() {
            private volatile boolean stopped = false;
            int count = 0;
            int[] ids = {111, 333};
            String[] cityIds = {"C", "D"};

            @Override
            public void run(SourceContext<Row> ctx) throws Exception {
                while (!stopped) {
                    Thread.sleep(RandomUtils.nextInt(1000, 2000));
                    int id = ids[count % ids.length];
                    String cityId = cityIds[count % cityIds.length];
                    Row row = new Row(3);
                    row.setField(0, id);
                    row.setField(1, cityId);
                    //row.setField(2, System.currentTimeMillis());
                    printRow(rowTypeInfo, row);
                    ctx.collect(row);
                    if (++count >= 2) {
                        stopped = true;
                    }

                }
            }

            @Override
            public void cancel() {
                stopped = true;
            }
        };
    }

    public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
        String prefix = "";
        for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
            prefix = i > 0 ? "," : "";
            System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + 
row.getField(i));
        }
        System.out.println();
    }

{code}
------------------------------------------------------------
|*wrong result*||right result||
|id:111,city_id:A,rider_id:1137,utime:1650979957702
id:222,city_id:A,rider_id:1976,utime:1650979957725
id:333,city_id:B,rider_id:1916,utime:1650979957725
id:111,city_id:A,rider_id:1345,utime:1650979957725
(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
id:111,city_id:C,utime:null
(false,A,2)
(true,A,1)
(true,C,1)
(false,A,1)
(false,C,1)
(true,C,2)
id:333,city_id: D,utime:null
(false,B,1)
(true,D,1)
The final result:
C,2
D,1
is wrong.| 
id:111,city_id:A,rider_id:1155,utime:1650980662019
id:222,city_id:A,rider_id:1875,utime:1650980662042
id:333,city_id:B,rider_id:1430,utime:1650980662042
id:111,city_id:A,rider_id:1308,utime:1650980662042
(true,A,1)
(false,A,1)
(true,A,2)
(true,B,1)
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
id:111,city_id:C,utime:null
(false,A,2)
(true,A,1)
(false,A,1)
(true,A,2)
(false,A,2)
(true,A,1)
(true,C,1)
id:333,city_id: D,utime:null
(false,B,1)
(true,D,1)
The final result:
A,1
C,2
D,1
is right.|

 

 


> Flink SQL TopN result is wrong
> ------------------------------
>
>                 Key: FLINK-27418
>                 URL: https://issues.apache.org/jira/browse/FLINK-27418
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.2, 1.14.3
>         Environment: Flink 1.12.2 and Flink 1.14.3 test results are sometimes 
> wrong
>            Reporter: zhangbin
>            Priority: Major
>
> Flink SQL TopN is executed multiple times with different results, sometimes 
> with correct results and sometimes with incorrect results.
> Example:
> {code:java}
> @Test
>     public void flinkSqlJoinRetract() {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         streamEnv.setParallelism(1);
>         StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(streamEnv, settings);
>         tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));
>         RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
>         RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
>         SourceFunction<Row> waybillSourceFunction = 
> buildWaybillStreamSource(waybillTableTypeInfo);
>         SourceFunction<Row> itemSourceFunction = 
> buildItemStreamSource(itemTableTypeInfo);
>         String waybillTable = "waybill";
>         String itemTable = "item";
>         DataStreamSource<Row> waybillStream = streamEnv.addSource(
>                 waybillSourceFunction,
>                 waybillTable,
>                 waybillTableTypeInfo);
>         DataStreamSource<Row> itemStream = streamEnv.addSource(
>                 itemSourceFunction,
>                 itemTable,
>                 itemTableTypeInfo);
>         Expression[] waybillFields = ExpressionParser
>                 .parseExpressionList(String.join(",", 
> waybillTableTypeInfo.getFieldNames())
>                         + ",proctime.proctime").toArray(new Expression[0]);
>         Expression[] itemFields = ExpressionParser
>                 .parseExpressionList(
>                         String.join(",", itemTableTypeInfo.getFieldNames()) + 
> ",proctime.proctime")
>                 .toArray(new Expression[0]);
>         tableEnv.createTemporaryView(waybillTable, waybillStream, 
> waybillFields);
>         tableEnv.createTemporaryView(itemTable, itemStream, itemFields);
>         String sql =
>                 "select \n"
>                 + "    city_id, \n"
>                 + "    count(*) as cnt\n"
>                 + "from (\n"
>                 + "    select id,city_id\n"
>                 + "    from (\n"
>                 + "        select \n"
>                 + "            id,\n"
>                 + "            city_id,\n"
>                 + "            row_number() over(partition by id order by 
> utime desc ) as rno \n"
>                 + "        from (\n"
>                 + "            select \n"
>                 + "                waybill.id as id,\n"
>                 + "                coalesce(item.city_id, waybill.city_id) as 
> city_id,\n"
>                 + "                waybill.utime as utime \n"
>                 + "            from waybill left join item \n"
>                 + "            on waybill.id = item.id \n"
>                 + "        ) \n"
>                 + "    )\n"
>                 + "    where rno =1\n"
>                 + ")\n"
>                 + "group by city_id";
>         StatementSet statementSet = tableEnv.createStatementSet();
>         Table table = tableEnv.sqlQuery(sql);
>         DataStream<Tuple2<Boolean, Row>> rowDataStream = 
> tableEnv.toRetractStream(table, Row.class);
>         rowDataStream.printToErr();
>         try {
>             streamEnv.execute();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>     private static RowTypeInfo buildWaybillTableTypeInfo() {
>         TypeInformation[] types = new TypeInformation[]{Types.INT(), 
> Types.STRING(), Types.LONG(), Types.LONG()};
>         String[] fields = new String[]{"id", "city_id", "rider_id", "utime"};
>         return new RowTypeInfo(types, fields);
>     }
>     private static RowTypeInfo buildItemTableTypeInfo() {
>         TypeInformation[] types = new TypeInformation[]{Types.INT(), 
> Types.STRING(), Types.LONG()};
>         String[] fields = new String[]{"id", "city_id", "utime"};
>         return new RowTypeInfo(types, fields);
>     }
>     //id,rider_id,city_id,utime
>     private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo 
> rowTypeInfo) {
>         return new SourceFunction<Row>() {
>             private volatile boolean stopped = false;
>             int count = 0;
>             int[] ids = {111, 222, 333, 111};
>             String[] cityIds = {"A", "A", "B", "A"};
>             @Override
>             public void run(SourceContext<Row> ctx) throws Exception {
>                 while (!stopped) {
>                     int id = ids[count % ids.length];
>                     String cityId = cityIds[count % cityIds.length];
>                     Row row = new Row(4);
>                     row.setField(0, id);
>                     row.setField(1, cityId);
>                     row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
>                     row.setField(3, System.currentTimeMillis());
>                     printRow(rowTypeInfo, row);
>                     ctx.collect(row);
>                     if (++count > 3) {
>                         stopped = true;
>                     }
>                 }
>             }
>             @Override
>             public void cancel() {
>                 stopped = true;
>             }
>         };
>     }
>     //id,city_id,utime
>     private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo 
> rowTypeInfo) {
>         return new SourceFunction<Row>() {
>             private volatile boolean stopped = false;
>             int count = 0;
>             int[] ids = {111, 333};
>             String[] cityIds = {"C", "D"};
>             @Override
>             public void run(SourceContext<Row> ctx) throws Exception {
>                 while (!stopped) {
>                     Thread.sleep(RandomUtils.nextInt(1000, 2000));
>                     int id = ids[count % ids.length];
>                     String cityId = cityIds[count % cityIds.length];
>                     Row row = new Row(3);
>                     row.setField(0, id);
>                     row.setField(1, cityId);
>                     //row.setField(2, System.currentTimeMillis());
>                     printRow(rowTypeInfo, row);
>                     ctx.collect(row);
>                     if (++count >= 2) {
>                         stopped = true;
>                     }
>                 }
>             }
>             @Override
>             public void cancel() {
>                 stopped = true;
>             }
>         };
>     }
>     public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
>         String prefix = "";
>         for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
>             prefix = i > 0 ? "," : "";
>             System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + 
> row.getField(i));
>         }
>         System.out.println();
>     }
> {code}
> ------------------------------------------------------------
> |*wrong result*||right result||
> |id:111,city_id:A,rider_id:1137,utime:1650979957702
> id:222,city_id:A,rider_id:1976,utime:1650979957725
> id:333,city_id:B,rider_id:1916,utime:1650979957725
> id:111,city_id:A,rider_id:1345,utime:1650979957725
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (true,C,1)
> (false,A,1)
> (false,C,1)
> (true,C,2)
> id:333,city_id: D,utime:null
> (false,B,1)
> (true,D,1)
> The final result:
> C,2
> D,1
> is wrong.| 
> id:111,city_id:A,rider_id:1155,utime:1650980662019
> id:222,city_id:A,rider_id:1875,utime:1650980662042
> id:333,city_id:B,rider_id:1430,utime:1650980662042
> id:111,city_id:A,rider_id:1308,utime:1650980662042
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (false,A,2)
> (true,A,1)
> (true,C,1)
> id:333,city_id: D,utime:null
> (false,B,1)
> (true,D,1)
> The final result:
> A,1
> C,1
> D,1
> is right.|
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to