maxOutOfOrderness

2018-09-07 Thread Nicos Maris
Hello,


Does maxOutOfOrderness
<https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_timestamps_watermarks.html#with-periodic-watermarks>
affect performance?

Setting it to Long.MAX_VALUE doesn't affect performance, so either Flink is
really fast in my simple pipeline or my understanding is terribly wrong :p


Re: maxOutOfOrderness

2018-09-07 Thread Hequn Cheng
Hi Nicos,

Setting it to Long.MAX_VALUE keeps the watermark far below every record
timestamp. In that case the event-time windows will never be triggered, so the
setting is meaningless.
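
To make the point above concrete, here is a minimal sketch of how a bounded-out-of-orderness watermark can be modelled. It is a simplified stand-in written for illustration, not Flink's own class: the names BoundedWatermarkSketch, observe, and windowFires are hypothetical, and the rule that a window fires once the watermark reaches its end minus one millisecond is an assumption of this model.

```java
// Simplified model of a bounded-out-of-orderness watermark (illustration only,
// not Flink's BoundedOutOfOrdernessTimestampExtractor).
class BoundedWatermarkSketch {
    private final long maxOutOfOrderness;           // event-time milliseconds
    private long maxTimestampSeen = Long.MIN_VALUE; // no record observed yet

    BoundedWatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Track the largest event timestamp seen so far.
    void observe(long eventTimestamp) {
        maxTimestampSeen = Math.max(maxTimestampSeen, eventTimestamp);
    }

    // The watermark lags the largest timestamp by maxOutOfOrderness.
    long currentWatermark() {
        // Guard against underflow (also covers the "nothing observed" state).
        if (maxTimestampSeen < Long.MIN_VALUE + maxOutOfOrderness) {
            return Long.MIN_VALUE;
        }
        return maxTimestampSeen - maxOutOfOrderness;
    }

    // Assumption of this sketch: a window [start, end) fires once the
    // watermark reaches end - 1.
    static boolean windowFires(long windowEndExclusive, long watermark) {
        return watermark >= windowEndExclusive - 1;
    }

    public static void main(String[] args) {
        long windowEnd = 1483250640000L;

        BoundedWatermarkSketch fiveSeconds = new BoundedWatermarkSketch(5000L);
        fiveSeconds.observe(1483250649000L);
        System.out.println("lag 5 s: fires = "
                + windowFires(windowEnd, fiveSeconds.currentWatermark()));

        BoundedWatermarkSketch unbounded = new BoundedWatermarkSketch(Long.MAX_VALUE);
        unbounded.observe(1483250649000L);
        System.out.println("lag Long.MAX_VALUE: fires = "
                + windowFires(windowEnd, unbounded.currentWatermark()));
    }
}
```

With a lag of Long.MAX_VALUE the modelled watermark stays negative for any realistic epoch-millisecond timestamp, so no window end is ever reached.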

Best, Hequn



flink eventTime, lateness, maxoutoforderness

2017-12-16 Thread chen
eventTime, lateness, and maxOutOfOrderness are all about time.
Event time is the timestamp carried by the record.
Is lateness measured in record time or in real-world time?
Is maxOutOfOrderness measured in record time or in real-world time?

dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.seconds(5))
    .fold(initRow(), new MyFoldFunction());

public Watermark getCurrentWatermark() {
    return new Watermark(currentTime - 5000);
}

Could anyone explain which notion of time eventTime, lateness, and
maxOutOfOrderness refer to?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: flink eventTime, lateness, maxoutoforderness

2017-12-16 Thread Eron Wright
Take a look at the section of Flink documentation titled "Event Time and
Watermarks":
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html#event-time-and-watermarks

Also read the excellent series "Streaming 101" and "102", which has useful
animations depicting the flow of time:
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

Think of the watermark as a clock, ticking along due to information from
the connector or from a watermark generator.

Hope this helps,
Eron




Re: flink eventTime, lateness, maxoutoforderness

2017-12-18 Thread Tzu-Li (Gordon) Tai
Hi,

> Is lateness measured in record time or in real-world time?
> Is maxOutOfOrderness measured in record time or in real-world time?

Both the allowed lateness of window operators and the maxOutOfOrderness of the
BoundedOutOfOrdernessTimestampExtractor refer to event time.

i.e.,
- given the end timestamp of a window is x (in event time) and allowed lateness 
is y, the window state is cleared only when the current watermark of the window 
operator passes x+y
- the BoundedOutOfOrdernessTimestampExtractor emits watermarks that lag behind 
the max record timestamp (in event time) by a fixed amount of time (again, in 
event time).
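
The first rule above can be sketched as a small decision function. This is a simplified model for illustration only, not Flink's window operator: the names AllowedLatenessSketch, Decision, and classify are hypothetical, and treating the last timestamp that belongs to the window as x - 1 (so that state is dropped once the watermark reaches x - 1 + y) is an assumption about the exact boundary.

```java
// Simplified model of how a record's fate depends on the watermark, the
// window end x, and the allowed lateness y (all in event-time milliseconds).
class AllowedLatenessSketch {
    enum Decision { ADD_ON_TIME, ADD_AND_REFIRE, DROP }

    static Decision classify(long windowEndExclusive, long allowedLateness, long watermark) {
        long maxTimestamp = windowEndExclusive - 1; // last timestamp inside the window
        if (watermark >= maxTimestamp + allowedLateness) {
            return Decision.DROP;            // window state already cleared
        }
        if (watermark >= maxTimestamp) {
            return Decision.ADD_AND_REFIRE;  // window fired before, state still kept
        }
        return Decision.ADD_ON_TIME;         // window has not fired yet
    }

    public static void main(String[] args) {
        long x = 1483250645000L; // window end
        long y = 5000L;          // allowed lateness
        System.out.println(classify(x, y, 1483250644000L));
        System.out.println(classify(x, y, 1483250645000L));
        System.out.println(classify(x, y, 1483250650000L));
    }
}
```

Note that nothing in the function reads the wall clock: only the watermark, which is itself derived from record timestamps, moves a window from one state to the next.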

Hope this clarifies things for you!

Cheers,
Gordon

Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen
Hi Eron,
Thanks for your help. I do understand that maxOutOfOrderness and lateness are
based on event time, but my test does not behave that way. Below are my code
and test data.
 "key1|148325064|",
 "key1|1483250636000|",
 "key1|1483250649000|",
 "key1|1483250642000|",
 "key1|148325065|",
 "key1|1483250641000|",
 "key1|1483250653000|",
 "key1|1483250648000|",
 "key1|1483250645000|",
 "key1|1483250658000|",
 "key1|1483250647000|",
 "key1|1483250643000|",
 "key1|1483250661000|",
 "key1|1483250662000|",
 "key1|1483250667000|",
 "key1|1483250663000|",

dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .allowedLateness(Time.seconds(5))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            // Field 0 counts the records; field 1 concatenates their timestamps.
            ret.setField(0, (int) ret.getField(0) + 1);
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    });

1. Send data WITHOUT Thread.sleep(): the result looks like this:
 1,1483250636000|
 4,148325064|1483250642000|1483250641000|1483250643000|
 4,1483250649000|1483250648000|1483250645000|1483250647000|
 2,148325065|1483250653000|
 1,1483250658000|
 3,1483250661000|1483250662000|1483250663000|
 1,1483250667000|
2. Send data WITH Thread.sleep(): the result looks like this. Here the effect
of allowedLateness is visible: a late record triggers the window to compute
again, so the result is emitted again.
  1,1483250636000|
  1,148325064|
  2,148325064|1483250642000|
  1,1483250649000|
  2,1483250649000|1483250648000|
  3,1483250649000|1483250648000|1483250645000|
  2,148325065|1483250653000|
  1,1483250658000|
  2,1483250661000|1483250662000|
  3,1483250661000|1483250662000|1483250663000|
  1,1483250667000|
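
For reading the groupings above, it helps to see which 5-second tumbling window each timestamp falls into. The following is a minimal sketch of window assignment, assuming a zero window offset and non-negative timestamps; TumblingWindowSketch, windowStart, and windowEnd are hypothetical names used only for illustration.

```java
// Sketch of tumbling-window assignment: every timestamp belongs to exactly one
// window [start, start + size). Assumes zero offset and non-negative timestamps.
class TumblingWindowSketch {
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    static long windowEnd(long timestamp, long sizeMillis) {
        return windowStart(timestamp, sizeMillis) + sizeMillis; // exclusive end
    }

    public static void main(String[] args) {
        long size = 5000L; // Time.seconds(5)
        long[] samples = {1483250636000L, 1483250642000L, 1483250641000L, 1483250649000L};
        for (long ts : samples) {
            System.out.println(ts + " -> [" + windowStart(ts, size)
                    + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Under these assumptions 1483250642000 and 1483250641000 share the window starting at 1483250640000, while 1483250636000 and 1483250649000 each land in a different window, which is consistent with how the records are grouped in the output lines.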






Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen

Code showing the effect of maxOutOfOrderness:
dataStream.keyBy(row -> (String) row.getField(0))
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .fold(initRow(), new FoldFunction<Row, Row>() {
        @Override
        public Row fold(Row ret, Row o) throws Exception {
            // Field 0 counts the records; field 1 concatenates their timestamps.
            ret.setField(0, (int) ret.getField(0) + 1);
            ret.setField(1, (String) ret.getField(1) + o.getField(1) + "|");
            return ret;
        }
    });

public Watermark getCurrentWatermark() {
    return new Watermark(currentTime - 5000);
}

1. Send data WITHOUT Thread.sleep(): the result looks like this:
1,1483250636000|
4,148325064|1483250642000|1483250641000|1483250643000|
4,1483250649000|1483250648000|1483250645000|1483250647000|
2,148325065|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|

2. Send data WITH Thread.sleep(): the result looks like this. Here the effect
of maxOutOfOrderness is visible: the computation is delayed, and then the
result is emitted.
1,1483250636000|
2,148325064|1483250642000|
3,1483250649000|1483250648000|1483250645000|
2,148325065|1483250653000|
1,1483250658000|
3,1483250661000|1483250662000|1483250663000|
1,1483250667000|

I don't know how to explain eventTime, lateness, and maxOutOfOrderness given
these results.





Re: flink eventTime, lateness, maxoutoforderness

2017-12-22 Thread chen
Thanks Gordon, please see my reply. I ran the code, but the result does not
match what the docs explain.



