Hi all,
In my pipeline setup I cannot see side outputs for Session Window (Flink
1.9.1)

What I have is:


messageStream.
    .keyBy(tradeKeySelector)
    .window(ProcessingTimeSessionWindows.withDynamicGap(new
TradeAggregationGapExtractor()))
    .sideOutputLateData(lateTradeMessages)
    .process(new CumulativeTransactionOperator())
    .name("Aggregate Transaction Builder");

lateTradeMessages implementes SessionWindowTimeGapExtractor and returns 5
secodns.

Further I have:

messageStream.getSideOutput(lateTradeMessages)
  .keyBy(tradeKeySelector)
  .process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
     @Override
     public void processElement(EnrichedMessage value, Context ctx,
Collector<Transaction> out) throws Exception {
                   System.out.println("Process Late messages For
Aggregation");
                   out.collect(new Transaction());
                }
       })
   .name("Process Late messages For Aggregation");


The problem is that I never see "Process Late messages For Aggregation" when
Im sending Messages with same key. 

When Session Window passes and I "immediately" sent a new message for the
same Key it triggerts new Session Window, without going into Late Side
Output.

Not sure What I'm doing wrong here.

What I would like to achieve heve is to catch "late events" and try to
reprocess them againts state that was builder for "on time" events for this
Window or if its is impossible, report late events into special Sink.

I will appreciate any help.
However it seems I do not see have any late Events.







--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to