Hi Xingcan,

Thanks for your reply.

The system default timezone is just what I expected (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).

I looked into the generated code and found the following snippet:

```
result$20 = org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And this is what the `internalToTimestamp` function does:

```
public static Timestamp internalToTimestamp(long v) {
  return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```

So if I give it an event time with unix timestamp 0, I get Timestamp(-28800000). I am confused: why does `internalToTimestamp` need to subtract the offset?

Best,
wangsan

> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com> wrote:
>
> Hi wangsan,
>
> in Flink, the ProcessingTime is just implemented by invoking
> System.currentTimeMillis() and the long value will be automatically wrapped
> to a Timestamp with the following statement:
>
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
>
> You can check your TimeZone.getDefault() to see if it returns the right
> TimeZone. Generally, the returned value should rely on the default TimeZone
> of your operating system.
>
> Hope that helps.
>
> Best,
> Xingcan
>
> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com> wrote:
> Hi all,
>
> While using Timestamp in Flink SQL, how can I set timezone info? My
> current timezone is GMT+8, and I found the selected processing time is always
> 8 hours behind the current time. The same goes for the extracted event time.
>
> Here’s my simplified code:
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>
> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
> println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new Date())}")
>
> val stream: DataStream[(String, String, String)] = senv.socketTextStream("localhost", 9999).map(line => (line, line, line))
> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
> sTableEnv.registerTable("foo", table)
> val result = sTableEnv.sql("select * from foo")
> result.printSchema()
> result.toAppendStream[Row].print()
>
> senv.execute("foo")
> And here’s the result:
>
> <PastedGraphic-1.png>
>
> Best,
> wangsan
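
P.S. For anyone who wants to reproduce the numbers, here is a minimal standalone sketch of the arithmetic quoted above from `internalToTimestamp`. It is not the Calcite code itself: the class name `OffsetDemo`, the `render` helper, and the explicit `tz` parameter (standing in for Calcite's `LOCAL_TZ`) are my own, added so the result does not depend on the machine's default timezone.

```java
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class OffsetDemo {

    // Same arithmetic as the quoted internalToTimestamp, but with the zone
    // passed in explicitly (hypothetical parameter, replaces LOCAL_TZ).
    static Timestamp internalToTimestamp(long v, TimeZone tz) {
        return new Timestamp(v - (long) tz.getOffset(v));
    }

    // Hypothetical helper: formats the instant as wall-clock time in the given zone.
    static String render(Timestamp ts, TimeZone tz) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(tz);
        return fmt.format(ts);
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        TimeZone utc = TimeZone.getTimeZone("UTC");

        // Event time with unix timestamp 0, as in the message above.
        Timestamp ts = internalToTimestamp(0L, shanghai);

        System.out.println(ts.getTime());          // -28800000
        System.out.println(render(ts, utc));       // 1969-12-31 16:00:00
        System.out.println(render(ts, shanghai));  // 1970-01-01 00:00:00
    }
}
```

Read as a UTC instant, the resulting value is 8 hours earlier than the input, which matches the shift I am seeing. Rendered in Asia/Shanghai, it shows the same wall-clock digits that the input 0 has in UTC.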