[ https://issues.apache.org/jira/browse/FLINK-27418?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
zhangbin updated FLINK-27418: ----------------------------- Description: Running the same Flink SQL TopN query multiple times produces different results: sometimes correct, sometimes incorrect. Example: {code:java} @Test public void flinkSqlJoinRetract() { EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamEnv.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings); tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000)); RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo(); RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo(); SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo); SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo); String waybillTable = "waybill"; String itemTable = "item"; DataStreamSource<Row> waybillStream = streamEnv.addSource( waybillSourceFunction, waybillTable, waybillTableTypeInfo); DataStreamSource<Row> itemStream = streamEnv.addSource( itemSourceFunction, itemTable, itemTableTypeInfo); Expression[] waybillFields = ExpressionParser .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames()) + ",proctime.proctime").toArray(new Expression[0]); Expression[] itemFields = ExpressionParser .parseExpressionList( String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime") .toArray(new Expression[0]); tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields); tableEnv.createTemporaryView(itemTable, itemStream, itemFields); String sql = "select \n" + " city_id, \n" + " count(*) as cnt\n" + "from (\n" + " select id,city_id\n" + " from (\n" + " select \n" + " id,\n" + " city_id,\n" + " row_number() over(partition by id order by utime desc ) as rno \n" + " from (\n" + " select \n" + " 
waybill.id as id,\n" + " coalesce(item.city_id, waybill.city_id) as city_id,\n" + " waybill.utime as utime \n" + " from waybill left join item \n" + " on waybill.id = item.id \n" + " ) \n" + " )\n" + " where rno =1\n" + ")\n" + "group by city_id"; StatementSet statementSet = tableEnv.createStatementSet(); Table table = tableEnv.sqlQuery(sql); DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class); rowDataStream.printToErr(); try { streamEnv.execute(); } catch (Exception e) { e.printStackTrace(); } } private static RowTypeInfo buildWaybillTableTypeInfo() { TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()}; String[] fields = new String[]{"id", "city_id", "rider_id", "utime"}; return new RowTypeInfo(types, fields); } private static RowTypeInfo buildItemTableTypeInfo() { TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG()}; String[] fields = new String[]{"id", "city_id", "utime"}; return new RowTypeInfo(types, fields); } //id,city_id,rider_id,utime private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) { return new SourceFunction<Row>() { private volatile boolean stopped = false; int count = 0; int[] ids = {111, 222, 333, 111}; String[] cityIds = {"A", "A", "B", "A"}; @Override public void run(SourceContext<Row> ctx) throws Exception { while (!stopped) { int id = ids[count % ids.length]; String cityId = cityIds[count % cityIds.length]; Row row = new Row(4); row.setField(0, id); row.setField(1, cityId); row.setField(2, (long) RandomUtils.nextInt(1000, 2000)); row.setField(3, System.currentTimeMillis()); printRow(rowTypeInfo, row); ctx.collect(row); if (++count > 3) { stopped = true; } } } @Override public void cancel() { stopped = true; } }; } //id,city_id,utime private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) { return new SourceFunction<Row>() { private volatile boolean stopped = 
false; int count = 0; int[] ids = {111, 333}; String[] cityIds = {"C", "D"}; @Override public void run(SourceContext<Row> ctx) throws Exception { while (!stopped) { Thread.sleep(RandomUtils.nextInt(1000, 2000)); int id = ids[count % ids.length]; String cityId = cityIds[count % cityIds.length]; Row row = new Row(3); row.setField(0, id); row.setField(1, cityId); //row.setField(2, System.currentTimeMillis()); printRow(rowTypeInfo, row); ctx.collect(row); if (++count >= 2) { stopped = true; } } } @Override public void cancel() { stopped = true; } }; } public static void printRow(RowTypeInfo rowTypeInfo, Row row) { String prefix = ""; for (int i = 0; i < rowTypeInfo.getArity(); ++i) { prefix = i > 0 ? "," : ""; System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + row.getField(i)); } System.out.println(); } {code} ------------------------------------------------------------ |*wrong result*||right result|| |id:111,city_id:A,rider_id:1137,utime:1650979957702 id:222,city_id:A,rider_id:1976,utime:1650979957725 id:333,city_id:B,rider_id:1916,utime:1650979957725 id:111,city_id:A,rider_id:1345,utime:1650979957725 (true,A,1) (false,A,1) (true,A,2) (true,B,1) (false,A,2) (true,A,1) (false,A,1) (true,A,2) id:111,city_id:C,utime:null (false,A,2) (true,A,1) (true,C,1) (false,A,1) (false,C,1) (true,C,2) id:333,city_id: D,utime:null (false,B,1) (true,D,1) The final result: C,2 D,1 is wrong.| id:111,city_id:A,rider_id:1155,utime:1650980662019 id:222,city_id:A,rider_id:1875,utime:1650980662042 id:333,city_id:B,rider_id:1430,utime:1650980662042 id:111,city_id:A,rider_id:1308,utime:1650980662042 (true,A,1) (false,A,1) (true,A,2) (true,B,1) (false,A,2) (true,A,1) (false,A,1) (true,A,2) id:111,city_id:C,utime:null (false,A,2) (true,A,1) (false,A,1) (true,A,2) (false,A,2) (true,A,1) (true,C,1) id:333,city_id: D,utime:null (false,B,1) (true,D,1) The final result: A,1 C,1 D,1 is right.| was: Flink SQL TopN is executed multiple times with different results, sometimes with correct 
results and sometimes with incorrect results. Example: {code:java} @Test public void flinkSqlJoinRetract() { EnvironmentSettings settings = EnvironmentSettings.newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); streamEnv.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings); tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000)); RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo(); RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo(); SourceFunction<Row> waybillSourceFunction = buildWaybillStreamSource(waybillTableTypeInfo); SourceFunction<Row> itemSourceFunction = buildItemStreamSource(itemTableTypeInfo); String waybillTable = "waybill"; String itemTable = "item"; DataStreamSource<Row> waybillStream = streamEnv.addSource( waybillSourceFunction, waybillTable, waybillTableTypeInfo); DataStreamSource<Row> itemStream = streamEnv.addSource( itemSourceFunction, itemTable, itemTableTypeInfo); Expression[] waybillFields = ExpressionParser .parseExpressionList(String.join(",", waybillTableTypeInfo.getFieldNames()) + ",proctime.proctime").toArray(new Expression[0]); Expression[] itemFields = ExpressionParser .parseExpressionList( String.join(",", itemTableTypeInfo.getFieldNames()) + ",proctime.proctime") .toArray(new Expression[0]); tableEnv.createTemporaryView(waybillTable, waybillStream, waybillFields); tableEnv.createTemporaryView(itemTable, itemStream, itemFields); String sql = "select \n" + " city_id, \n" + " count(*) as cnt\n" + "from (\n" + " select id,city_id\n" + " from (\n" + " select \n" + " id,\n" + " city_id,\n" + " row_number() over(partition by id order by utime desc ) as rno \n" + " from (\n" + " select \n" + " waybill.id as id,\n" + " coalesce(item.city_id, waybill.city_id) as city_id,\n" + " waybill.utime as utime \n" + " from waybill left join item \n" + " on waybill.id = 
item.id \n" + " ) \n" + " )\n" + " where rno =1\n" + ")\n" + "group by city_id"; StatementSet statementSet = tableEnv.createStatementSet(); Table table = tableEnv.sqlQuery(sql); DataStream<Tuple2<Boolean, Row>> rowDataStream = tableEnv.toRetractStream(table, Row.class); rowDataStream.printToErr(); try { streamEnv.execute(); } catch (Exception e) { e.printStackTrace(); } } private static RowTypeInfo buildWaybillTableTypeInfo() { TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG(), Types.LONG()}; String[] fields = new String[]{"id", "city_id", "rider_id", "utime"}; return new RowTypeInfo(types, fields); } private static RowTypeInfo buildItemTableTypeInfo() { TypeInformation[] types = new TypeInformation[]{Types.INT(), Types.STRING(), Types.LONG()}; String[] fields = new String[]{"id", "city_id", "utime"}; return new RowTypeInfo(types, fields); } //id,rider_id,city_id,utime private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo rowTypeInfo) { return new SourceFunction<Row>() { private volatile boolean stopped = false; int count = 0; int[] ids = {111, 222, 333, 111}; String[] cityIds = {"A", "A", "B", "A"}; @Override public void run(SourceContext<Row> ctx) throws Exception { while (!stopped) { int id = ids[count % ids.length]; String cityId = cityIds[count % cityIds.length]; Row row = new Row(4); row.setField(0, id); row.setField(1, cityId); row.setField(2, (long) RandomUtils.nextInt(1000, 2000)); row.setField(3, System.currentTimeMillis()); printRow(rowTypeInfo, row); ctx.collect(row); if (++count > 3) { stopped = true; } } } @Override public void cancel() { stopped = true; } }; } //id,city_id,utime private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo rowTypeInfo) { return new SourceFunction<Row>() { private volatile boolean stopped = false; int count = 0; int[] ids = {111, 333}; String[] cityIds = {"C", "D"}; @Override public void run(SourceContext<Row> ctx) throws Exception { while (!stopped) { 
Thread.sleep(RandomUtils.nextInt(1000, 2000)); int id = ids[count % ids.length]; String cityId = cityIds[count % cityIds.length]; Row row = new Row(3); row.setField(0, id); row.setField(1, cityId); //row.setField(2, System.currentTimeMillis()); printRow(rowTypeInfo, row); ctx.collect(row); if (++count >= 2) { stopped = true; } } } @Override public void cancel() { stopped = true; } }; } public static void printRow(RowTypeInfo rowTypeInfo, Row row) { String prefix = ""; for (int i = 0; i < rowTypeInfo.getArity(); ++i) { prefix = i > 0 ? "," : ""; System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + row.getField(i)); } System.out.println(); } {code} ------------------------------------------------------------ |*wrong result*||right result|| |id:111,city_id:A,rider_id:1137,utime:1650979957702 id:222,city_id:A,rider_id:1976,utime:1650979957725 id:333,city_id:B,rider_id:1916,utime:1650979957725 id:111,city_id:A,rider_id:1345,utime:1650979957725 (true,A,1) (false,A,1) (true,A,2) (true,B,1) (false,A,2) (true,A,1) (false,A,1) (true,A,2) id:111,city_id:C,utime:null (false,A,2) (true,A,1) (true,C,1) (false,A,1) (false,C,1) (true,C,2) id:333,city_id: D,utime:null (false,B,1) (true,D,1) The final result: C,2 D,1 is wrong.| id:111,city_id:A,rider_id:1155,utime:1650980662019 id:222,city_id:A,rider_id:1875,utime:1650980662042 id:333,city_id:B,rider_id:1430,utime:1650980662042 id:111,city_id:A,rider_id:1308,utime:1650980662042 (true,A,1) (false,A,1) (true,A,2) (true,B,1) (false,A,2) (true,A,1) (false,A,1) (true,A,2) id:111,city_id:C,utime:null (false,A,2) (true,A,1) (false,A,1) (true,A,2) (false,A,2) (true,A,1) (true,C,1) id:333,city_id: D,utime:null (false,B,1) (true,D,1) The final result: A,1 C,2 D,1 is right.| > Flink SQL TopN result is wrong > ------------------------------ > > Key: FLINK-27418 > URL: https://issues.apache.org/jira/browse/FLINK-27418 > Project: Flink > Issue Type: Bug > Components: Table SQL / API > Affects Versions: 1.12.2, 1.14.3 > Environment: 
Flink 1.12.2 and Flink 1.14.3 (results are intermittently wrong on both versions) > Reporter: zhangbin > Priority: Major > > Running the same Flink SQL TopN query multiple times produces different > results: sometimes correct, sometimes incorrect. > Example: > {code:java} > @Test > public void flinkSqlJoinRetract() { > EnvironmentSettings settings = EnvironmentSettings.newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build(); > StreamExecutionEnvironment streamEnv = > StreamExecutionEnvironment.getExecutionEnvironment(); > streamEnv.setParallelism(1); > StreamTableEnvironment tableEnv = > StreamTableEnvironment.create(streamEnv, settings); > tableEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(10000)); > RowTypeInfo waybillTableTypeInfo = buildWaybillTableTypeInfo(); > RowTypeInfo itemTableTypeInfo = buildItemTableTypeInfo(); > SourceFunction<Row> waybillSourceFunction = > buildWaybillStreamSource(waybillTableTypeInfo); > SourceFunction<Row> itemSourceFunction = > buildItemStreamSource(itemTableTypeInfo); > String waybillTable = "waybill"; > String itemTable = "item"; > DataStreamSource<Row> waybillStream = streamEnv.addSource( > waybillSourceFunction, > waybillTable, > waybillTableTypeInfo); > DataStreamSource<Row> itemStream = streamEnv.addSource( > itemSourceFunction, > itemTable, > itemTableTypeInfo); > Expression[] waybillFields = ExpressionParser > .parseExpressionList(String.join(",", > waybillTableTypeInfo.getFieldNames()) > + ",proctime.proctime").toArray(new Expression[0]); > Expression[] itemFields = ExpressionParser > .parseExpressionList( > String.join(",", itemTableTypeInfo.getFieldNames()) + > ",proctime.proctime") > .toArray(new Expression[0]); > tableEnv.createTemporaryView(waybillTable, waybillStream, > waybillFields); > tableEnv.createTemporaryView(itemTable, itemStream, itemFields); > String sql = > "select \n" > + " city_id, \n" > + " count(*) as cnt\n" > + "from (\n" > + " select id,city_id\n" > + " from (\n" > + " select \n" > 
+ " id,\n" > + " city_id,\n" > + " row_number() over(partition by id order by > utime desc ) as rno \n" > + " from (\n" > + " select \n" > + " waybill.id as id,\n" > + " coalesce(item.city_id, waybill.city_id) as > city_id,\n" > + " waybill.utime as utime \n" > + " from waybill left join item \n" > + " on waybill.id = item.id \n" > + " ) \n" > + " )\n" > + " where rno =1\n" > + ")\n" > + "group by city_id"; > StatementSet statementSet = tableEnv.createStatementSet(); > Table table = tableEnv.sqlQuery(sql); > DataStream<Tuple2<Boolean, Row>> rowDataStream = > tableEnv.toRetractStream(table, Row.class); > rowDataStream.printToErr(); > try { > streamEnv.execute(); > } catch (Exception e) { > e.printStackTrace(); > } > } > private static RowTypeInfo buildWaybillTableTypeInfo() { > TypeInformation[] types = new TypeInformation[]{Types.INT(), > Types.STRING(), Types.LONG(), Types.LONG()}; > String[] fields = new String[]{"id", "city_id", "rider_id", "utime"}; > return new RowTypeInfo(types, fields); > } > private static RowTypeInfo buildItemTableTypeInfo() { > TypeInformation[] types = new TypeInformation[]{Types.INT(), > Types.STRING(), Types.LONG()}; > String[] fields = new String[]{"id", "city_id", "utime"}; > return new RowTypeInfo(types, fields); > } > //id,city_id,rider_id,utime > private static SourceFunction<Row> buildWaybillStreamSource(RowTypeInfo > rowTypeInfo) { > return new SourceFunction<Row>() { > private volatile boolean stopped = false; > int count = 0; > int[] ids = {111, 222, 333, 111}; > String[] cityIds = {"A", "A", "B", "A"}; > @Override > public void run(SourceContext<Row> ctx) throws Exception { > while (!stopped) { > int id = ids[count % ids.length]; > String cityId = cityIds[count % cityIds.length]; > Row row = new Row(4); > row.setField(0, id); > row.setField(1, cityId); > row.setField(2, (long) RandomUtils.nextInt(1000, 2000)); > row.setField(3, System.currentTimeMillis()); > printRow(rowTypeInfo, row); > ctx.collect(row); > if (++count > 3) { 
> stopped = true; > } > } > } > @Override > public void cancel() { > stopped = true; > } > }; > } > //id,city_id,utime > private static SourceFunction<Row> buildItemStreamSource(RowTypeInfo > rowTypeInfo) { > return new SourceFunction<Row>() { > private volatile boolean stopped = false; > int count = 0; > int[] ids = {111, 333}; > String[] cityIds = {"C", "D"}; > @Override > public void run(SourceContext<Row> ctx) throws Exception { > while (!stopped) { > Thread.sleep(RandomUtils.nextInt(1000, 2000)); > int id = ids[count % ids.length]; > String cityId = cityIds[count % cityIds.length]; > Row row = new Row(3); > row.setField(0, id); > row.setField(1, cityId); > //row.setField(2, System.currentTimeMillis()); > printRow(rowTypeInfo, row); > ctx.collect(row); > if (++count >= 2) { > stopped = true; > } > } > } > @Override > public void cancel() { > stopped = true; > } > }; > } > public static void printRow(RowTypeInfo rowTypeInfo, Row row) { > String prefix = ""; > for (int i = 0; i < rowTypeInfo.getArity(); ++i) { > prefix = i > 0 ? 
"," : ""; > System.out.print(prefix + rowTypeInfo.getFieldNames()[i] + ":" + > row.getField(i)); > } > System.out.println(); > } > {code} > ------------------------------------------------------------ > |*wrong result*||right result|| > |id:111,city_id:A,rider_id:1137,utime:1650979957702 > id:222,city_id:A,rider_id:1976,utime:1650979957725 > id:333,city_id:B,rider_id:1916,utime:1650979957725 > id:111,city_id:A,rider_id:1345,utime:1650979957725 > (true,A,1) > (false,A,1) > (true,A,2) > (true,B,1) > (false,A,2) > (true,A,1) > (false,A,1) > (true,A,2) > id:111,city_id:C,utime:null > (false,A,2) > (true,A,1) > (true,C,1) > (false,A,1) > (false,C,1) > (true,C,2) > id:333,city_id: D,utime:null > (false,B,1) > (true,D,1) > The final result: > C,2 > D,1 > is wrong.| > id:111,city_id:A,rider_id:1155,utime:1650980662019 > id:222,city_id:A,rider_id:1875,utime:1650980662042 > id:333,city_id:B,rider_id:1430,utime:1650980662042 > id:111,city_id:A,rider_id:1308,utime:1650980662042 > (true,A,1) > (false,A,1) > (true,A,2) > (true,B,1) > (false,A,2) > (true,A,1) > (false,A,1) > (true,A,2) > id:111,city_id:C,utime:null > (false,A,2) > (true,A,1) > (false,A,1) > (true,A,2) > (false,A,2) > (true,A,1) > (true,C,1) > id:333,city_id: D,utime:null > (false,B,1) > (true,D,1) > The final result: > A,1 > C,1 > D,1 > is right.| > > -- This message was sent by Atlassian Jira (v8.20.7#820007)