Re: Use event time

2018-12-07 Thread Piotr Nowojski
You are welcome :)

You are more or less correct. Assigning event time doesn’t reorder anything in 
the stream; it is just metadata about a record that can be used by 
various functions/operators, not only by windowed operations. As I answered in 
the “A question on the Flink "rolling" FoldFunction” thread, one could use event 
time, for example, for a "best effort" sorting mechanism (which buffers some 
number of elements and tries to smooth out/best-effort sort the stream), for 
registering event-time timers, or for any other custom operation. 
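
To make the buffering idea concrete, here is a minimal sketch in plain Java. It is not Flink code: BestEffortSorter, Rec, capacity and emittedIds are hypothetical names chosen for illustration, and a real operator would also need state and watermark handling. The sketch keeps up to `capacity` records in a min-heap ordered by timestamp and releases the oldest one whenever the buffer overflows.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical helper (not a Flink class): best-effort sorting by buffering. */
class BestEffortSorter {

    /** Minimal record: an id plus an event-time timestamp in milliseconds. */
    static final class Rec {
        final int id;
        final long timestamp;

        Rec(int id, long timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    private final int capacity;
    // Min-heap ordered by event time: the oldest buffered record is at the head.
    private final PriorityQueue<Rec> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Rec r) -> r.timestamp));

    BestEffortSorter(int capacity) {
        this.capacity = capacity;
    }

    /** Buffers the record and releases the oldest ones while the buffer is over capacity. */
    List<Rec> add(Rec rec) {
        List<Rec> out = new ArrayList<>();
        buffer.add(rec);
        while (buffer.size() > capacity) {
            out.add(buffer.poll());
        }
        return out;
    }

    /** Releases everything still buffered, oldest first (for example at end of input). */
    List<Rec> flush() {
        List<Rec> out = new ArrayList<>();
        while (!buffer.isEmpty()) {
            out.add(buffer.poll());
        }
        return out;
    }

    /** Runs a whole input through the sorter and returns the ids in emission order. */
    static List<Integer> emittedIds(List<Rec> input, int capacity) {
        BestEffortSorter sorter = new BestEffortSorter(capacity);
        List<Integer> ids = new ArrayList<>();
        for (Rec rec : input) {
            for (Rec released : sorter.add(rec)) {
                ids.add(released.id);
            }
        }
        for (Rec released : sorter.flush()) {
            ids.add(released.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Same shape as the time2 values in the original post, with start = 100.
        List<Rec> input = new ArrayList<>();
        input.add(new Rec(0, 100));
        input.add(new Rec(1, 110));
        input.add(new Rec(2, 80));
        input.add(new Rec(3, 70));
        input.add(new Rec(4, 60));

        System.out.println(emittedIds(input, 2)); // [2, 3, 4, 0, 1]
        System.out.println(emittedIds(input, 4)); // [4, 3, 2, 0, 1]
    }
}
```

With a buffer of 2 the output is only partly ordered; with a buffer of 4 it happens to come out fully sorted by timestamp. That is the "best effort" trade-off: a larger buffer tolerates more disorder at the cost of more latency and memory.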

The same applies to watermarks. They are another kind of metadata passed 
along the stream, which can be used, for example, by the aforementioned 
windowed operators to emit data and clean up state.

Piotrek 

> On 7 Dec 2018, at 11:19,   wrote:
> 
> Many thanks for sending your email.
>  
> Does this mean that the event time only affects which events are selected 
> for a time window?
>  
> Without a time window, the event time has no impact on the order of 
> any records/events?
>  
> Is my understanding correct?
>  
> Thank you very much for your help.
>  
> Regards,
>  
> Min
>  
>  
>  
> From: Piotr Nowojski [mailto:pi...@data-artisans.com] 
> Sent: Friday, 7 December 2018 11:11
> To: Tan, Min
> Cc: user
> Subject: [External] Re: Use event time
>  
> Hi again!
>  
> Flink doesn’t order/sort the records according to event time. The prevailing 
> idea is:
> - records will arrive out of order, and operators should handle that
> - watermarks indicate the current lower bound of the event-time “clock”
>  
> For example, windowed joins/aggregations assign records to one or more time 
> windows and collect all of the data belonging to a window; when the watermark 
> exceeds/overtakes the window, that window is evaluated.
>  
> Piotrek 
> 
> 
> On 7 Dec 2018, at 09:22, min@ubs.com wrote:
>  
> Hi,
>  
> I am new to Flink. 
>  
> I have the following small program that uses event time. I did not get the 
> result I expected, i.e. that it prints the events in event-time order.
>  
> Did I miss something here?
>  
> Regards,
>  
> Min
>  
>  
> --Event time--
>public static void main(String[] args) throws Exception  {
> 
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> long start =System.currentTimeMillis();
> DataStream stream = env.fromElements(new Event(0,start,start),
> new Event(1,start+10,start+10), new 
> Event(2,start+20,start-20),
> new Event(3,start+30,start-30), new 
> Event(4,start+40,start-40));
> 
> stream.map(event -> "RAW order " + event.toString()).print();
> 
> 
> stream.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor() {
> @Override public long extractAscendingTimestamp(Event element) { 
> return element.time1; } })
> .map(event -> "time1 order:: " + event.toString()).print();
> 
> 
> stream.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor() {
> @Override public long extractAscendingTimestamp(Event element) { 
> return element.time2; } })
> .map(event -> "time2 order:: " + event.toString()).print();
> 
> env.execute("event time ");
> }
> 
> 
> static public class Event {
> int id;
> long time1;
> long time2;
> 
> Event(int id, long time1, long time2){
> this.id =id;
> this.time1=time1;
> this.time2=time2;
> }
> 
> public String toString() {
> return "id=" + id + "; time1=" + time1 + "; time2=" + time2;
> }
> }
> }
> --
> 
> Check out our new brand campaign: www.ubs.com/together
> E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, 
> potential manipulation of contents and/or sender's address, incorrect 
> recipient (misdirection), viruses etc. Based on previous e-mail 
> correspondence with you and/or an agreement reached with you, UBS considers 
> itself authorized to contact you via e-mail. UBS assumes no responsibility 
> for any loss or damage resulting from the use of e-mails. 
> The recipient is aware of and accepts the inherent risks of using e-mails, in 
> particular the risk th

RE: Re: Use event time

2018-12-07 Thread min.tan
Many thanks for sending your email.

Does this mean that the event time only affects which events are selected for a 
time window?

Without a time window, the event time has no impact on the order of any 
records/events?

Is my understanding correct?

Thank you very much for your help.

Regards,

Min



From: Piotr Nowojski [mailto:pi...@data-artisans.com]
Sent: Friday, 7 December 2018 11:11
To: Tan, Min
Cc: user
Subject: [External] Re: Use event time

Hi again!

Flink doesn’t order/sort the records according to event time. The prevailing 
idea is:
- records will arrive out of order, and operators should handle that
- watermarks indicate the current lower bound of the event-time “clock”

For example, windowed joins/aggregations assign records to one or more time 
windows and collect all of the data belonging to a window; when the watermark 
exceeds/overtakes the window, that window is evaluated.

Piotrek


On 7 Dec 2018, at 09:22, min@ubs.com wrote:

Hi,

I am new to Flink.

I have the following small program that uses event time. I did not get the 
result I expected, i.e. that it prints the events in event-time order.

Did I miss something here?

Regards,

Min


--Event time--
   public static void main(String[] args) throws Exception  {

final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
long start =System.currentTimeMillis();
DataStream stream = env.fromElements(new Event(0,start,start),
new Event(1,start+10,start+10), new Event(2,start+20,start-20),
new Event(3,start+30,start-30), new Event(4,start+40,start-40));

stream.map(event -> "RAW order " + event.toString()).print();


stream.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor() {
@Override public long extractAscendingTimestamp(Event element) { 
return element.time1; } })
.map(event -> "time1 order:: " + event.toString()).print();


stream.assignTimestampsAndWatermarks(new 
AscendingTimestampExtractor() {
@Override public long extractAscendingTimestamp(Event element) { 
return element.time2; } })
.map(event -> "time2 order:: " + event.toString()).print();

env.execute("event time ");
}


static public class Event {
int id;
long time1;
long time2;

Event(int id, long time1, long time2){
this.id =id;
this.time1=time1;
this.time2=time2;
}

public String toString() {
return "id=" + id + "; time1=" + time1 + "; time2=" + time2;
}
}
}
--

Check out our new brand campaign: www.ubs.com/together
E-mails can involve SUBSTANTIAL RISKS, e.g. lack of confidentiality, potential 
manipulation of contents and/or sender's address, incorrect recipient 
(misdirection), viruses etc. Based on previous e-mail correspondence with you 
and/or an agreement reached with you, UBS considers itself authorized to 
contact you via e-mail. UBS assumes no responsibility for any loss or damage 
resulting from the use of e-mails.
The recipient is aware of and accepts the inherent risks of using e-mails, in 
particular the risk that the banking relationship and confidential information 
relating thereto are disclosed to third parties.
UBS reserves the right to retain and monitor all messages. Messages are 
protected and accessed only in legally justified cases.
For information on how UBS uses and discloses personal data, how long we retain 
it, how we keep it secure and your data protection rights, please see our 
Privacy Notice http://www.ubs.com/global/en/legalinfo2/privacy.html



Re: Use event time

2018-12-07 Thread Piotr Nowojski
Hi again!

Flink doesn’t order/sort the records according to event time. The prevailing 
idea is:
- records will arrive out of order, and operators should handle that
- watermarks indicate the current lower bound of the event-time “clock”
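
As an illustration of the second point, here is a small plain-Java sketch of one common way to derive such a lower bound: track the largest timestamp seen so far and subtract a fixed allowance for out-of-order records. WatermarkTracker, observe and maxOutOfOrderness are hypothetical names used only for this sketch, not Flink APIs, and the bounded-out-of-orderness rule is an assumption chosen for the example.

```java
/** Hypothetical sketch (not a Flink class): a bounded-out-of-orderness watermark. */
class WatermarkTracker {

    // How far behind the newest timestamp a record may still arrive (in ms).
    private final long maxOutOfOrderness;
    // Largest event-time timestamp observed so far.
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkTracker(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /**
     * Observes one record's timestamp and returns the current watermark.
     * The watermark never moves backwards, because maxTimestamp only grows.
     */
    long observe(long timestamp) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return maxTimestamp - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        WatermarkTracker tracker = new WatermarkTracker(20);
        System.out.println(tracker.observe(100)); // 80
        System.out.println(tracker.observe(110)); // 90
        System.out.println(tracker.observe(80));  // 90 (an older record does not move the clock back)
        System.out.println(tracker.observe(130)); // 110
    }
}
```

Note that the records themselves are passed through in arrival order; only the event-time "clock" advances.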

For example, windowed joins/aggregations assign records to one or more time 
windows and collect all of the data belonging to a window; when the watermark 
exceeds/overtakes the window, that window is evaluated.
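
The window behaviour described above can be simulated in a few lines of plain Java. This is only a sketch under stated assumptions: TumblingWindowSketch, onRecord and onWatermark are hypothetical names, the windows are fixed-size tumbling windows, each window just counts its records, and a window is evaluated once the watermark reaches its last timestamp (end - 1). Late records and keyed state are ignored.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch (not a Flink class): tumbling windows fired by watermarks. */
class TumblingWindowSketch {

    private final long size;
    // Window start -> timestamps collected for that window, ordered by window start.
    private final TreeMap<Long, List<Long>> windows = new TreeMap<>();

    TumblingWindowSketch(long size) {
        this.size = size;
    }

    /** Assigns the record to its window; arrival order does not matter. */
    void onRecord(long timestamp) {
        long windowStart = timestamp - Math.floorMod(timestamp, size);
        windows.computeIfAbsent(windowStart, k -> new ArrayList<>()).add(timestamp);
    }

    /** Evaluates and drops every window the watermark has passed: start -> record count. */
    Map<Long, Integer> onWatermark(long watermark) {
        Map<Long, Integer> fired = new LinkedHashMap<>();
        Iterator<Map.Entry<Long, List<Long>>> it = windows.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<Long>> entry = it.next();
            long lastTimestampInWindow = entry.getKey() + size - 1;
            if (lastTimestampInWindow > watermark) {
                break; // this window and all later ones may still receive records
            }
            fired.put(entry.getKey(), entry.getValue().size());
            it.remove(); // clean up the state of the evaluated window
        }
        return fired;
    }

    public static void main(String[] args) {
        TumblingWindowSketch sketch = new TumblingWindowSketch(100);
        sketch.onRecord(10);
        sketch.onRecord(150);
        sketch.onRecord(90); // out of order, still lands in window [0, 100)
        sketch.onRecord(210);

        System.out.println(sketch.onWatermark(50));  // {}
        System.out.println(sketch.onWatermark(99));  // {0=2}
        sketch.onRecord(120);
        System.out.println(sketch.onWatermark(250)); // {100=2}
    }
}
```

Nothing is sorted here either: the window only collects records and waits for the watermark before emitting a result.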

Piotrek 

> On 7 Dec 2018, at 09:22, min@ubs.com wrote:
> 
> Hi,
>  
> I am new to Flink. 
>  
> I have the following small program that uses event time. I did not get the 
> result I expected, i.e. that it prints the events in event-time order.
>  
> Did I miss something here?
>  
> Regards,
>  
> Min
>  
>  
> --Event time--
>public static void main(String[] args) throws Exception  {
> 
> final StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> long start =System.currentTimeMillis();
> DataStream stream = env.fromElements(new Event(0,start,start),
> new Event(1,start+10,start+10), new 
> Event(2,start+20,start-20),
> new Event(3,start+30,start-30), new 
> Event(4,start+40,start-40));
> 
> stream.map(event -> "RAW order " + event.toString()).print();
> 
> stream.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor() {
> @Override public long extractAscendingTimestamp(Event element) { 
> return element.time1; } })
> .map(event -> "time1 order:: " + event.toString()).print();
> 
> stream.assignTimestampsAndWatermarks(new 
> AscendingTimestampExtractor() {
> @Override public long extractAscendingTimestamp(Event element) { 
> return element.time2; } })
> .map(event -> "time2 order:: " + event.toString()).print();
> 
> env.execute("event time ");
> }
> 
> 
> static public class Event {
> int id;
> long time1;
> long time2;
> 
> Event(int id, long time1, long time2){
> this.id =id;
> this.time1=time1;
> this.time2=time2;
> }
> 
> public String toString() {
> return "id=" + id + "; time1=" + time1 + "; time2=" + time2;
> }
> }
> }
> --
> 


Use event time

2018-12-07 Thread min.tan
Hi,

I am new to Flink.

I have the following small program that uses event time. I did not get the 
result I expected, i.e. that it prints the events in event-time order.

Did I miss something here?

Regards,

Min


--Event time--
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;

// The class name is assumed; the post shows only the class body.
public class EventTimeExample {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        long start = System.currentTimeMillis();

        // time1 ascends with the element order; time2 descends from id=2 onwards.
        DataStream<Event> stream = env.fromElements(
                new Event(0, start, start),
                new Event(1, start + 10, start + 10),
                new Event(2, start + 20, start - 20),
                new Event(3, start + 30, start - 30),
                new Event(4, start + 40, start - 40));

        // Print the records as they come from the source.
        stream.map(event -> "RAW order " + event.toString()).print();

        // Use time1 as the event time.
        stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
                    @Override
                    public long extractAscendingTimestamp(Event element) {
                        return element.time1;
                    }
                })
                .map(event -> "time1 order:: " + event.toString()).print();

        // Use time2 as the event time.
        stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
                    @Override
                    public long extractAscendingTimestamp(Event element) {
                        return element.time2;
                    }
                })
                .map(event -> "time2 order:: " + event.toString()).print();

        env.execute("event time ");
    }

    public static class Event {
        int id;
        long time1;
        long time2;

        Event(int id, long time1, long time2) {
            this.id = id;
            this.time1 = time1;
            this.time2 = time2;
        }

        @Override
        public String toString() {
            return "id=" + id + "; time1=" + time1 + "; time2=" + time2;
        }
    }
}
--
