[ 
https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528202#comment-17528202
 ] 

zhangbin commented on FLINK-27418:
----------------------------------

Hi [~jark], I see you've fixed several TopN bugs.
I have found the cause of this wrong result and would like to discuss it with you.

Thanks.

> Flink SQL TopN result is wrong
> ------------------------------
>
>                 Key: FLINK-27418
>                 URL: https://issues.apache.org/jira/browse/FLINK-27418
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>    Affects Versions: 1.12.2, 1.14.3
>         Environment: Flink 1.12.2 and Flink 1.14.3 test results are sometimes 
> wrong
>            Reporter: zhangbin
>            Priority: Major
>
> Running the same Flink SQL TopN query multiple times produces different 
> results: sometimes the result is correct and sometimes it is incorrect.
> Example:
> {code:java}
> @Test
>     public void flinkSqlJoinRetract() {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>                 .useBlinkPlanner()
>                 .inStreamingMode()
>                 .build();
>         StreamExecutionEnvironment streamEnv = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         streamEnv.setParallelism(1);
>         StreamTableEnvironment tableEnv = 
> StreamTableEnvironment.create(streamEnv, settings);
>         tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000));
>         RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo();
>         RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo();
>         SourceFunction<Row> waybillSourceFunction = 
> buildWaybillStreamSource(waybillTableTypeInfo);
>         SourceFunction<Row> itemSourceFunction = 
> buildItemStreamSource(itemTableTypeInfo);
>         String waybillTable = "waybill";
>         String itemTable = "item";
>         DataStreamSource<Row> waybillStream = streamEnv.addSource(
>                 waybillSourceFunction,
>                 waybillTable,
>                 waybillTableTypeInfo);
>         DataStreamSource<Row> itemStream = streamEnv.addSource(
>                 itemSourceFunction,
>                 itemTable,
>                 itemTableTypeInfo);
>         Expression[] waybillFields = ExpressionParser
>                 .parseExpressionList(String.join(",", 
> waybillTableTypeInfo.getFieldNames())
>                         + ",proctime.proctime").toArray(new Expression[0]);
>         Expression[] itemFields = ExpressionParser
>                 .parseExpressionList(
>                         String.join(",", itemTableTypeInfo.getFieldNames()) + 
> ",proctime.proctime")
>                 .toArray(new Expression[0]);
>         tableEnv.createTemporaryView(waybillTable, waybillStream, 
> waybillFields);
>         tableEnv.createTemporaryView(itemTable, itemStream, itemFields);
>         String sql =
>                 "select \n"
>                 + "    city_id, \n"
>                 + "    count(*) as cnt\n"
>                 + "from (\n"
>                 + "    select id,city_id\n"
>                 + "    from (\n"
>                 + "        select \n"
>                 + "            id,\n"
>                 + "            city_id,\n"
>                 + "            row_number() over(partition by id order by 
> utime desc ) as rno \n"
>                 + "        from (\n"
>                 + "            select \n"
>                 + "                waybill.id as id,\n"
>                 + "                coalesce(item.city_id, waybill.city_id) as 
> city_id,\n"
>                 + "                waybill.utime as utime \n"
>                 + "            from waybill left join item \n"
>                 + "            on waybill.id = item.id \n"
>                 + "        ) \n"
>                 + "    )\n"
>                 + "    where rno =1\n"
>                 + ")\n"
>                 + "group by city_id";
>         StatementSet statementSet = tableEnv.createStatementSet();
>         Table table = tableEnv.sqlQuery(sql);
>         DataStream<Tuple2<Boolean, Row>> rowDataStream = 
> tableEnv.toRetractStream(table, Row.class);
>         rowDataStream.printToErr();
>         try {
>             streamEnv.execute();
>         } catch (Exception e) {
>             e.printStackTrace();
>         }
>     }
>     private static RowTypeInfo buildWaybillTableTypeInfo() {
>         TypeInformation[] types = new TypeInformation[]{Types.INT(), 
> Types.STRING(), Types.LONG(), Types.LONG()};
>         String[] fields = new String[]{"id", "city_id", "rider_id", "utime"};
>         return new RowTypeInfo(types, fields);
>     }
>     private static RowTypeInfo buildItemTableTypeInfo() {
>         TypeInformation[] types = new TypeInformation[]{Types.INT(), 
> Types.STRING(), Types.LONG()};
>         String[] fields = new String[]{"id", "city_id", "utime"};
>         return new RowTypeInfo(types, fields);
>     }
>     //id,city_id,rider_id,utime
>     private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo 
> rowTypeInfo) {
>         return new SourceFunction<Row>() {
>             private volatile boolean stopped = false;
>             int count = 0;
>             int[] ids = {111, 222, 333, 111};
>             String[] cityIds = {"A", "A", "B", "A"};
>             @Override
>             public void run(SourceContext<Row> ctx) throws Exception {
>                 while (!stopped) {
>                     int id = ids[count % ids.length];
>                     String cityId = cityIds[count % cityIds.length];
>                     Row row = new Row(4);
>                     row.setField(0, id);
>                     row.setField(1, cityId);
>                     row.setField(2, (long) RandomUtils.nextInt(1000, 2000));
>                     row.setField(3, System.currentTimeMillis());
>                     printRow(rowTypeInfo, row);
>                     ctx.collect(row);
>                     if (++count > 3) {
>                         stopped = true;
>                     }
>                 }
>             }
>             @Override
>             public void cancel() {
>                 stopped = true;
>             }
>         };
>     }
>     //id,city_id,utime
>     private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo 
> rowTypeInfo) {
>         return new SourceFunction<Row>() {
>             private volatile boolean stopped = false;
>             int count = 0;
>             int[] ids = {111, 333};
>             String[] cityIds = {"C", "D"};
>             @Override
>             public void run(SourceContext<Row> ctx) throws Exception {
>                 while (!stopped) {
>                     Thread.sleep(RandomUtils.nextInt(1000, 2000));
>                     int id = ids[count % ids.length];
>                     String cityId = cityIds[count % cityIds.length];
>                     Row row = new Row(3);
>                     row.setField(0, id);
>                     row.setField(1, cityId);
>                     //row.setField(2, System.currentTimeMillis());
>                     printRow(rowTypeInfo, row);
>                     ctx.collect(row);
>                     if (++count >= 2) {
>                         stopped = true;
>                     }
>                 }
>             }
>             @Override
>             public void cancel() {
>                 stopped = true;
>             }
>         };
>     }
>     public static void printRow(RowTypeInfo rowTypeInfo, Row row) {
>         String prefix = "";
>         for (int i = 0; i < rowTypeInfo.getArity(); ++i) {
>             prefix = i > 0 ? "," : "";
>             System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + 
> row.getField(i));
>         }
>         System.out.println();
>     }
> {code}
> ------------------------------------------------------------
> ||*wrong result*||right result||
> |id:111,city_id:A,rider_id:1137,utime:1650979957702
> id:222,city_id:A,rider_id:1976,utime:1650979957725
> id:333,city_id:B,rider_id:1916,utime:1650979957725
> id:111,city_id:A,rider_id:1345,utime:1650979957725
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (true,C,1)
> (false,A,1)
> (false,C,1)
> (true,C,2)
> id:333,city_id: D,utime:null
> (false,B,1)
> (true,D,1)
> The final result:
> C,2
> D,1
> is wrong.| 
> id:111,city_id:A,rider_id:1155,utime:1650980662019
> id:222,city_id:A,rider_id:1875,utime:1650980662042
> id:333,city_id:B,rider_id:1430,utime:1650980662042
> id:111,city_id:A,rider_id:1308,utime:1650980662042
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (true,B,1)
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> id:111,city_id:C,utime:null
> (false,A,2)
> (true,A,1)
> (false,A,1)
> (true,A,2)
> (false,A,2)
> (true,A,1)
> (true,C,1)
> id:333,city_id: D,utime:null
> (false,B,1)
> (true,D,1)
> The final result:
> A,1
> C,2
> D,1
> is right.|
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to