Hi Chesnay,

That doesn’t compile, as WindowedStream doesn’t have the operator 
getSideOutput, only SingleOutputStreamOperator has that operation.

Chris

From: Chesnay Schepler <ches...@apache.org>
Date: Tuesday, May 11, 2021 at 6:09 AM
To: "Slotterback, Chris" <chris_slotterb...@comcast.com>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: [EXTERNAL] Re: sideOutputLateData not propagating late reports once 
window expires

Please try this:

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)


val lateStream =

  windowedStream.getSideOutput(lateTag);



val aggregatedStream = windowedStream.aggregate(new 
VSGBondingGroupMediansAggregator(packetFilterCount))

On 5/10/2021 9:56 PM, Slotterback, Chris wrote:
Hey Flink Users,

I am having some issues with getting sideOutputLateData to properly function 
with late event time reports. I have the following code that, per my 
understanding, should be allowing reports that fall after the window has 
triggered and beyond allowed lateness to pass through to the side output:


val lateTag = new OutputTag[…]("tag"){}

val windowedStream = stream
  .keyBy(…)
  .window(TumblingEventTimeWindows.of(…))
  .allowedLateness(…)
  .sideOutputLateData(lateTag)
  .trigger(new myTrigger)
  .aggregate(new VSGBondingGroupMediansAggregator(packetFilterCount))


val lateStream =

  windowedStream.getSideOutput(lateTag);


trigger:

public class myTrigger extends Trigger<…>, Window> {

    @Override

    public TriggerResult onElement(…) throws Exception {

        return TriggerResult.CONTINUE;

    }



    @Override

    public TriggerResult onProcessingTime(…) throws Exception {

        throw new Exception("processing time not supported");

    }



    @Override

    public TriggerResult onEventTime(…) throws Exception {

        return TriggerResult.FIRE_AND_PURGE;

    }

The main flow is functional when all reports are ontime, but when I start 
introducing late reports, they get rejected by the window but are failing to 
end up in the late stream. Is there something off with my understanding?

Chris


Reply via email to