Please try this:
val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)
val lateStream = windowedStream.getSideOutput(lateTag)
val aggregatedStream = windowedStream.aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))
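For checking which reports should actually reach the late stream, here is a minimal plain-Java sketch of the lateness rule as I understand it: a window counts as expired once the watermark reaches the window's last timestamp plus the allowed lateness. This is not Flink code. The class name, the method names, and the 10 s / 5 s values are all made up for illustration, and it assumes a zero window offset and non-negative timestamps.

```java
// Illustrative sketch only, not Flink code. All names and values are hypothetical.
final class LatenessSketch {

    // Start of the tumbling window containing `timestamp`
    // (assumes offset 0 and non-negative timestamps).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp % windowSize);
    }

    // Last timestamp that still belongs to the window (the window end is exclusive).
    static long maxTimestamp(long windowStart, long windowSize) {
        return windowStart + windowSize - 1;
    }

    // Assumed rule: the window is expired once the watermark has reached
    // maxTimestamp + allowedLateness.
    static boolean isWindowLate(long windowStart, long windowSize,
                                long allowedLateness, long watermark) {
        return maxTimestamp(windowStart, windowSize) + allowedLateness <= watermark;
    }

    // Where an element with the given event timestamp would be routed.
    static String route(long timestamp, long windowSize,
                        long allowedLateness, long watermark) {
        long start = windowStart(timestamp, windowSize);
        return isWindowLate(start, windowSize, allowedLateness, watermark)
                ? "side output"
                : "window";
    }

    public static void main(String[] args) {
        long size = 10_000L;     // hypothetical 10 s tumbling windows
        long lateness = 5_000L;  // hypothetical 5 s allowed lateness

        // Watermark has passed the window end but is still within allowed lateness.
        System.out.println(route(3_000L, size, lateness, 12_000L));
        // Watermark has reached maxTimestamp + allowedLateness (9_999 + 5_000).
        System.out.println(route(3_000L, size, lateness, 14_999L));
    }
}
```

If your late test reports fall in the first case, they are still accepted by the window and would not be expected in the side output.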
On 5/10/2021 9:56 PM, Slotterback, Chris wrote:
Hey Flink Users,
I am having trouble getting sideOutputLateData to work with reports
that arrive late in event time. I have the following code that, as I
understand it, should route reports that arrive after the window has
fired and beyond the allowed lateness to the side output:
val lateTag = new OutputTag[…]("tag"){}
val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)
  .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))
val lateStream = windowedStream.getSideOutput(lateTag)
trigger:
public class myTrigger extends Trigger<…>, Window> {
    @Override
    public TriggerResult onElement(…) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(…) throws Exception {
        throw new Exception("processing time not supported");
    }

    @Override
    public TriggerResult onEventTime(…) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }
}
The main flow works when all reports are on time, but when I start
introducing late reports, they are rejected by the window yet never
show up in the late stream. Is there something off with my
understanding?
Chris