+user@

________________________________
From: Simone Cavallarin <cavalla...@hotmail.com>
Sent: 13 November 2020 16:46
To: Aljoscha Krettek <aljos...@apache.org>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
Hi Aljoscha,

When you said "You could use a stateful operation (like a ProcessFunction) to put a dynamic 'gap' into the records and then use that gap with EventTimeSessionWindows", I understand the theory, but I'm struggling to put it into practice in code.

    stream = stream
        .keyBy(new MyKeySelector())
        .window(EventTimeSessionWindows.withDynamicGap(new DynamicWindowGapExtractor()))
        .sideOutputLateData(lateDataSideOutputTag)
        .trigger(ContinuousEventTimeTrigger.of(Time.minutes(10))) // in case some key is continuously coming within the session window gap
        .process(new ProcessWindowFunction(……));

where ProcessWindowFunction(……) updates a parameter that is used inside DynamicWindowGapExtractor()... I found this at the following link: https://stackoverflow.com/questions/61960485/flink-session-window-not-triggered-even-with-continuouseventtimetrigger

If you could point me to some example code I could read, that would be very helpful. Thanks!

________________________________
From: Aljoscha Krettek <aljos...@apache.org>
Sent: 13 November 2020 09:43
To: user@flink.apache.org <user@flink.apache.org>
Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()

Yes, you're right that Flink can do this with session windows, but the assignment will be static. In general, the smaller the session gap (or session timeout), the more windows there will be. You're also right that you would have to somehow maintain information about how dense your records are in time and then use that to adjust the session gap. So you could use a stateful operation (like a ProcessFunction) to put a dynamic "gap" into the records and then use that gap with EventTimeSessionWindows.

Best,
Aljoscha

On 12.11.20 18:16, Simone Cavallarin wrote:
> Hi Aljoscha,
>
> Yes, correct: I would like to have more windows when there are more events for
> a given time frame.
> That is, when the events are more dense in time. I can calculate the time
> difference between events and create a parameter that produces windows of
> different sizes dynamically, based on past events. At the beginning it might
> start from a fixed value, but then the parameter should learn and adapt to
> the data accordingly.
>
> Could you please give me an example of how to set the timeout?
>
> I have been reading all around and I'm a bit confused. I thought that Flink
> could quite easily create more windows when the events are more dense in time
> (https://www.ververica.com/blog/session-windowing-in-flink).
>
> To avoid successive sessions becoming bigger and bigger, should I set a cap,
> for example 1 min?
>
> Many thanks for the help!
> Best
> Simon
>
> ________________________________
> From: Aljoscha Krettek <aljos...@apache.org>
> Sent: 12 November 2020 16:34
> To: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: How to use EventTimeSessionWindows.withDynamicGap()
>
> Hi,
>
> I'm not sure that what you want is possible. You say you want more
> windows when there are more events for a given time frame? That is, when
> the events are more dense in time?
>
> Also, using the event timestamp as the gap doesn't look correct. The gap
> basically specifies the timeout for a session (and I now realize that
> maybe "gap" is not a good word for that). So if your timeout increases
> as time goes on, your successive sessions will just get bigger and bigger.
>
> Best,
> Aljoscha
>
> On 12.11.20 15:56, Simone Cavallarin wrote:
>> Hi All,
>>
>> I'm trying to use EventTimeSessionWindows.withDynamicGap in my application.
>> I have understood that the gap is computed dynamically by a function on each
>> element. What I should be able to obtain is a Flink application that can
>> automatically manage the windows based on the frequency of the data (if I
>> have understood correctly).
>>
>> But I'm wondering if there is any parameter to adjust the computation to
>> create more or fewer windows for the same data.
>>
>> My event provides "millis", which I would like to pass to the function, but
>> I don't understand how. For the moment I'm trying the code below, with no
>> luck. Can you please give me some help? Thanks!
>>
>>     FlinkKafkaConsumer<Event> kafkaData =
>>             new FlinkKafkaConsumer("CorID_1", new EventDeserializationSchema(), p);
>>     WatermarkStrategy<Event> wmStrategy =
>>             WatermarkStrategy
>>                     .<Event>forMonotonousTimestamps()
>>                     .withIdleness(Duration.ofMinutes(1))
>>                     .withTimestampAssigner((event, timestamp) -> {
>>                         return event.get_Time();
>>                     });
>>
>>     DataStream<Event> stream = env.addSource(
>>             kafkaData.assignTimestampsAndWatermarks(wmStrategy));
>>
>>     DataStream<Event> Data = stream
>>             .keyBy((Event ride) -> ride.CorrID)
>>             .window(EventTimeSessionWindows.withDynamicGap((event) -> {
>>                 return event.get_Time();
>>             }));
>>
>> where I convert the date-time in the payload of the message I receive from
>> Kafka to millis:
>>
>>     public long get_Time() {
>>         long tn = OffsetDateTime.parse(a_t_rt).toInstant().toEpochMilli();
>>         this.millis = tn;
>>         return millis;
>>     }
>>
>>     public void set_a_t_rt(String a_t_rt) {
>>         this.a_t_rt = a_t_rt;
>>     }
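Aljoscha's suggestion (a stateful operation puts a dynamic gap into each record, and EventTimeSessionWindows then uses that gap) could look roughly like the minimal, untested sketch below. It assumes the Flink 1.11/1.12-era DataStream API (hence `open(Configuration)`). The `Event` stand-in with its `corrId`/`timeMillis` fields, and the names `GappedEvent`, `GapAssigner`, `GapExtractor`, `gapFor` and `sessions`, are all hypothetical, as is the rule "gap = factor × time since the previous event of the same key, clamped to [min, max]". The important point is that the extractor returns a duration (a timeout), not the event timestamp. Dense events get small gaps and are therefore split into more sessions, while the upper bound acts as the 1-minute cap Simone asks about.

```java
import java.time.Duration;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

class DynamicGapSketch {

    /** Hypothetical stand-in for the thread's Event: a key plus the event time in ms. */
    public static class Event {
        public String corrId;
        public long timeMillis;
        public Event() {}
    }

    /** Hypothetical wrapper: the original event plus its session gap (a duration in ms). */
    public static class GappedEvent {
        public Event event;
        public long gapMillis;
        public GappedEvent() {}
        public GappedEvent(Event event, long gapMillis) {
            this.event = event;
            this.gapMillis = gapMillis;
        }
    }

    /** Assumed rule: gap = factor × interval since the previous event, clamped to [min, max]. */
    public static long gapFor(long previousTs, long currentTs, long factor, long minGap, long maxGap) {
        long interval = Math.max(0L, currentTs - previousTs); // out-of-order events count as 0
        return Math.min(maxGap, Math.max(minGap, interval * factor));
    }

    /** Stateful step: remembers the latest timestamp per key and attaches a gap to each event. */
    public static class GapAssigner extends KeyedProcessFunction<String, Event, GappedEvent> {
        private final long factor;
        private final long minGapMillis;
        private final long maxGapMillis;
        private transient ValueState<Long> lastTimestamp;

        public GapAssigner(long factor, Duration minGap, Duration maxGap) {
            this.factor = factor;
            this.minGapMillis = minGap.toMillis();
            this.maxGapMillis = maxGap.toMillis();
        }

        @Override
        public void open(Configuration parameters) {
            lastTimestamp = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastTimestamp", Long.class));
        }

        @Override
        public void processElement(Event event, Context ctx, Collector<GappedEvent> out) throws Exception {
            Long previous = lastTimestamp.value();
            // The first event of a key has no interval yet, so it gets the maximum gap.
            long gap = previous == null
                    ? maxGapMillis
                    : gapFor(previous, event.timeMillis, factor, minGapMillis, maxGapMillis);
            if (previous == null || event.timeMillis > previous) {
                lastTimestamp.update(event.timeMillis);
            }
            out.collect(new GappedEvent(event, gap));
        }
    }

    /** Reads the precomputed gap; a named class sidesteps lambda type-inference problems. */
    public static class GapExtractor implements SessionWindowTimeGapExtractor<GappedEvent> {
        @Override
        public long extract(GappedEvent element) {
            return element.gapMillis;
        }
    }

    /** Wiring: compute gaps per key, then key again and apply dynamic-gap session windows. */
    static WindowedStream<GappedEvent, String, TimeWindow> sessions(DataStream<Event> events) {
        return events
                .keyBy(e -> e.corrId)
                .process(new GapAssigner(3, Duration.ofSeconds(1), Duration.ofMinutes(1)))
                .keyBy(g -> g.event.corrId)
                .window(EventTimeSessionWindows.withDynamicGap(new GapExtractor()));
    }
}
```

Because `process()` returns a plain `DataStream`, the stream has to be keyed again before `window()`. The process function sees records in arrival order, so with out-of-order data the computed interval is only an approximation; the sketch simply treats a negative interval as zero.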
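Simone's idea of a gap that starts from a fixed value and then "learns" from the data could, for example, be implemented with an exponentially weighted moving average (EWMA) of the inter-event times. That smoothing choice is an assumption and is not proposed in the thread. The plain-Java sketch below has no Flink dependency; the class `AdaptiveGapEstimator` and its parameters (`alpha`, `factor`, the initial interval and the min/max bounds) are hypothetical. In a real job, its mutable fields would need to live in keyed state (for instance inside a `KeyedProcessFunction`) so that each key learns independently and the values survive checkpoints.

```java
/**
 * Hypothetical helper: learns a session gap from inter-event times using an
 * exponentially weighted moving average (EWMA), clamped to [minGap, maxGap].
 */
final class AdaptiveGapEstimator {
    private final double alpha;       // weight of the newest interval, 0 < alpha <= 1
    private final double factor;      // gap = factor × smoothed interval
    private final long minGapMillis;
    private final long maxGapMillis;  // upper cap, e.g. 60_000 for 1 minute
    private double avgIntervalMillis; // starts at a fixed value, then adapts
    private boolean seenEvent = false;
    private long lastTimestamp;

    public AdaptiveGapEstimator(double alpha, double factor, long initialIntervalMillis,
                                long minGapMillis, long maxGapMillis) {
        if (alpha <= 0 || alpha > 1) {
            throw new IllegalArgumentException("alpha must be in (0, 1]");
        }
        this.alpha = alpha;
        this.factor = factor;
        this.avgIntervalMillis = initialIntervalMillis;
        this.minGapMillis = minGapMillis;
        this.maxGapMillis = maxGapMillis;
    }

    /** Feeds one event timestamp (ms) and returns the gap (a duration in ms) for that event. */
    public long onEvent(long timestampMillis) {
        // Only in-order events update the average; late events reuse the current estimate.
        if (seenEvent && timestampMillis >= lastTimestamp) {
            long interval = timestampMillis - lastTimestamp;
            avgIntervalMillis = alpha * interval + (1 - alpha) * avgIntervalMillis;
        }
        if (!seenEvent || timestampMillis > lastTimestamp) {
            lastTimestamp = timestampMillis;
        }
        seenEvent = true;
        long gap = Math.round(factor * avgIntervalMillis);
        return Math.min(maxGapMillis, Math.max(minGapMillis, gap));
    }
}
```

A burst of closely spaced events pulls the average interval, and therefore the gap, down, which splits bursts into more sessions; a long pause pushes it up until `maxGapMillis` caps it. A larger `alpha` adapts faster but reacts more to noise.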