[jira] [Updated] (FLINK-25802) OverWindow in batch mode failed

2022-01-25 Thread Martijn Visser (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Martijn Visser updated FLINK-25802:
---
Component/s: Table SQL / Planner
 Table SQL / Runtime

> OverWindow in batch mode failed
> ---
>
> Key: FLINK-25802
> URL: https://issues.apache.org/jira/browse/FLINK-25802
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner, Table SQL / Runtime
>Affects Versions: 1.14.0
>Reporter: Zoyo Pei
>Priority: Major
>
> {code:java}
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
> DataStream userStream = env
> .fromElements(
> Row.of(LocalDateTime.parse("2021-08-21T13:00:00"), 1, 
> "Alice"),
> Row.of(LocalDateTime.parse("2021-08-21T13:05:00"), 2, "Bob"),
> Row.of(LocalDateTime.parse("2021-08-21T13:10:00"), 2, "Bob"))
> .returns(
> Types.ROW_NAMED(
> new String[]{"ts", "uid", "name"},
> Types.LOCAL_DATE_TIME, Types.INT, Types.STRING));
> tEnv.createTemporaryView(
> "UserTable",
> userStream,
> Schema.newBuilder()
> .column("ts", DataTypes.TIMESTAMP(3))
> .column("uid", DataTypes.INT())
> .column("name", DataTypes.STRING())
> .watermark("ts", "ts - INTERVAL '1' SECOND")
> .build());
> String statement = "SELECT name, ts, COUNT(name) OVER w AS cnt FROM UserTable 
> " +
> "WINDOW w AS (" +
> " PARTITION BY name" +
> " ORDER BY ts" +
> " RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW" +
> ")";
> tEnv.executeSql(statement).print();
>  {code}
>  
> {code:java}
> /* 1 */
> /* 2 */      public class RangeBoundComparator$38 implements 
> org.apache.flink.table.runtime.generated.RecordComparator {
> /* 3 */
> /* 4 */        private final Object[] references;
> /* 5 */        
> /* 6 */
> /* 7 */        public RangeBoundComparator$38(Object[] references) {
> /* 8 */          this.references = references;
> /* 9 */          
> /* 10 */          
> /* 11 */        }
> /* 12 */
> /* 13 */        @Override
> /* 14 */        public int compare(org.apache.flink.table.data.RowData in1, 
> org.apache.flink.table.data.RowData in2) {
> /* 15 */          
> /* 16 */                  org.apache.flink.table.data.TimestampData field$39;
> /* 17 */                  boolean isNull$39;
> /* 18 */                  org.apache.flink.table.data.TimestampData field$40;
> /* 19 */                  boolean isNull$40;
> /* 20 */                  isNull$39 = in1.isNullAt(0);
> /* 21 */                  field$39 = null;
> /* 22 */                  if (!isNull$39) {
> /* 23 */                    field$39 = in1.getTimestamp(0, 3);
> /* 24 */                  }
> /* 25 */                  isNull$40 = in2.isNullAt(0);
> /* 26 */                  field$40 = null;
> /* 27 */                  if (!isNull$40) {
> /* 28 */                    field$40 = in2.getTimestamp(0, 3);
> /* 29 */                  }
> /* 30 */                  if (isNull$39 && isNull$40) {
> /* 31 */                     return 1;
> /* 32 */                  } else if (isNull$39 || isNull$40) {
> /* 33 */                     return -1;
> /* 34 */                  } else {
> /* 35 */                     
> /* 36 */                            
> /* 37 */                            long result$41;
> /* 38 */                            boolean isNull$41;
> /* 39 */                            long result$42;
> /* 40 */                            boolean isNull$42;
> /* 41 */                            boolean isNull$43;
> /* 42 */                            long result$44;
> /* 43 */                            boolean isNull$45;
> /* 44 */                            boolean result$46;
> /* 45 */                            isNull$41 = (java.lang.Long) field$39 == 
> null;
> /* 46 */                            result$41 = -1L;
> /* 47 */                            if (!isNull$41) {
> /* 48 */                              result$41 = (java.lang.Long) field$39;
> /* 49 */                            }
> /* 50 */                            isNull$42 = (java.lang.Long) field$40 == 
> null;
> /* 51 */                            result$42 = -1L;
> /* 52 */                            if (!isNull$42) {
> /* 53 */                              result$42 = (java.lang.Long) field$40;
> /* 54 */                            }
> /* 55 */                            
> /* 56 */                            
> /* 57 */                            
> /* 58 */                            
> /* 59 */                            

[jira] [Updated] (FLINK-25802) OverWindow in batch mode failed

2022-01-24 Thread Zoyo Pei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zoyo Pei updated FLINK-25802:
-
Description: 
{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream userStream = env
.fromElements(
Row.of(LocalDateTime.parse("2021-08-21T13:00:00"), 1, "Alice"),
Row.of(LocalDateTime.parse("2021-08-21T13:05:00"), 2, "Bob"),
Row.of(LocalDateTime.parse("2021-08-21T13:10:00"), 2, "Bob"))
.returns(
Types.ROW_NAMED(
new String[]{"ts", "uid", "name"},
Types.LOCAL_DATE_TIME, Types.INT, Types.STRING));
tEnv.createTemporaryView(
"UserTable",
userStream,
Schema.newBuilder()
.column("ts", DataTypes.TIMESTAMP(3))
.column("uid", DataTypes.INT())
.column("name", DataTypes.STRING())
.watermark("ts", "ts - INTERVAL '1' SECOND")
.build());
String statement = "SELECT name, ts, COUNT(name) OVER w AS cnt FROM UserTable " 
+
"WINDOW w AS (" +
" PARTITION BY name" +
" ORDER BY ts" +
" RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW" +
")";
tEnv.executeSql(statement).print();
 {code}
 
{code:java}
/* 1 */
/* 2 */      public class RangeBoundComparator$38 implements 
org.apache.flink.table.runtime.generated.RecordComparator {
/* 3 */
/* 4 */        private final Object[] references;
/* 5 */        
/* 6 */
/* 7 */        public RangeBoundComparator$38(Object[] references) {
/* 8 */          this.references = references;
/* 9 */          
/* 10 */          
/* 11 */        }
/* 12 */
/* 13 */        @Override
/* 14 */        public int compare(org.apache.flink.table.data.RowData in1, 
org.apache.flink.table.data.RowData in2) {
/* 15 */          
/* 16 */                  org.apache.flink.table.data.TimestampData field$39;
/* 17 */                  boolean isNull$39;
/* 18 */                  org.apache.flink.table.data.TimestampData field$40;
/* 19 */                  boolean isNull$40;
/* 20 */                  isNull$39 = in1.isNullAt(0);
/* 21 */                  field$39 = null;
/* 22 */                  if (!isNull$39) {
/* 23 */                    field$39 = in1.getTimestamp(0, 3);
/* 24 */                  }
/* 25 */                  isNull$40 = in2.isNullAt(0);
/* 26 */                  field$40 = null;
/* 27 */                  if (!isNull$40) {
/* 28 */                    field$40 = in2.getTimestamp(0, 3);
/* 29 */                  }
/* 30 */                  if (isNull$39 && isNull$40) {
/* 31 */                     return 1;
/* 32 */                  } else if (isNull$39 || isNull$40) {
/* 33 */                     return -1;
/* 34 */                  } else {
/* 35 */                     
/* 36 */                            
/* 37 */                            long result$41;
/* 38 */                            boolean isNull$41;
/* 39 */                            long result$42;
/* 40 */                            boolean isNull$42;
/* 41 */                            boolean isNull$43;
/* 42 */                            long result$44;
/* 43 */                            boolean isNull$45;
/* 44 */                            boolean result$46;
/* 45 */                            isNull$41 = (java.lang.Long) field$39 == 
null;
/* 46 */                            result$41 = -1L;
/* 47 */                            if (!isNull$41) {
/* 48 */                              result$41 = (java.lang.Long) field$39;
/* 49 */                            }
/* 50 */                            isNull$42 = (java.lang.Long) field$40 == 
null;
/* 51 */                            result$42 = -1L;
/* 52 */                            if (!isNull$42) {
/* 53 */                              result$42 = (java.lang.Long) field$40;
/* 54 */                            }
/* 55 */                            
/* 56 */                            
/* 57 */                            
/* 58 */                            
/* 59 */                            isNull$43 = isNull$41 || isNull$42;
/* 60 */                            result$44 = -1L;
/* 61 */                            if (!isNull$43) {
/* 62 */                              
/* 63 */                              result$44 = (long) (result$41 - 
result$42);
/* 64 */                              
/* 65 */                            }
/* 66 */                            
/* 67 */                            
/* 68 */                            isNull$45 = isNull$43 || false;
/* 69 */                            result$46 = false;
/* 70 */                            if (!isNull$45) {
/* 71 */                              
/* 72 */                       

[jira] [Updated] (FLINK-25802) OverWindow in batch mode failed

2022-01-24 Thread Zoyo Pei (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-25802?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zoyo Pei updated FLINK-25802:
-
Description: 
{code:java}
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
DataStream userStream = env
.fromElements(
Row.of(LocalDateTime.parse("2021-08-21T13:00:00"), 1, "Alice"),
Row.of(LocalDateTime.parse("2021-08-21T13:05:00"), 2, "Bob"),
Row.of(LocalDateTime.parse("2021-08-21T13:10:00"), 2, "Bob"))
.returns(
Types.ROW_NAMED(
new String[]{"ts", "uid", "name"},
Types.LOCAL_DATE_TIME, Types.INT, Types.STRING));
tEnv.createTemporaryView(
"UserTable",
userStream,
Schema.newBuilder()
.column("ts", DataTypes.TIMESTAMP(3))
.column("uid", DataTypes.INT())
.column("name", DataTypes.STRING())
.watermark("ts", "ts - INTERVAL '1' SECOND")
.build());
String statement = "SELECT name, ts, COUNT(name) OVER w AS cnt FROM UserTable " 
+
"WINDOW w AS (" +
" PARTITION BY name" +
" ORDER BY ts" +
" RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW" +
")";
tEnv.executeSql(statement).print();
 {code}
 
{code:java}
/* 1 */
/* 2 */      public class RangeBoundComparator$38 implements 
org.apache.flink.table.runtime.generated.RecordComparator {
/* 3 */
/* 4 */        private final Object[] references;
/* 5 */        
/* 6 */
/* 7 */        public RangeBoundComparator$38(Object[] references) {
/* 8 */          this.references = references;
/* 9 */          
/* 10 */          
/* 11 */        }
/* 12 */
/* 13 */        @Override
/* 14 */        public int compare(org.apache.flink.table.data.RowData in1, 
org.apache.flink.table.data.RowData in2) {
/* 15 */          
/* 16 */                  org.apache.flink.table.data.TimestampData field$39;
/* 17 */                  boolean isNull$39;
/* 18 */                  org.apache.flink.table.data.TimestampData field$40;
/* 19 */                  boolean isNull$40;
/* 20 */                  isNull$39 = in1.isNullAt(0);
/* 21 */                  field$39 = null;
/* 22 */                  if (!isNull$39) {
/* 23 */                    field$39 = in1.getTimestamp(0, 3);
/* 24 */                  }
/* 25 */                  isNull$40 = in2.isNullAt(0);
/* 26 */                  field$40 = null;
/* 27 */                  if (!isNull$40) {
/* 28 */                    field$40 = in2.getTimestamp(0, 3);
/* 29 */                  }
/* 30 */                  if (isNull$39 && isNull$40) {
/* 31 */                     return 1;
/* 32 */                  } else if (isNull$39 || isNull$40) {
/* 33 */                     return -1;
/* 34 */                  } else {
/* 35 */                     
/* 36 */                            
/* 37 */                            long result$41;
/* 38 */                            boolean isNull$41;
/* 39 */                            long result$42;
/* 40 */                            boolean isNull$42;
/* 41 */                            boolean isNull$43;
/* 42 */                            long result$44;
/* 43 */                            boolean isNull$45;
/* 44 */                            boolean result$46;
/* 45 */                            isNull$41 = (java.lang.Long) field$39 == 
null;
/* 46 */                            result$41 = -1L;
/* 47 */                            if (!isNull$41) {
/* 48 */                              result$41 = (java.lang.Long) field$39;
/* 49 */                            }
/* 50 */                            isNull$42 = (java.lang.Long) field$40 == 
null;
/* 51 */                            result$42 = -1L;
/* 52 */                            if (!isNull$42) {
/* 53 */                              result$42 = (java.lang.Long) field$40;
/* 54 */                            }
/* 55 */                            
/* 56 */                            
/* 57 */                            
/* 58 */                            
/* 59 */                            isNull$43 = isNull$41 || isNull$42;
/* 60 */                            result$44 = -1L;
/* 61 */                            if (!isNull$43) {
/* 62 */                              
/* 63 */                              result$44 = (long) (result$41 - 
result$42);
/* 64 */                              
/* 65 */                            }
/* 66 */                            
/* 67 */                            
/* 68 */                            isNull$45 = isNull$43 || false;
/* 69 */                            result$46 = false;
/* 70 */                            if (!isNull$45) {
/* 71 */                              
/* 72 */