Hi Tony,

Won't that increase the amount of processing Flink has to do? It would have to window twice, right?
Thanks,
Tomasz

On 23 August 2017 at 11:02, Tony Wei <tony19920...@gmail.com> wrote:
> Hi Tomasz,
>
> In my opinion, I would move the .window() call down into each of these
> two DataStreams (for rawEvents: .window().reduce().map(), and likewise
> for metrics). That makes sure they won't share the same WindowedStream
> object.
>
> Regards,
> Tony Wei
>
> 2017-08-23 17:51 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:
>>
>> Hi Tony,
>>
>> Thank you for your answer, it definitely helps with understanding this
>> situation.
>> Is there any reliable way to split the stream so I get 2 outputs that
>> avoids this behaviour? Eventually I want to have 2 sinks that output
>> different data (one being just a copy of the stream, but organised in
>> session windows, and the other being metrics which I derive from the
>> data itself).
>>
>> Thanks,
>> Tomasz
>>
>> On 23 August 2017 at 10:32, 魏偉哲 <tony19920...@gmail.com> wrote:
>> > Hi Tomasz,
>> >
>> > I think this is because .window() is a lazy operator.
>> > It just creates a WindowedStream object but does not create a
>> > corresponding operator.
>> > The operator is created only when you call .reduce() or .apply().
>> >
>> > rawEvents and metrics actually shared the same object to create their
>> > own operators.
>> > You can see the details in WindowedStream.trigger(): it only sets
>> > this.trigger = trigger and then returns itself.
>> > Because of this, when you used the same object to create the operator
>> > for rawEvents, it picked up the same WindowAssigner and Trigger
>> > settings as well.
>> > That's why changing the order changed the behaviour as well.
>> >
>> > Hope this will help you.
>> >
>> > Regards,
>> > Tony Wei
>> >
>> > 2017-08-17 16:25 GMT+08:00 Tomasz Dobrzycki <dobrzycki.tom...@gmail.com>:
>> >>
>> >> Hi,
>> >>
>> >> I'm working on a custom trigger that is supposed to trigger
>> >> periodically and at the end of the session window.
These are the main >> >> methods from my trigger: >> >> >> >> public TriggerResult onElement(Object element, long timestamp, >> >> TimeWindow window, TriggerContext ctx) throws Exception { >> >> long currentTime = System.currentTimeMillis(); >> >> if (currentTime - lastTriggerTime >= this.delay) { >> >> lastTriggerTime = currentTime; >> >> return TriggerResult.FIRE; >> >> } else { >> >> return TriggerResult.CONTINUE; >> >> } >> >> } >> >> >> >> public TriggerResult onEventTime(long time, TimeWindow window, >> >> TriggerContext ctx) { >> >> return time == window.maxTimestamp() ? >> >> TriggerResult.FIRE : >> >> TriggerResult.CONTINUE; >> >> } >> >> >> >> When I use this trigger in my main processing method, I'm getting >> >> unexpected behaviour. This is how I use it: >> >> >> >> // MAIN PROCESSING >> >> WindowedStream<EventTags, Tuple, TimeWindow> sessionWindow = dataStream >> >> .map(new ParseEvent()) >> >> .filter(new Filter()) >> >> .assignTimestampsAndWatermarks(new >> >> BoundedOutOfOrdernessTimestampExtractor<EventTags>(Time.minutes(5)) { >> >> @Override >> >> public long extractTimestamp(EventTags event) { >> >> return event.receivedAt; >> >> } >> >> }) >> >> .keyBy("streamKeys") >> >> >> >> .window(EventTimeSessionWindows.withGap(Time.minutes(5))); >> >> >> >> // WARNING! This has to go before periodic triggered metrics as Flink >> >> will trigger this as well >> >> // if it comes second >> >> DataStream<String> rawEvents = sessionWindow >> >> .reduce(new CollectRawData()) >> >> .map(new ParseRawData()); >> >> >> >> DataStream<String> metrics = sessionWindow >> >> .trigger(SessionTrigger.every(Time.milliseconds(2))) >> >> .apply(new ExtractMetrics()); >> >> >> >> >> >> This works as expected, rawEvents is calculated when the session >> >> window is completed and metrics is calculated periodically and at the >> >> windows end. 
>> >> But if I swap the order of rawEvents and metrics (in my mind the code
>> >> should behave the same), rawEvents is also triggered periodically. Is
>> >> this expected to work this way? I'm not assigning the periodic trigger
>> >> to rawEvents. Thanks for your help.
>> >>
>> >> Kind Regards,
>> >> Tomasz
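To make the order dependence Tony describes concrete, here is a small, self-contained Java model of it. It is not Flink code: WindowBuilder, Operator, trigger() and build() are hypothetical stand-ins, assuming only what the thread states, namely that trigger() stores the trigger on the shared object and returns that same object, and that the operator is created by the terminal call (.reduce() or .apply()). The three scenarios mirror the original order, the swapped order, and Tony's one-window-per-branch suggestion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of a lazy window builder. This is NOT Flink's
// API; it only mimics the pattern described in the thread: trigger() stores the
// trigger on the builder and returns the same object, and the operator is only
// created by a terminal call (build() stands in for .reduce() / .apply()).
public class LazyWindowBuilderDemo {

    // Stand-in for the window operator that a terminal call creates.
    record Operator(String name, String assigner, String trigger) {}

    static final class WindowBuilder {
        private final String assigner;
        private String trigger = "default";   // the assigner's default trigger
        private final List<Operator> graph;   // operators created so far

        WindowBuilder(String assigner, List<Operator> graph) {
            this.assigner = assigner;
            this.graph = graph;
        }

        // Mutates this builder and returns it, so every later terminal call sees the change.
        WindowBuilder trigger(String trigger) {
            this.trigger = trigger;
            return this;
        }

        // Terminal call: snapshots the builder's current settings into a new operator.
        Operator build(String name) {
            Operator op = new Operator(name, assigner, trigger);
            graph.add(op);
            return op;
        }
    }

    // Original order: rawEvents is built before trigger() is called, so it keeps the default.
    static List<Operator> sharedBuilderRawEventsFirst() {
        List<Operator> graph = new ArrayList<>();
        WindowBuilder sessionWindow = new WindowBuilder("session(5 min)", graph);
        sessionWindow.build("rawEvents");
        sessionWindow.trigger("periodic").build("metrics");
        return graph;
    }

    // Swapped order: trigger() has already mutated the shared builder when rawEvents is built.
    static List<Operator> sharedBuilderMetricsFirst() {
        List<Operator> graph = new ArrayList<>();
        WindowBuilder sessionWindow = new WindowBuilder("session(5 min)", graph);
        sessionWindow.trigger("periodic").build("metrics");
        sessionWindow.build("rawEvents");
        return graph;
    }

    // Tony's suggestion: one builder per branch, so the order no longer matters.
    static List<Operator> builderPerBranch() {
        List<Operator> graph = new ArrayList<>();
        new WindowBuilder("session(5 min)", graph).trigger("periodic").build("metrics");
        new WindowBuilder("session(5 min)", graph).build("rawEvents");
        return graph;
    }

    public static void main(String[] args) {
        System.out.println(sharedBuilderRawEventsFirst());
        System.out.println(sharedBuilderMetricsFirst());
        System.out.println(builderPerBranch());
    }
}
```

In the model, the shared builder yields two operators in every ordering; only the trigger each one captures changes. Applied to the code above, Tony's suggestion would presumably mean calling keyBy("streamKeys") once and then .window(EventTimeSessionWindows.withGap(Time.minutes(5))) separately for rawEvents and metrics. Since, as Tony notes, rawEvents and metrics already create their own operators from the shared object, the second .window() call should change which settings each operator gets rather than how many operators there are; that is a reading of the behaviour described in this thread, not something measured here.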