Hi Wangsan,
currently the timestamps in Flink SQL do not depend on a timezone. All
calculations happen on the UTC timestamp. This also guarantees that an
input with Timestamp.valueOf("XXX") remains consistent when parsing and
outputing it with toString().
Regards,
Timo
Am 11/29/17 um 3:43 AM schrieb wangsan:
Hi Xincan,
Thanks for your reply.
The system default timezone is just as what I expected
(sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
I looked into the generated code, and I found the following code snippet:
```
result$20 =
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```
And what `internalToTimestamp` function did is:
```
public static Timestamp internalToTimestamp(long v) {
return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```
So, if I give it an event time with unix timestamp 0, then I got the
Timestamp(-28800000). I am confused why `internalToTimestamp` need to
subtract the offset?
Best,
wangsan
On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com
<mailto:xingc...@gmail.com>> wrote:
Hi wangsan,
in Flink, the ProcessingTime is just implemented by invoking
System.currentTimeMillis() and the long value will be automatically
wrapped to a Timestamp with the following statement:
`new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
You can check your TimeZone.getDefault() to see if it returns the
right TimeZone. Generally, the returned value should rely on the
default TimeZone of your operating system.
Hope that helps.
Best,
Xingcan
On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com
<mailto:wamg...@163.com>> wrote:
Hi all,
While using Timestamp in Flint SQL, how can I set timezone info?
Since my current timezone is *GMT+8*, and I found the selected
processing time is always *8 hours* late than current time. So as
extracted event time.
Here’s my simplified code:
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val sTableEnv = TableEnvironment.getTableEnvironment(senv)
println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS",
Locale.CHINA).format(new Date())}")
val stream:DataStream[(String, String, String)]=
senv.socketTextStream("localhost", 9999).map(line => (line, line, line))
val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3,
't.proctime)
sTableEnv.registerTable("foo", table)
val result = sTableEnv.sql("select * from foo")
result.printSchema()
result.toAppendStream[Row].print()
senv.execute("foo")
And here’s the result:
<PastedGraphic-1.png>
Best,
wangsan