Hi Rafi, I tried your approach with: > windowStream.trigger(ContinuousEventTimeTrigger.of(Time.minutes(5))); > > I can use .trigger with ProcessWindowFunction but it doesn't accumulate data across windows i.e I want to collect data for a 5h window with data sent to output every 5 mins with the output data getting accumulated after every 5 mins.
@Felipe- I am using a ProcessWindowFunction and cannot find a way to use process() & onTimer with it. On Sun, Jun 30, 2019 at 11:45 PM Felipe Gutierrez < felipe.o.gutier...@gmail.com> wrote: > No, there is no specific reason. > I am using it because I am computing the HyperLogLog over a window. > *--* > *-- Felipe Gutierrez* > > *-- skype: felipe.o.gutierrez* > *--* *https://felipeogutierrez.blogspot.com > <https://felipeogutierrez.blogspot.com>* > > > On Mon, Jul 1, 2019 at 12:34 AM Vijay Balakrishnan <bvija...@gmail.com> > wrote: > >> Hi Felipe, >> Thanks for the example. I will try a variation of that for mine. Is there >> a specific reason to use the HyperLogLogState ? >> >> Vijay >> >> On Tue, Jun 18, 2019 at 3:00 AM Felipe Gutierrez < >> felipe.o.gutier...@gmail.com> wrote: >> >>> Hi Vijay, >>> >>> I managed by using >>> "ctx.timerService().registerProcessingTimeTimer(timeoutTime);" on the >>> processElement method and clearing the state on the onTimer method. This is >>> my program [1]. >>> >>> [1] >>> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/WordHLLKeyedProcessWindowTwitter.java >>> >>> Kind Regards, >>> Felipe >>> *--* >>> *-- Felipe Gutierrez* >>> >>> *-- skype: felipe.o.gutierrez* >>> *--* *https://felipeogutierrez.blogspot.com >>> <https://felipeogutierrez.blogspot.com>* >>> >>> >>> On Mon, Jun 17, 2019 at 8:57 PM Rafi Aroch <rafi.ar...@gmail.com> wrote: >>> >>>> Hi Vijay, >>>> >>>> When using windows, you may use the 'trigger' to set a Custom Trigger >>>> which would trigger your *ProcessWindowFunction* accordingly. >>>> >>>> In your case, you would probably use: >>>> >>>>> *.trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(5)))* >>>>> >>>> >>>> Thanks, >>>> Rafi >>>> >>>> >>>> On Mon, Jun 17, 2019 at 9:01 PM Vijay Balakrishnan <bvija...@gmail.com> >>>> wrote: >>>> >>>>> I am also implementing the ProcessWindowFunction and accessing the >>>>> windowState to get data but how do i push data out every 5 mins during a 4 >>>>> hr time window ?? I am adding a globalState to handle the 4 hr window ??? >>>>> Or should I still use the context.windowState even for the 4 hr window ? >>>>> >>>>> public class MGroupingAggregateClass extends >>>>>> ProcessWindowFunction<....> { >>>>>> >>>>>> private MapState<String, Object> timedGroupKeyState; >>>>>> private MapState<String, Object> globalGroupKeyState; >>>>>> private final MapStateDescriptor<String, Object> >>>>>> timedMapKeyStateDescriptor = >>>>>> new MapStateDescriptor<>("timedGroupKeyState", >>>>>> String.class, Object.class); >>>>>> private final MapStateDescriptor<String, Object> >>>>>> globalMapKeyStateDescriptor = >>>>>> new MapStateDescriptor<>("globalGroupKeyState", >>>>>> String.class, Object.class); >>>>>> >>>>>> >>>>>> public void open(Configuration ..) { >>>>>> timedGroupKeyState = >>>>>> getRuntimeContext().getMapState(timedMapKeyStateDescriptor); >>>>>> globalGroupKeyState = >>>>>> getRuntimeContext().getMapState(globalMapKeyStateDescriptor); >>>>>> } >>>>>> >>>>>> public void process(MonitoringTuple currKey, Context context, >>>>>> Iterable<Map<String, Object>> elements, >>>>>> Collector<Map<String, Object>> out) throws >>>>>> Exception { >>>>>> logger.info("Entered MGroupingAggregateWindowProcessing - >>>>>> process interval:{}, currKey:{}", interval, currKey); >>>>>> timedGroupKeyState = >>>>>> context.windowState().getMapState(timedMapKeyStateDescriptor); >>>>>> globalGroupKeyState = >>>>>> context.globalState().getMapState(globalMapKeyStateDescriptor); >>>>>> ... >>>>>> //get data fromm state >>>>>> Object timedGroupStateObj = timedGroupKeyState.get(groupKey); >>>>>> >>>>>> //how do i push the data out every 5 mins to the sink during the 4 hr >>>>>> window ?? >>>>>> >>>>>> } >>>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> On Mon, Jun 17, 2019 at 10:06 AM Vijay Balakrishnan < >>>>> bvija...@gmail.com> wrote: >>>>> >>>>>> Hi, >>>>>> Need to calculate a 4 hour time window for count, sum with current >>>>>> calculated results being output every 5 mins. >>>>>> How do i do that ? >>>>>> Currently, I calculate results for 5 sec and 5 min time windows fine >>>>>> on the KeyedStream. >>>>>> >>>>>> Time timeWindow = getTimeWindowFromInterval(interval);//eg: >>>>>>> timeWindow = Time.seconds(timeIntervalL); >>>>>>> KeyedStream<Map<String, Object>, ...> monitoringTupleKeyedStream = >>>>>>> kinesisStream.keyBy(...); >>>>>>> final WindowedStream<Map<String, Object>, ...., TimeWindow> >>>>>>> windowStream = >>>>>>> monitoringTupleKeyedStream >>>>>>> .timeWindow(timeWindow); >>>>>>> DataStream<....> enrichedMGStream = windowStream.aggregate( >>>>>>> new MGroupingWindowAggregateClass(...), >>>>>>> new MGroupingAggregateClass(....)) >>>>>>> .map(new Monitoring...(...)); >>>>>>> enrichedMGStream.addSink(..); >>>>>>> >>>>>> >>>>>> >>>>>> TIA, >>>>>> Vijay >>>>>> >>>>>