Hi Timo,

What I am doing is extracting a timestamp field (either a string such as "2017-11-28 11:00:00" or a long value based on my current timezone) as the event time attribute. So in timestampAndWatermarkAssigner, for the string format I should parse the date-time string as GMT, and for the long value I should add the offset, which is the opposite of what internalToTimestamp does. But the processing time attribute still cannot be kept consistent. Am I understanding that correctly?
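
To make the question concrete, here is a minimal Java sketch of the two conversions described above. It assumes that the subtraction in the quoted `internalToTimestamp` is the only shift applied to the value. The class name `EventTimeShift` and the helpers `parseAsGmt` and `shiftEpochMillis` are hypothetical names used for illustration only, and the time zone is passed in explicitly instead of being read from `LOCAL_TZ`, so that the result does not depend on the machine running it.

```java
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

// Hypothetical helper class; not part of Flink or Calcite.
class EventTimeShift {

    // Same arithmetic as the internalToTimestamp snippet quoted in this thread,
    // except that the zone is a parameter (assumption made for this sketch).
    static Timestamp internalToTimestamp(long v, TimeZone localTz) {
        return new Timestamp(v - (long) localTz.getOffset(v));
    }

    // String field: interpret the wall-clock text as if it were written in GMT.
    static long parseAsGmt(String text) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
        try {
            return fmt.parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + text, e);
        }
    }

    // Long field: add the local offset so that the later subtraction cancels it.
    // This cancels exactly for a fixed-offset zone such as GMT+8; near a
    // daylight-saving transition the two offsets may differ.
    static long shiftEpochMillis(long epochMillis, TimeZone localTz) {
        return epochMillis + localTz.getOffset(epochMillis);
    }
}
```

With `TimeZone.getTimeZone("GMT+8")`, `internalToTimestamp(0, tz)` wraps -28800000 as reported earlier in the thread, while `internalToTimestamp(shiftEpochMillis(0, tz), tz)` wraps 0 again. Nothing in this sketch touches the processing time attribute, which is generated inside Flink.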

Best,
wangsan

> On 29 Nov 2017, at 4:43 PM, Timo Walther <twal...@apache.org> wrote:
>
> Hi Wangsan,
>
> currently the timestamps in Flink SQL do not depend on a timezone. All
> calculations happen on the UTC timestamp. This also guarantees that an input
> with Timestamp.valueOf("XXX") remains consistent when parsing and outputting
> it with toString().
>
> Regards,
> Timo
>
>
> On 11/29/17 at 3:43 AM, wangsan wrote:
>> Hi Xingcan,
>>
>> Thanks for your reply.
>>
>> The system default timezone is just what I expected
>> (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
>>
>> I looked into the generated code and found the following snippet:
>>
>> ```
>> result$20 =
>>     org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
>> ```
>>
>> And what the `internalToTimestamp` function does is:
>>
>> ```
>> public static Timestamp internalToTimestamp(long v) {
>>     return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
>> }
>> ```
>>
>> So, if I give it an event time with unix timestamp 0, I get
>> Timestamp(-28800000). I am confused about why `internalToTimestamp` needs to
>> subtract the offset.
>>
>> Best,
>> wangsan
>>
>>
>>> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>>>
>>> Hi wangsan,
>>>
>>> in Flink, the ProcessingTime is just implemented by invoking
>>> System.currentTimeMillis(), and the long value is automatically wrapped
>>> into a Timestamp with the following statement:
>>>
>>> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
>>>
>>> You can check your TimeZone.getDefault() to see if it returns the right
>>> TimeZone. Generally, the returned value should rely on the default TimeZone
>>> of your operating system.
>>>
>>> Hope that helps.
>>>
>>> Best,
>>> Xingcan
>>>
>>> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com> wrote:
>>> Hi all,
>>>
>>> While using Timestamp in Flink SQL, how can I set the timezone info? My
>>> current timezone is GMT+8, and I found that the selected processing time is
>>> always 8 hours behind the current time. The same goes for the extracted
>>> event time.
>>>
>>> Here’s my simplified code:
>>>
>>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>>> senv.setParallelism(1)
>>> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>>
>>> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
>>> println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new Date())}")
>>>
>>> val stream: DataStream[(String, String, String)] =
>>>   senv.socketTextStream("localhost", 9999).map(line => (line, line, line))
>>> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
>>> sTableEnv.registerTable("foo", table)
>>> val result = sTableEnv.sql("select * from foo")
>>> result.printSchema()
>>> result.toAppendStream[Row].print()
>>>
>>> senv.execute("foo")
>>>
>>> And here’s the result:
>>>
>>> <PastedGraphic-1.png>
>>>
>>> Best,
>>> wangsan