Re: periodic trigger

2018-01-03 Thread Piotr Nowojski
Hi, Sorry for the late response (because of the holiday period). You didn’t mention lateness previously, which is why I proposed that solution. Another approach would be to calculate the max session length per user on the first aggregation level and at the same time remember what was the previously emi
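
The message above is cut off, so the exact proposal is not visible. One reading of "remember what was previously emitted" is that the downstream average keeps the last value seen per user and replaces it when a late update arrives, instead of counting the user twice. The sketch below is plain Java with no Flink dependency; the class name `LatestPerUserAverage` and its methods are hypothetical and only illustrate that idea.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: keeps one value per user and a running sum,
// so a repeated (late) update replaces the earlier one.
public class LatestPerUserAverage {
    private final Map<String, Long> lastEmitted = new HashMap<>();
    private long sum = 0L;

    // Record the newest max session length for a user.
    // If the user was seen before, the old value is subtracted first.
    public void update(String userId, long maxSessionLengthMs) {
        Long previous = lastEmitted.put(userId, maxSessionLengthMs);
        if (previous != null) {
            sum -= previous;
        }
        sum += maxSessionLengthMs;
    }

    // Average over distinct users; 0.0 when nothing has been recorded.
    public double average() {
        return lastEmitted.isEmpty() ? 0.0 : (double) sum / lastEmitted.size();
    }

    public static void main(String[] args) {
        LatestPerUserAverage avg = new LatestPerUserAverage();
        avg.update("u1", 100L);
        avg.update("u2", 300L);
        System.out.println(avg.average());
        avg.update("u1", 500L); // late update for u1 replaces 100
        System.out.println(avg.average());
    }
}
```

In a real job this state would have to live in Flink managed state rather than a plain map; that part is left out here.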

Re: periodic trigger

2017-12-22 Thread Plamen Paskov
I don't think that will solve the problem. I don't want to discard events from different users that are received late, so if I set ContinuousEventTimeTrigger to 10 seconds and allowedLateness(Time.seconds(60)), then I might receive more than one row for a single user based on the number of windows created

Re: periodic trigger

2017-12-22 Thread Piotr Nowojski
Ok, I think I understand your problem now. Wouldn’t it be enough if you changed the last global window to something like this:

    lastUserSession
        .timeWindowAll(Time.seconds(10))
        .aggregate(new AverageSessionLengthAcrossAllUsers())
        .print();

(As a side note, maybe you should us

Re: periodic trigger

2017-12-21 Thread Plamen Paskov
Imagine a case where I want to run a computation every X seconds for a 1-day window. I want to calculate the average session length for the current day every X seconds. Is there an easy way to achieve that?

On 21.12.2017 16:06, Piotr Nowojski wrote:
> Hi, You defined a tumbling window (https://ci.apac

Re: periodic trigger

2017-12-21 Thread Piotr Nowojski
Hi, You defined a tumbling window (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#tumbling-windows) of 60 seconds, triggered every 10 seconds. This means that each inpu
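
As a rough illustration of the setup described above (a 60-second tumbling window with a trigger firing every 10 seconds), the sketch below computes which window a timestamp falls into and when a periodic trigger would fire inside it. It is a simplified model in plain Java, not Flink's implementation: it assumes non-negative timestamps, no window offset, and firings aligned to the window start. The names `windowStart` and `firingTimes` are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class TumblingWindowSketch {
    // Start of the tumbling window containing the timestamp
    // (assumes timestampMs >= 0 and no offset).
    public static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Simplified firing times of a periodic trigger inside one window:
    // every intervalMs after the window start, up to and including the window end.
    public static List<Long> firingTimes(long windowStartMs, long sizeMs, long intervalMs) {
        List<Long> times = new ArrayList<>();
        long end = windowStartMs + sizeMs;
        for (long t = windowStartMs + intervalMs; t <= end; t += intervalMs) {
            times.add(t);
        }
        return times;
    }

    public static void main(String[] args) {
        long size = 60_000L;      // 60-second window
        long interval = 10_000L;  // trigger every 10 seconds
        long eventTime = 65_000L;

        long start = windowStart(eventTime, size);
        System.out.println("window: [" + start + ", " + (start + size) + ")");
        System.out.println("firings: " + firingTimes(start, size, interval));
    }
}
```

Each element lands in exactly one window, and every firing re-emits the aggregate of that same window, which is why a periodic trigger on a tumbling window yields several results per window rather than a sliding view.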

periodic trigger

2017-12-21 Thread Plamen Paskov
Hi guys, I have the following code:

    SingleOutputStreamOperator lastUserSession = env
        .socketTextStream("localhost", 9000, "\n")
        .map(new MapFunction() {
            @Override
            public Event map(String value) throws Exception {
                String[] row = value.split(",");

Re: Unexpected behaviour of a periodic trigger.

2017-08-23 Thread Tomasz Dobrzycki
> .filter(new Filter())
> .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor(Time.minutes(5)) {
>     @Override
>     public long extractTimestamp(EventTags event) {
>         return event.receivedAt;
>     }
> })
> .keyBy("streamKeys")
> .window(EventTimeSessionWindows.withGap(Time.minutes(5)));
>
> // WARNING! This has to go before periodic triggered metrics as Flink will trigger this as well
> // if it comes second
> DataStream rawEvents = sessionWindow
>     .reduce(new CollectRawData())
>     .map(new ParseRawData());
>
> DataStream metrics = sessionWindow
>     .trigger(SessionTrigger.every(Time.milliseconds(2)))
>     .apply(new ExtractMetrics());
>
> This works as expected, rawEvents is calculated when the session window is completed and metrics is calculated periodically and at the windows end. But if I change the order of rawEvents and metrics (code should work the same in my mind), rawEvents is also triggered periodically. Is this expected to work this way? I'm not assigning periodic trigger to rawEvents. Thanks for your help.
>
> Kind Regards,
> Tomasz
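
One plausible explanation for the order dependence in the quoted code, offered as an assumption to check against the Flink version in use: if `trigger(...)` sets the trigger on the shared `sessionWindow` object and returns that same object rather than a copy, then any operator built from `sessionWindow` afterwards inherits the periodic trigger. The sketch below reproduces that effect in plain Java with a hypothetical `FakeWindowedStream`; it does not use Flink classes.

```java
public class SharedWindowSketch {
    // Hypothetical stand-in for a windowed stream whose trigger() mutates the receiver.
    static class FakeWindowedStream {
        private String trigger = "default";

        FakeWindowedStream trigger(String newTrigger) {
            this.trigger = newTrigger; // changes the shared object, no copy is made
            return this;
        }

        // An operator captures whichever trigger is set when it is built.
        String buildOperator(String name) {
            return name + " -> " + trigger;
        }
    }

    // Returns { rawEvents description, metrics description } for the given build order.
    public static String[] build(boolean rawEventsFirst) {
        FakeWindowedStream sessionWindow = new FakeWindowedStream();
        String raw;
        String metrics;
        if (rawEventsFirst) {
            raw = sessionWindow.buildOperator("rawEvents");
            metrics = sessionWindow.trigger("periodic").buildOperator("metrics");
        } else {
            metrics = sessionWindow.trigger("periodic").buildOperator("metrics");
            raw = sessionWindow.buildOperator("rawEvents"); // sees the mutated trigger
        }
        return new String[] { raw, metrics };
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", build(true)));
        System.out.println(String.join(", ", build(false)));
    }
}
```

Under that assumption, a possible way to make the order irrelevant is to call `.window(...)` separately for each pipeline so that each one gets its own windowed stream object.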

Re: Unexpected behaviour of a periodic trigger.

2017-08-23 Thread Tony Wei

Re: Unexpected behaviour of a periodic trigger.

2017-08-23 Thread Tomasz Dobrzycki

Re: Unexpected behaviour of a periodic trigger.

2017-08-23 Thread Tony Wei

Re: Unexpected behaviour of a periodic trigger.

2017-08-23 Thread Tomasz Dobrzycki

Re: Unexpected behaviour of a periodic trigger.

2017-08-23 Thread 魏偉哲

Unexpected behaviour of a periodic trigger.

2017-08-17 Thread Tomasz Dobrzycki
Is this expected to work this way? I'm not assigning periodic trigger to rawEvents. Thanks for your help.

Kind Regards,
Tomasz