Please try this:

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows./of/(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)

val lateStream =   windowedStream.getSideOutput(lateTag); val aggregatedStream = windowedStream.aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))


On 5/10/2021 9:56 PM, Slotterback, Chris wrote:

Hey Flink Users,

I am having some issues with getting sideOutputLateData to properly function with late event time reports. I have the following code that, per my understanding, should be allowing reports that fall after the window has triggered and beyond allowed lateness to pass through to the side output:

val lateTag = new OutputTag[…]("tag"){}

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows./of/(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)
  .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))

val lateStream =   windowedStream.getSideOutput(lateTag);

trigger:

public class myTrigger extends Trigger<…>, Window> { @Override public TriggerResult onElement(…) throws Exception {return TriggerResult./CONTINUE/; } @Override public TriggerResult onProcessingTime(…) throws Exception { throw new Exception("processing time not supported"); } @Override public TriggerResult onEventTime(…) throws Exception {return TriggerResult./FIRE_AND_PURGE/; }

The main flow is functional when all reports are ontime, but when I start introducing late reports, they get rejected by the window but are failing to end up in the late stream. Is there something off with my understanding?

Chris


Reply via email to