[ https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528202#comment-17528202 ]
zhangbin commented on FLINK-27418: ---------------------------------- Hi [~jark], I see you've fixed several TopN bugs. I have found the cause of this wrong result, and I would like to discuss it with you. Thanks. > Flink SQL TopN result is wrong > ------------------------------ > > Key: FLINK-27418 > URL: https://issues.apache.org/jira/browse/FLINK-27418 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.12.2, 1.14.3 > Environment: Flink 1.12.2 and Flink 1.14.3 test results are sometimes > wrong > Reporter: zhangbin > Priority: Major > > Flink SQL TopN is executed multiple times with different results, sometimes > with correct results and sometimes with incorrect results. > Example: > {code:java} > @Test > public void flinkSqlJoinRetract() { > EnvironmentSettings settings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamExecutionEnvironment streamEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > streamEnv.setParallelism(1); > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(streamEnv, settings); > tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000)); > RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo(); > RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo(); > SourceFunction<Row> waybillSourceFunction = > buildWaybillStreamSource(waybillTableTypeInfo); > SourceFunction<Row> itemSourceFunction = > buildItemStreamSource(itemTableTypeInfo); > String waybillTable = "waybill"; > String itemTable = "item"; > DataStreamSource<Row> waybillStream = streamEnv.addSource( > waybillSourceFunction, > waybillTable, > waybillTableTypeInfo); > DataStreamSource<Row> itemStream = streamEnv.addSource( > itemSourceFunction, > itemTable, > itemTableTypeInfo); > Expression[] waybillFields = ExpressionParser > .parseExpressionList(String.join(",", > waybillTableTypeInfo.getFieldNames()) > + ",proctime.proctime").toArray(new Expression[0]); > 
Expression[] itemFields = ExpressionParser > .parseExpressionList( > String.join(",", itemTableTypeInfo.getFieldNames()) + > ",proctime.proctime") > .toArray(new Expression[0]); > tableEnv.createTemporaryView(waybillTable, waybillStream, > waybillFields); > tableEnv.createTemporaryView(itemTable, itemStream, itemFields); > String sql = > "select \n" > + " city_id, \n" > + " count(*) as cnt\n" > + "from (\n" > + " select id,city_id\n" > + " from (\n" > + " select \n" > + " id,\n" > + " city_id,\n" > + " row_number() over(partition by id order by > utime desc ) as rno \n" > + " from (\n" > + " select \n" > + " waybill.id as id,\n" > + " coalesce(item.city_id, waybill.city_id) as > city_id,\n" > + " waybill.utime as utime \n" > + " from waybill left join item \n" > + " on waybill.id = item.id \n" > + " ) \n" > + " )\n" > + " where rno =1\n" > + ")\n" > + "group by city_id"; > StatementSet statementSet = tableEnv.createStatementSet(); > Table table = tableEnv.sqlQuery(sql); > DataStream<Tuple2<Boolean, Row>> rowDataStream = > tableEnv.toRetractStream(table, Row.class); > rowDataStream.printToErr(); > try { > streamEnv.execute(); > } catch (Exception e) { > e.printStackTrace(); > } > } > private static RowTypeInfo buildWaybillTableTypeInfo() { > TypeInformation[] types = new TypeInformation[]{Types.INT(), > Types.STRING(), Types.LONG(), Types.LONG()}; > String[] fields = new String[]{"id", "city_id", "rider_id", "utime"}; > return new RowTypeInfo(types, fields); > } > private static RowTypeInfo buildItemTableTypeInfo() { > TypeInformation[] types = new TypeInformation[]{Types.INT(), > Types.STRING(), Types.LONG()}; > String[] fields = new String[]{"id", "city_id", "utime"}; > return new RowTypeInfo(types, fields); > } > //id,rider_id,city_id,utime > private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo > rowTypeInfo) { > return new SourceFunction<Row>() { > private volatile boolean stopped = false; > int count = 0; > int[] ids = {111, 222, 333, 111}; > 
String[] cityIds = {"A", "A", "B", "A"}; > @Override > public void run(SourceContext<Row> ctx) throws Exception { > while (!stopped) { > int id = ids[count % ids.length]; > String cityId = cityIds[count % cityIds.length]; > Row row = new Row(4); > row.setField(0, id); > row.setField(1, cityId); > row.setField(2, (long) RandomUtils.nextInt(1000, 2000)); > row.setField(3, System.currentTimeMillis()); > printRow(rowTypeInfo, row); > ctx.collect(row); > if (++count > 3) { > stopped = true; > } > } > } > @Override > public void cancel() { > stopped = true; > } > }; > } > //id,city_id,utime > private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo > rowTypeInfo) { > return new SourceFunction<Row>() { > private volatile boolean stopped = false; > int count = 0; > int[] ids = {111, 333}; > String[] cityIds = {"C", "D"}; > @Override > public void run(SourceContext<Row> ctx) throws Exception { > while (!stopped) { > Thread.sleep(RandomUtils.nextInt(1000, 2000)); > int id = ids[count % ids.length]; > String cityId = cityIds[count % cityIds.length]; > Row row = new Row(3); > row.setField(0, id); > row.setField(1, cityId); > //row.setField(2, System.currentTimeMillis()); > printRow(rowTypeInfo, row); > ctx.collect(row); > if (++count >= 2) { > stopped = true; > } > } > } > @Override > public void cancel() { > stopped = true; > } > }; > } > public static void printRow(RowTypeInfo rowTypeInfo, Row row) { > String prefix = ""; > for (int i = 0; i < rowTypeInfo.getArity(); ++i) { > prefix = i > 0 ? 
"," : ""; > System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + > row.getField(i)); > } > System.out.println(); > } > {code} > ------------------------------------------------------------ > |*wrong result*||right result|| > |id:111,city_id:A,rider_id:1137,utime:1650979957702 > id:222,city_id:A,rider_id:1976,utime:1650979957725 > id:333,city_id:B,rider_id:1916,utime:1650979957725 > id:111,city_id:A,rider_id:1345,utime:1650979957725 > (true,A,1) > (false,A,1) > (true,A,2) > (true,B,1) > (false,A,2) > (true,A,1) > (false,A,1) > (true,A,2) > id:111,city_id:C,utime:null > (false,A,2) > (true,A,1) > (true,C,1) > (false,A,1) > (false,C,1) > (true,C,2) > id:333,city_id: D,utime:null > (false,B,1) > (true,D,1) > The final result: > C,2 > D,1 > is wrong.| > id:111,city_id:A,rider_id:1155,utime:1650980662019 > id:222,city_id:A,rider_id:1875,utime:1650980662042 > id:333,city_id:B,rider_id:1430,utime:1650980662042 > id:111,city_id:A,rider_id:1308,utime:1650980662042 > (true,A,1) > (false,A,1) > (true,A,2) > (true,B,1) > (false,A,2) > (true,A,1) > (false,A,1) > (true,A,2) > id:111,city_id:C,utime:null > (false,A,2) > (true,A,1) > (false,A,1) > (true,A,2) > (false,A,2) > (true,A,1) > (true,C,1) > id:333,city_id: D,utime:null > (false,B,1) > (true,D,1) > The final result: > A,1 > C,2 > D,1 > is right.| > > -- This message was sent by Atlassian Jira (v8.20.7#820007)