Re: Question about Timestamp in Flink SQL

2017-11-28 Thread wangsan
Hi Xingcan,

Thanks for your reply. 

The system default time zone is just what I expected 
(sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).

I looked into the generated code and found the following snippet:

```
result$20 = 
org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And this is what the `internalToTimestamp` function does:

```
public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - (long) LOCAL_TZ.getOffset(v));
}
```

So if I give it an event time with Unix timestamp 0, I get 
Timestamp(-28800000). Why does `internalToTimestamp` need to subtract 
the offset?
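To see what the subtraction does, here is a minimal Java sketch. It assumes the JVM default zone is Asia/Shanghai (set explicitly so the demo does not depend on the host) and uses a local stand-in for the quoted Calcite method; the class name `InternalToTimestampDemo` is hypothetical. For epoch 0 the resulting Timestamp holds -28800000 ms, but because `Timestamp.toString()` formats in the JVM default zone, it prints the UTC wall-clock time. In other words, the long value is treated as UTC, and the offset is subtracted only so that the string rendering matches it.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class InternalToTimestampDemo {

    // Local stand-in for the quoted Calcite method. It reads the JVM default
    // zone on every call instead of using a cached LOCAL_TZ field.
    static Timestamp internalToTimestamp(long v) {
        TimeZone localTz = TimeZone.getDefault();
        return new Timestamp(v - (long) localTz.getOffset(v));
    }

    public static void main(String[] args) {
        // Assumption: reproduce the GMT+8 setup described in this thread.
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));

        Timestamp ts = internalToTimestamp(0L);
        // The epoch value stored in the Timestamp is shifted by -8 hours ...
        System.out.println(ts.getTime());  // prints -28800000
        // ... so that toString(), which formats in the default zone,
        // shows the UTC wall-clock time of the original value 0.
        System.out.println(ts);            // prints 1970-01-01 00:00:00.0
    }
}
```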

Best,
wangsan


> On 28 Nov 2017, at 11:32 PM, Xingcan Cui wrote:
> 
> Hi wangsan,
> 
> In Flink, processing time is implemented by simply calling 
> System.currentTimeMillis(), and the long value is automatically wrapped 
> into a Timestamp with the following statement:
> 
> `new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`
> 
> You can check whether TimeZone.getDefault() returns the right time zone. 
> Generally, the returned value depends on the default time zone of your 
> operating system.
> 
> Hope that helps.
> 
> Best,
> Xingcan
> 
> On Tue, Nov 28, 2017 at 9:31 PM, wangsan wrote:
> Hi all,
> 
> While using Timestamp in Flink SQL, how can I set the time zone? My 
> current time zone is GMT+8, and I found that the selected processing time is 
> always 8 hours behind the current time. The same happens with extracted event time.
> 
> Here’s my simplified code:
> 
> ```
> import java.text.SimpleDateFormat
> import java.util.{Date, Locale}
> 
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.types.Row
> 
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
> 
> val sTableEnv = TableEnvironment.getTableEnvironment(senv)
> println(s"current time: ${new SimpleDateFormat(".MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new Date())}")
> 
> val stream: DataStream[(String, String, String)] =
>   senv.socketTextStream("localhost", ).map(line => (line, line, line))
> val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
> sTableEnv.registerTable("foo", table)
> val result = sTableEnv.sql("select * from foo")
> result.printSchema()
> result.toAppendStream[Row].print()
> 
> senv.execute("foo")
> ```
> And here’s the result:
> 
> 
> 
> Best,
> wangsan
> 
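A minimal JDK-only sketch of the check Xingcan suggests (the class and method names are hypothetical). It prints the JVM default zone and its current offset; for Asia/Shanghai the offset is 28800000 ms, exactly the 8-hour gap described in the question. The default can be overridden with the standard `-Duser.timezone=...` JVM option or with `TimeZone.setDefault(...)`, but that only changes the conversion to `java.sql.Timestamp`, not the epoch value returned by `System.currentTimeMillis()`.

```java
import java.util.TimeZone;

public class DefaultZoneCheck {

    // Offset of the JVM default zone at the given instant, in hours.
    static double defaultOffsetHours(long epochMillis) {
        return TimeZone.getDefault().getOffset(epochMillis) / 3_600_000.0;
    }

    public static void main(String[] args) {
        TimeZone tz = TimeZone.getDefault();
        long now = System.currentTimeMillis();
        System.out.println("default zone: " + tz.getID());
        System.out.println("offset now (ms): " + tz.getOffset(now));
        System.out.println("offset now (h): " + defaultOffsetHours(now));
    }
}
```

Running it with `java -Duser.timezone=Asia/Shanghai DefaultZoneCheck` should report an offset of 28800000 ms.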



Re: Question about Timestamp in Flink SQL

2017-11-29 Thread Timo Walther

Hi Wangsan,

Currently, timestamps in Flink SQL do not depend on a time zone. All 
calculations happen on the UTC timestamp. This also guarantees that an 
input parsed with Timestamp.valueOf("XXX") remains consistent when it is 
printed again with toString().
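A small round-trip sketch of this consistency. The `toInternal` helper is an assumption: it is written as the inverse of the quoted `internalToTimestamp` (add the default zone's offset instead of subtracting it) and is not taken from the Flink or Calcite sources; the class name is hypothetical. With either UTC or Asia/Shanghai as the JVM default, the string `2017-11-28 11:00:00` maps to the same internal long (the UTC epoch of that wall-clock time) and prints back unchanged.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

public class RoundTripDemo {

    // Assumed inverse of internalToTimestamp: add the default zone's offset so
    // the long encodes the wall-clock time as if it were UTC.
    static long toInternal(Timestamp ts) {
        long t = ts.getTime();
        return t + TimeZone.getDefault().getOffset(t);
    }

    // Same arithmetic as the internalToTimestamp snippet quoted in this thread.
    static Timestamp fromInternal(long v) {
        return new Timestamp(v - TimeZone.getDefault().getOffset(v));
    }

    public static void main(String[] args) {
        String input = "2017-11-28 11:00:00";
        for (String zone : new String[] {"UTC", "Asia/Shanghai"}) {
            TimeZone.setDefault(TimeZone.getTimeZone(zone));
            long internal = toInternal(Timestamp.valueOf(input));
            // Same internal long in both zones; the string survives the round trip.
            System.out.println(zone + ": " + internal + " -> " + fromInternal(internal));
        }
    }
}
```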


Regards,
Timo


On 11/29/17 at 3:43 AM, wangsan wrote:




Re: Question about Timestamp in Flink SQL

2017-11-29 Thread wangsan
Hi Timo,

What I am doing is extracting a timestamp field (may be string format as 
“2017-11-28 11:00:00” or a long value base on my current timezone) as Event 
time attribute. So In timestampAndWatermarkAssigner , for string format I 
should parse the data time string using GMT, and for long value I should add 
the offset as opposite to what internalToTimestamp did. But the Processing time 
attribute can not keep consistent. Am I understanding that correctly?
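A sketch of the two conversions described above, assuming the long field is a standard Unix epoch in milliseconds and the strings use the `yyyy-MM-dd HH:mm:ss` pattern (both assumptions; the class and helper names are hypothetical). In a Flink job this logic would sit inside the timestamp extractor, e.g. the `extractTimestamp` method of an `AssignerWithPeriodicWatermarks`. For the same wall-clock time, parsing the string as GMT and shifting the true epoch by the zone offset give the same value.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.TimeZone;

public class EventTimeExtraction {

    // Assumed input pattern for the string-typed timestamp field.
    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    // Parse the wall-clock string as if it were GMT, so the resulting long
    // renders back to the same string in Flink SQL.
    static long parseAsGmt(String s) {
        return parseIn(s, TimeZone.getTimeZone("GMT"));
    }

    // Parse the wall-clock string in an explicit zone (true epoch value).
    static long parseIn(String s, TimeZone zone) {
        SimpleDateFormat fmt = new SimpleDateFormat(PATTERN);
        fmt.setTimeZone(zone);
        try {
            return fmt.parse(s).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable timestamp: " + s, e);
        }
    }

    // Add the zone offset to a true epoch value, i.e. the opposite of the
    // subtraction performed by internalToTimestamp.
    static long shiftByOffset(long epochMillis, TimeZone zone) {
        return epochMillis + zone.getOffset(epochMillis);
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        String wallClock = "2017-11-28 11:00:00";

        long fromString = parseAsGmt(wallClock);
        long trueEpoch = parseIn(wallClock, shanghai);  // 11:00 in Shanghai
        // Both paths produce the same internal value.
        System.out.println(fromString == shiftByOffset(trueEpoch, shanghai));  // prints true
    }
}
```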

Best,
wangsan



> On 29 Nov 2017, at 4:43 PM, Timo Walther wrote:



Re: Question about Timestamp in Flink SQL

2017-11-29 Thread Timo Walther

Hi Wangsan,

I opened an issue to properly document this behavior 
(https://issues.apache.org/jira/browse/FLINK-8169). Basically, both your 
event-time and processing-time timestamps should be in GMT. We plan to 
support offsets for windows in the future 
(https://issues.apache.org/jira/browse/FLINK-8168). Internally, the long 
values always represent GMT; only the toString() output is time zone 
dependent. For now, I would suggest either implementing user-defined 
scalar functions for your desired behavior or simply subtracting the 
offset (ts - INTERVAL '8' HOUR should work).


Time zone support definitely needs to be improved in future versions of 
Flink SQL.
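To illustrate the point that the long value stays fixed and only its rendering depends on the zone, here is a minimal JDK-only sketch (the class name is hypothetical). It formats epoch 0 in UTC and in Asia/Shanghai, and checks that shifting the value by a fixed 8-hour interval (the millisecond equivalent of `INTERVAL '8' HOUR`) makes the UTC rendering match the Shanghai one. Whether such a shift should be added or subtracted depends on which direction you are converting. A fixed interval works here only because Asia/Shanghai currently has no daylight saving time; for zones with DST it would be wrong for part of the year.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class RenderingDemo {

    // Format one epoch value in the given zone; the value itself never changes.
    static String render(long epochMillis, String zoneId) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        fmt.setTimeZone(TimeZone.getTimeZone(zoneId));
        return fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long value = 0L;
        System.out.println("UTC:           " + render(value, "UTC"));            // 1970-01-01 00:00:00
        System.out.println("Asia/Shanghai: " + render(value, "Asia/Shanghai"));  // 1970-01-01 08:00:00

        // Fixed 8-hour interval in milliseconds.
        long eightHours = 8L * 60 * 60 * 1000;
        // Rendering value + 8h in UTC gives the same text as value in Shanghai.
        System.out.println(render(value + eightHours, "UTC")
                .equals(render(value, "Asia/Shanghai")));  // true
    }
}
```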


Regards,
Timo



On 11/29/17 at 10:50 AM, wangsan wrote:
