Hi all,
I'm exploring Flink for our new project.

Currently I'm playing with Session Windows with dynamic Gap. In short, I
would like to be able to change the value of the gap on demand, for example
on config update. 

So I'm having this code:


messageStream
                .keyBy(tradeKeySelector)
                .window(ProcessingTimeSessionWindows.withDynamicGap(new 
                  SessionWindowTimeGapExtractor<EnrichedMessage>() {
                    @Override
                    public long extract(EnrichedMessage element) {
                       * // Try to dynamically change the gap here
                        // milliseconds.
                        return 5000;*
                    }
                }))
                .process(new CumulativeTransactionOperator())
                .name("Aggregate Transaction Builder");

I would assume something like "broadcast pattern" here, although this is
related to operators and we are interested with
SessionWindowTimeGapExtractor here.

Probably we will keep the gap size in a Flink State, not sure if it has to
be keyed state or "operator state". Updates will come from external system. 

So I guess, what i need here is actually an operator that will implements
SessionWindowTimeGapExtractor interface. Instance of this operator will
keep/update the state based on Config updates and returns the gap size like
SessionWindowTimeGapExtractor. 

Would it be a valid approach for this use case? Is it any other way to have
such a config in Flink state?







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to