[ https://issues.apache.org/jira/browse/FLINK-15212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kurt Young updated FLINK-15212: ------------------------------- Component/s: (was: Table SQL / API) Table SQL / Legacy Planner > PROCTIME attribute causes problems with timestamp times before 1900 ? > ---------------------------------------------------------------------- > > Key: FLINK-15212 > URL: https://issues.apache.org/jira/browse/FLINK-15212 > Project: Flink > Issue Type: Bug > Components: Table SQL / Legacy Planner > Affects Versions: 1.9.1 > Environment: flink 1.9.1 > jdk1.8.0_211 > idea2019.3 > Reporter: Rockey Cui > Priority: Major > Labels: easyfix > > A simple DataStreamSource with timestamp registered as a table. > > {code:java} > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > env.setParallelism(1); > DataStreamSource<String> stringDataStreamSource = env.fromElements( > "1001,1002,adc0,1900-01-01 00:00:00.0", > "1002,1003,adc1,1910-01-01 00:00:00.0", > "1003,1004,adc2,1920-01-01 00:00:00.0", > "1004,1005,adc3,1930-01-01 00:00:00.0", > "1005,1006,adc4,1970-01-01 00:00:00.0", > "8888,6666,adc5,1971-01-01 00:00:00.0" > ); > TypeInformation<?>[] fieldTypes = new TypeInformation[]{Types.LONG, > Types.LONG, Types.STRING, Types.SQL_TIM > String[] fieldNames = new String[]{"id", "cityId", "url", "clickTime"}; > RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); > DataStream<Row> stream = stringDataStreamSource.map((MapFunction<String, > Row>) s -> { > String[] split = s.split(","); > Row row = new Row(split.length); > for (int i = 0; i < split.length; i++) { > Object value = null; > if (fieldTypes[i].equals(Types.STRING)) { > value = split[i]; > } > if (fieldTypes[i].equals(Types.LONG)) { > value = Long.valueOf(split[i]); > } > if (fieldTypes[i].equals(Types.INT)) { > value = Integer.valueOf(split[i]); > } > if (fieldTypes[i].equals(Types.DOUBLE)) { > value = Double.valueOf(split[i]); > } > if (fieldTypes[i].equals(Types.SQL_TIMESTAMP)) { > value = Timestamp.valueOf(split[i]); > } > row.setField(i, value); > } > //System.out.println(row.toString()); > return row; > }).returns(rowTypeInfo); > tableEnv.registerDataStream("user_click_info", stream, String.join(",", > fieldNames) + ",www.proctime"); > String sql = "select * from user_click_info"; > Table table = tableEnv.sqlQuery(sql); > DataStream<Row> result = tableEnv.toAppendStream(table, Row.class); > result.print(); > table.printSchema(); > tableEnv.execute("Test"); > {code} > result ==> > > root > |-- id: BIGINT > |-- cityId: BIGINT > |-- url: STRING > |-- clickTime: TIMESTAMP(3) > |-- www: TIMESTAMP(3) *PROCTIME* > > 1001,1002,adc0,{color:#FF0000}1899-12-31 23:54:17.0{color},2019-12-12 > 03:37:18.036 > 1002,1003,adc1,1910-01-01 00:00:00.0,2019-12-12 03:37:18.196 > 1003,1004,adc2,1920-01-01 00:00:00.0,2019-12-12 03:37:18.196 > 1004,1005,adc3,1930-01-01 00:00:00.0,2019-12-12 03:37:18.196 > 1005,1006,adc4,1970-01-01 00:00:00.0,2019-12-12 03:37:18.196 > 8888,6666,adc5,1971-01-01 00:00:00.0,2019-12-12 03:37:18.196 > ---- > without PROCTIME attribute is OK ==> > > root > |-- id: BIGINT > |-- cityId: BIGINT > |-- url: STRING > |-- clickTime: TIMESTAMP(3) > > 1001,1002,adc0,1900-01-01 00:00:00.0 > 1002,1003,adc1,1910-01-01 00:00:00.0 > 1003,1004,adc2,1920-01-01 00:00:00.0 > 1004,1005,adc3,1930-01-01 00:00:00.0 > 1005,1006,adc4,1970-01-01 00:00:00.0 > 8888,6666,adc5,1971-01-01 00:00:00.0 -- This message was sent by Atlassian Jira (v8.3.4#803005)