Hi Wangsan,
I opened an issue to document the behavior properly in the future
(https://issues.apache.org/jira/browse/FLINK-8169). Basically, both your
event-time and processing-time timestamps should be GMT. We plan to
support offsets for windows in the future
(https://issues.apache.org/jira/browse/FLINK-8168). Internally, the long
values remain constant in GMT. Only the toString() output is timezone
dependent. For now, I would suggest either implementing some
user-defined scalar functions that provide your desired behavior or just
subtracting the offset (ts - INTERVAL '8' HOUR should work).
The timezone support must definitely be improved in future versions of
Flink SQL.
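As a rough sketch of the offset arithmetic such a user-defined scalar function could wrap: the class and method names below are hypothetical, and the Flink `ScalarFunction` wrapper itself is left out so that the snippet runs on a plain JDK. Whether you pass a positive or a negative hour count depends on which direction you need to correct.

```java
import java.sql.Timestamp;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Flink or Calcite.
final class TimestampShift {
    // Moves a timestamp by a signed number of whole hours.
    static Timestamp shiftByHours(Timestamp ts, long hours) {
        Timestamp shifted = new Timestamp(ts.getTime() + TimeUnit.HOURS.toMillis(hours));
        // A whole-hour shift leaves the fractional second untouched, so keep the nanos.
        shifted.setNanos(ts.getNanos());
        return shifted;
    }

    public static void main(String[] args) {
        Timestamp epoch = new Timestamp(0L);
        // Same effect on the long value as subtracting an 8-hour interval.
        System.out.println(shiftByHours(epoch, -8).getTime()); // -28800000
    }
}
```

Inside a scalar function, the body of the evaluation method would presumably just delegate to a helper like this one.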
Regards,
Timo
On 11/29/17 at 10:50 AM, wangsan wrote:
Hi Timo,
What I am doing is extracting a timestamp field (either a string such
as “2017-11-28 11:00:00” or a long value based on my current timezone)
as the event-time attribute. So in timestampAndWatermarkAssigner, for
the string format I should parse the date-time string using GMT, and for
the long value I should add the offset, the opposite of what
internalToTimestamp does. But the processing-time attribute cannot be
kept consistent. Am I understanding that correctly?
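To make those two cases concrete, here is a minimal sketch in plain Java. The class and method names are made up for illustration, and the pattern "yyyy-MM-dd HH:mm:ss" is only an assumption about how the string field is formatted; this is not code taken from Flink.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;

// Hypothetical helpers for use inside a timestamp/watermark assigner.
final class EventTimeExtraction {
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Case 1: interpret the wall-clock string as GMT/UTC, ignoring the JVM default zone.
    static long parseAsGmt(String dateTime) {
        return LocalDateTime.parse(dateTime, FORMAT)
                .toInstant(ZoneOffset.UTC)
                .toEpochMilli();
    }

    // Case 2: add the zone offset to a long value, i.e. the inverse of the
    // subtraction that internalToTimestamp performs later on.
    static long addZoneOffset(long epochMillis, TimeZone zone) {
        return epochMillis + zone.getOffset(epochMillis);
    }

    public static void main(String[] args) {
        System.out.println(parseAsGmt("2017-11-28 11:00:00"));
        System.out.println(addZoneOffset(0L, TimeZone.getTimeZone("Asia/Shanghai")));
    }
}
```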
Best,
wangsan
On 29 Nov 2017, at 4:43 PM, Timo Walther <twal...@apache.org> wrote:
Hi Wangsan,
Currently, the timestamps in Flink SQL do not depend on a timezone.
All calculations happen on the UTC timestamp. This also guarantees
that an input with Timestamp.valueOf("XXX") remains consistent when
parsing it and outputting it with toString().
Regards,
Timo
On 11/29/17 at 3:43 AM, wangsan wrote:
Hi Xingcan,
Thanks for your reply.
The system default timezone is just as what I expected
(sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
I looked into the generated code, and I found the following code
snippet:
```
result$20 = org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```
And this is what the `internalToTimestamp` function does:
```
public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - (long) LOCAL_TZ.getOffset(v));
}
```
So, if I give it an event time with unix timestamp 0, I get
Timestamp(-28800000). I am confused about why `internalToTimestamp`
needs to subtract the offset.
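To reproduce this outside of Flink, here is a small sketch that copies the arithmetic of the quoted function into a standalone class. The class name is made up, and the zone is passed in explicitly instead of using Calcite's `LOCAL_TZ`, which I assume is taken from the JVM default. It shows that the long value is shifted to -28800000, while `toString()` in Asia/Shanghai prints the UTC wall-clock time.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical demo class; only the arithmetic mirrors the quoted Calcite snippet.
final class InternalToTimestampDemo {
    static Timestamp internalToTimestamp(long v, TimeZone localTz) {
        return new Timestamp(v - (long) localTz.getOffset(v));
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        // Timestamp.toString() renders in the JVM default zone, so pin it for the demo.
        TimeZone.setDefault(shanghai);

        Timestamp ts = internalToTimestamp(0L, shanghai);
        System.out.println(ts.getTime()); // -28800000
        System.out.println(ts);           // 1970-01-01 00:00:00.0
    }
}
```

So the subtraction seems to exist to make the printed value look like the UTC time, at the cost of changing the underlying long value.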
Best,
wangsan
On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com> wrote:
Hi wangsan,
In Flink, the ProcessingTime is just implemented by invoking
System.currentTimeMillis(), and the long value is automatically
wrapped in a Timestamp with the following statement:
`new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
You can check your TimeZone.getDefault() to see if it returns the
right TimeZone. Generally, the returned value should depend on the
default TimeZone of your operating system.
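A quick way to check this is a sketch like the one below. The class and method names are only illustrative; the `wrap` method repeats the statement above so you can compare the wrapped value with an unshifted Timestamp for the same instant.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical diagnostic class, not part of Flink.
final class ProcTimeWrapDemo {
    // Same expression as the statement quoted above.
    static Timestamp wrap(long time) {
        return new Timestamp(time - TimeZone.getDefault().getOffset(time));
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        TimeZone tz = TimeZone.getDefault();
        System.out.println("default zone: " + tz.getID());
        System.out.println("offset (ms):  " + tz.getOffset(now));
        System.out.println("unshifted:    " + new Timestamp(now));
        System.out.println("wrapped:      " + wrap(now));
    }
}
```

With a default zone of Asia/Shanghai, the wrapped line should print a time 8 hours behind the unshifted one; with a UTC default, both lines should be identical.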
Hope that helps.
Best,
Xingcan
On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com> wrote:
Hi all,
While using Timestamp in Flink SQL, how can I set the timezone?
My current timezone is *GMT+8*, and I found that the
selected processing time is always *8 hours* behind the current
time. The same goes for the extracted event time.
Here’s my simplified code:
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val sTableEnv = TableEnvironment.getTableEnvironment(senv)
println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new Date())}")
val stream: DataStream[(String, String, String)] =
  senv.socketTextStream("localhost", 9999).map(line => (line, line, line))
val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
sTableEnv.registerTable("foo", table)
val result = sTableEnv.sql("select * from foo")
result.printSchema()
result.toAppendStream[Row].print()
senv.execute("foo")
And here’s the result:
<PastedGraphic-1.png>
Best,
wangsan