[jira] [Updated] (FLINK-25802) OverWindow in batch mode failed
[ https://issues.apache.org/jira/browse/FLINK-25802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Martijn Visser updated FLINK-25802: --- Component/s: Table SQL / Planner Table SQL / Runtime > OverWindow in batch mode failed > --- > > Key: FLINK-25802 > URL: https://issues.apache.org/jira/browse/FLINK-25802 > Project: Flink > Issue Type: Bug > Components: Table SQL / Planner, Table SQL / Runtime >Affects Versions: 1.14.0 >Reporter: Zoyo Pei >Priority: Major > > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setRuntimeMode(RuntimeExecutionMode.BATCH); > StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); > DataStream<Row> userStream = env > .fromElements( > Row.of(LocalDateTime.parse("2021-08-21T13:00:00"), 1, > "Alice"), > Row.of(LocalDateTime.parse("2021-08-21T13:05:00"), 2, "Bob"), > Row.of(LocalDateTime.parse("2021-08-21T13:10:00"), 2, "Bob")) > .returns( > Types.ROW_NAMED( > new String[]{"ts", "uid", "name"}, > Types.LOCAL_DATE_TIME, Types.INT, Types.STRING)); > tEnv.createTemporaryView( > "UserTable", > userStream, > Schema.newBuilder() > .column("ts", DataTypes.TIMESTAMP(3)) > .column("uid", DataTypes.INT()) > .column("name", DataTypes.STRING()) > .watermark("ts", "ts - INTERVAL '1' SECOND") > .build()); > String statement = "SELECT name, ts, COUNT(name) OVER w AS cnt FROM UserTable > " + > "WINDOW w AS (" + > " PARTITION BY name" + > " ORDER BY ts" + > " RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW" + > ")"; > tEnv.executeSql(statement).print(); > {code} > > {code:java} > /* 1 */ > /* 2 */ public class RangeBoundComparator$38 implements > org.apache.flink.table.runtime.generated.RecordComparator { > /* 3 */ > /* 4 */ private final Object[] references; > /* 5 */ > /* 6 */ > /* 7 */ public RangeBoundComparator$38(Object[] references) { > /* 8 */ this.references = references; > /* 9 */ > /* 10 */ > /* 11 */ } > /* 12 */ > /* 13 */ @Override > /* 14 */ 
public int compare(org.apache.flink.table.data.RowData in1, > org.apache.flink.table.data.RowData in2) { > /* 15 */ > /* 16 */ org.apache.flink.table.data.TimestampData field$39; > /* 17 */ boolean isNull$39; > /* 18 */ org.apache.flink.table.data.TimestampData field$40; > /* 19 */ boolean isNull$40; > /* 20 */ isNull$39 = in1.isNullAt(0); > /* 21 */ field$39 = null; > /* 22 */ if (!isNull$39) { > /* 23 */ field$39 = in1.getTimestamp(0, 3); > /* 24 */ } > /* 25 */ isNull$40 = in2.isNullAt(0); > /* 26 */ field$40 = null; > /* 27 */ if (!isNull$40) { > /* 28 */ field$40 = in2.getTimestamp(0, 3); > /* 29 */ } > /* 30 */ if (isNull$39 && isNull$40) { > /* 31 */ return 1; > /* 32 */ } else if (isNull$39 || isNull$40) { > /* 33 */ return -1; > /* 34 */ } else { > /* 35 */ > /* 36 */ > /* 37 */ long result$41; > /* 38 */ boolean isNull$41; > /* 39 */ long result$42; > /* 40 */ boolean isNull$42; > /* 41 */ boolean isNull$43; > /* 42 */ long result$44; > /* 43 */ boolean isNull$45; > /* 44 */ boolean result$46; > /* 45 */ isNull$41 = (java.lang.Long) field$39 == > null; > /* 46 */ result$41 = -1L; > /* 47 */ if (!isNull$41) { > /* 48 */ result$41 = (java.lang.Long) field$39; > /* 49 */ } > /* 50 */ isNull$42 = (java.lang.Long) field$40 == > null; > /* 51 */ result$42 = -1L; > /* 52 */ if (!isNull$42) { > /* 53 */ result$42 = (java.lang.Long) field$40; > /* 54 */ } > /* 55 */ > /* 56 */ > /* 57 */ > /* 58 */ > /* 59 */
[jira] [Updated] (FLINK-25802) OverWindow in batch mode failed
[ https://issues.apache.org/jira/browse/FLINK-25802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zoyo Pei updated FLINK-25802: - Description: {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); DataStream<Row> userStream = env .fromElements( Row.of(LocalDateTime.parse("2021-08-21T13:00:00"), 1, "Alice"), Row.of(LocalDateTime.parse("2021-08-21T13:05:00"), 2, "Bob"), Row.of(LocalDateTime.parse("2021-08-21T13:10:00"), 2, "Bob")) .returns( Types.ROW_NAMED( new String[]{"ts", "uid", "name"}, Types.LOCAL_DATE_TIME, Types.INT, Types.STRING)); tEnv.createTemporaryView( "UserTable", userStream, Schema.newBuilder() .column("ts", DataTypes.TIMESTAMP(3)) .column("uid", DataTypes.INT()) .column("name", DataTypes.STRING()) .watermark("ts", "ts - INTERVAL '1' SECOND") .build()); String statement = "SELECT name, ts, COUNT(name) OVER w AS cnt FROM UserTable " + "WINDOW w AS (" + " PARTITION BY name" + " ORDER BY ts" + " RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW" + ")"; tEnv.executeSql(statement).print(); {code} {code:java} /* 1 */ /* 2 */ public class RangeBoundComparator$38 implements org.apache.flink.table.runtime.generated.RecordComparator { /* 3 */ /* 4 */ private final Object[] references; /* 5 */ /* 6 */ /* 7 */ public RangeBoundComparator$38(Object[] references) { /* 8 */ this.references = references; /* 9 */ /* 10 */ /* 11 */ } /* 12 */ /* 13 */ @Override /* 14 */ public int compare(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) { /* 15 */ /* 16 */ org.apache.flink.table.data.TimestampData field$39; /* 17 */ boolean isNull$39; /* 18 */ org.apache.flink.table.data.TimestampData field$40; /* 19 */ boolean isNull$40; /* 20 */ isNull$39 = in1.isNullAt(0); /* 21 */ field$39 = null; /* 22 */ if (!isNull$39) { /* 23 */ field$39 = in1.getTimestamp(0, 
3); /* 24 */ } /* 25 */ isNull$40 = in2.isNullAt(0); /* 26 */ field$40 = null; /* 27 */ if (!isNull$40) { /* 28 */ field$40 = in2.getTimestamp(0, 3); /* 29 */ } /* 30 */ if (isNull$39 && isNull$40) { /* 31 */ return 1; /* 32 */ } else if (isNull$39 || isNull$40) { /* 33 */ return -1; /* 34 */ } else { /* 35 */ /* 36 */ /* 37 */ long result$41; /* 38 */ boolean isNull$41; /* 39 */ long result$42; /* 40 */ boolean isNull$42; /* 41 */ boolean isNull$43; /* 42 */ long result$44; /* 43 */ boolean isNull$45; /* 44 */ boolean result$46; /* 45 */ isNull$41 = (java.lang.Long) field$39 == null; /* 46 */ result$41 = -1L; /* 47 */ if (!isNull$41) { /* 48 */ result$41 = (java.lang.Long) field$39; /* 49 */ } /* 50 */ isNull$42 = (java.lang.Long) field$40 == null; /* 51 */ result$42 = -1L; /* 52 */ if (!isNull$42) { /* 53 */ result$42 = (java.lang.Long) field$40; /* 54 */ } /* 55 */ /* 56 */ /* 57 */ /* 58 */ /* 59 */ isNull$43 = isNull$41 || isNull$42; /* 60 */ result$44 = -1L; /* 61 */ if (!isNull$43) { /* 62 */ /* 63 */ result$44 = (long) (result$41 - result$42); /* 64 */ /* 65 */ } /* 66 */ /* 67 */ /* 68 */ isNull$45 = isNull$43 || false; /* 69 */ result$46 = false; /* 70 */ if (!isNull$45) { /* 71 */ /* 72 */
[jira] [Updated] (FLINK-25802) OverWindow in batch mode failed
[ https://issues.apache.org/jira/browse/FLINK-25802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Zoyo Pei updated FLINK-25802: - Description: {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); DataStream<Row> userStream = env .fromElements( Row.of(LocalDateTime.parse("2021-08-21T13:00:00"), 1, "Alice"), Row.of(LocalDateTime.parse("2021-08-21T13:05:00"), 2, "Bob"), Row.of(LocalDateTime.parse("2021-08-21T13:10:00"), 2, "Bob")) .returns( Types.ROW_NAMED( new String[]{"ts", "uid", "name"}, Types.LOCAL_DATE_TIME, Types.INT, Types.STRING)); tEnv.createTemporaryView( "UserTable", userStream, Schema.newBuilder() .column("ts", DataTypes.TIMESTAMP(3)) .column("uid", DataTypes.INT()) .column("name", DataTypes.STRING()) .watermark("ts", "ts - INTERVAL '1' SECOND") .build()); String statement = "SELECT name, ts, COUNT(name) OVER w AS cnt FROM UserTable " + "WINDOW w AS (" + " PARTITION BY name" + " ORDER BY ts" + " RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW" + ")"; tEnv.executeSql(statement).print(); {code} {code:java} /* 1 */ /* 2 */ public class RangeBoundComparator$38 implements org.apache.flink.table.runtime.generated.RecordComparator { /* 3 */ /* 4 */ private final Object[] references; /* 5 */ /* 6 */ /* 7 */ public RangeBoundComparator$38(Object[] references) { /* 8 */ this.references = references; /* 9 */ /* 10 */ /* 11 */ } /* 12 */ /* 13 */ @Override /* 14 */ public int compare(org.apache.flink.table.data.RowData in1, org.apache.flink.table.data.RowData in2) { /* 15 */ /* 16 */ org.apache.flink.table.data.TimestampData field$39; /* 17 */ boolean isNull$39; /* 18 */ org.apache.flink.table.data.TimestampData field$40; /* 19 */ boolean isNull$40; /* 20 */ isNull$39 = in1.isNullAt(0); /* 21 */ field$39 = null; /* 22 */ if (!isNull$39) { /* 23 */ field$39 = in1.getTimestamp(0, 
3); /* 24 */ } /* 25 */ isNull$40 = in2.isNullAt(0); /* 26 */ field$40 = null; /* 27 */ if (!isNull$40) { /* 28 */ field$40 = in2.getTimestamp(0, 3); /* 29 */ } /* 30 */ if (isNull$39 && isNull$40) { /* 31 */ return 1; /* 32 */ } else if (isNull$39 || isNull$40) { /* 33 */ return -1; /* 34 */ } else { /* 35 */ /* 36 */ /* 37 */ long result$41; /* 38 */ boolean isNull$41; /* 39 */ long result$42; /* 40 */ boolean isNull$42; /* 41 */ boolean isNull$43; /* 42 */ long result$44; /* 43 */ boolean isNull$45; /* 44 */ boolean result$46; /* 45 */ isNull$41 = (java.lang.Long) field$39 == null; /* 46 */ result$41 = -1L; /* 47 */ if (!isNull$41) { /* 48 */ result$41 = (java.lang.Long) field$39; /* 49 */ } /* 50 */ isNull$42 = (java.lang.Long) field$40 == null; /* 51 */ result$42 = -1L; /* 52 */ if (!isNull$42) { /* 53 */ result$42 = (java.lang.Long) field$40; /* 54 */ } /* 55 */ /* 56 */ /* 57 */ /* 58 */ /* 59 */ isNull$43 = isNull$41 || isNull$42; /* 60 */ result$44 = -1L; /* 61 */ if (!isNull$43) { /* 62 */ /* 63 */ result$44 = (long) (result$41 - result$42); /* 64 */ /* 65 */ } /* 66 */ /* 67 */ /* 68 */ isNull$45 = isNull$43 || false; /* 69 */ result$46 = false; /* 70 */ if (!isNull$45) { /* 71 */ /* 72 */