Hi Xingcan,

Thanks for your reply. 

The system default timezone is just what I expected 
(sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
 
I looked into the generated code, and I found the following code snippet:

```
result$20 = 
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And here is what the `internalToTimestamp` function does:

```
public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```

So if I give it an event time with Unix timestamp 0, I get 
Timestamp(-28800000). I am confused about why `internalToTimestamp` needs to 
subtract the offset.
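
To make the arithmetic concrete, here is a minimal sketch that reproduces the subtraction outside of Flink. The class name `OffsetDemo` is hypothetical, and the zone is pinned to `Asia/Shanghai` only so the result is reproducible; I am assuming Calcite's `LOCAL_TZ` is simply the JVM default time zone.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

class OffsetDemo {
    // Stand-in for Calcite's LOCAL_TZ (assumption: it is the JVM default zone).
    // Pinned to Asia/Shanghai here so the result does not depend on the machine.
    static final TimeZone LOCAL_TZ = TimeZone.getTimeZone("Asia/Shanghai");

    // Same arithmetic as the internalToTimestamp snippet quoted above.
    static Timestamp internalToTimestamp(long v) {
        return new Timestamp(v - (long) LOCAL_TZ.getOffset(v));
    }

    public static void main(String[] args) {
        Timestamp ts = internalToTimestamp(0L);
        // Epoch millis held by the Timestamp after the offset is subtracted.
        System.out.println(ts.getTime()); // prints -28800000
    }
}
```

For an input of 0 the zone offset is 28800000 ms (8 hours), so the resulting Timestamp holds -28800000, which is the value described above.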

Best,
wangsan


> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com> wrote:
> 
> Hi wangsan,
> 
> In Flink, the ProcessingTime is just implemented by invoking 
> System.currentTimeMillis(), and the long value is automatically wrapped 
> in a Timestamp with the following statement:
> 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
> 
> You can check your TimeZone.getDefault() to see if it returns the right 
> TimeZone. Generally, the returned value should follow the default time zone 
> of your operating system.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com> wrote:
> Hi all,
> 
> When using Timestamp in Flink SQL, how can I set the timezone info? My 
> current timezone is GMT+8, and I found the selected processing time is always 
> 8 hours behind the current time. The same goes for the extracted event time.
> 
> Here’s my simplified code:
> 
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> 
> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
> println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", 
> Locale.CHINA).format(new Date())}")
> 
> val stream: DataStream[(String, String, String)] = 
> senv.socketTextStream("localhost", 9999).map(line => (line, line, line))
> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
> sTableEnv.registerTable("foo", table)
> val result = sTableEnv.sql("select * from foo")
> result.printSchema()
> result.toAppendStream[Row].print()
> 
> senv.execute("foo")
> 
> And here’s the result:
> 
> <PastedGraphic-1.png>
> 
> Best,
> wangsan
> 
