Hi Timo,

What I am doing is extracting a timestamp field (may be string format as 
“2017-11-28 11:00:00” or a long value base on my current timezone) as Event 
time attribute. So In timestampAndWatermarkAssigner , for string format I 
should parse the data time string using GMT, and for long value I should add 
the offset as opposite to what internalToTimestamp did. But the Processing time 
attribute can not keep consistent. Am I understanding that correctly?

Best,
wangsan



> On 29 Nov 2017, at 4:43 PM, Timo Walther <twal...@apache.org> wrote:
> 
> Hi Wangsan,
> 
> currently the timestamps in Flink SQL do not depend on a timezone. All 
> calculations happen on the UTC timestamp. This also guarantees that an input 
> with Timestamp.valueOf("XXX") remains consistent when parsing and outputing 
> it with toString().
> 
> Regards,
> Timo
> 
> 
> Am 11/29/17 um 3:43 AM schrieb wangsan:
>> Hi Xincan,
>> 
>> Thanks for your reply. 
>> 
>> The system default timezone is just as what I expected 
>> (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
>>  
>> I looked into the generated code, and I found the following code snippet:
>> 
>> ```
>> result$20 = 
>> org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
>> ```
>> 
>> And what `internalToTimestamp` function did is:
>> 
>> ```
>> public static Timestamp internalToTimestamp(long v) {
>>     return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
>> }
>> ```
>> 
>> So, if I give it an event time with unix timestamp 0, then I got the 
>> Timestamp(-28800000). I am confused why `internalToTimestamp` need to 
>> subtract the offset?
>> 
>> Best,
>> wangsan
>> 
>> 
>>> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xingc...@gmail.com 
>>> <mailto:xingc...@gmail.com>> wrote:
>>> 
>>> Hi wangsan,
>>> 
>>> in Flink, the ProcessingTime is just implemented by invoking 
>>> System.currentTimeMillis() and the long value will be automatically wrapped 
>>> to a Timestamp with the following statement:
>>> 
>>> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
>>> 
>>> You can check your TimeZone.getDefault() to see if it returns the right 
>>> TimeZone. Generally, the returned value should rely on the default TimeZone 
>>> of your operating system.
>>> 
>>> Hope that helps.
>>> 
>>> Best,
>>> Xingcan
>>> 
>>> On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wamg...@163.com 
>>> <mailto:wamg...@163.com>> wrote:
>>> Hi all,
>>> 
>>> While using Timestamp in Flint SQL, how can I set timezone info? Since my 
>>> current timezone is GMT+8, and I found the selected processing time is 
>>> always 8 hours late than current time. So as extracted event time.
>>> 
>>> Here’s my simplified code:
>>> val senv = StreamExecutionEnvironment.getExecutionEnvironment
>>> senv.setParallelism(1)
>>> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
>>> 
>>> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
>>> println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", 
>>> Locale.CHINA).format(new Date())}")
>>> 
>>> val stream: DataStream[(String, String, String)] = 
>>> senv.socketTextStream("localhost", 9999).map(line => (line, line, line))
>>> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 
>>> 't.proctime)
>>> sTableEnv.registerTable("foo", table)
>>> val result = sTableEnv.sql("select * from foo")
>>> result.printSchema()
>>> result.toAppendStream[Row].print()
>>> 
>>> senv.execute("foo")
>>> And here’s the result:
>>> 
>>> <PastedGraphic-1.png>
>>> 
>>> Best,
>>> wangsan
>>> 
>> 
> 

Reply via email to