Hi Xuyang,

If I go the side-output route, my pipeline would look something like
this:

final OutputTag<Data> lateOutputTag = new OutputTag<Data>("late-data"){};

SingleOutputStreamOperator<ReducedData> reducedDataStream =
    dataStream
        .keyBy(new MyKeySelector())
        .window(TumblingEventTimeWindows.of(Time.seconds(60)))
        .allowedLateness(Time.seconds(180))
        .sideOutputLateData(lateOutputTag)
        .reduce(new MyDataReducer());

DataStream<Data> lateStream = reducedDataStream.getSideOutput(lateOutputTag);

lateStream
    .keyBy(new MyKeySelector())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .allowedLateness(Time.seconds(480))
    .reduce(new MyDataReducer());


So basically I collect all the late data into a separate stream and apply
the same transformations to it again, to get reduced data from the late
records. Is this the correct way to get reduced data from late records only?

I also have a few more questions:
1. For the late-record stream not to drop records, would I have to set its
allowed lateness to a larger value than the one I set in the first stream
transformation?
   In other words, do I need to set any allowed lateness on the window
operation of the late data stream if I want to reduce those records the same
way as the on-time records?
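
To make question 1 concrete, here is a minimal plain-Java sketch (no Flink
dependency) of how I assume the lateness check works. The rule that a window
counts as expired once the watermark reaches (window end - 1 ms) + allowed
lateness is my assumption about the window operator, and the class and method
names are made up for illustration. I am also assuming the side output stream
sees the same watermark as the main stream.

```java
class LatenessSketch {

    // Assumed rule (not taken from this thread): a window is expired, and new
    // records for it are treated as late, once the watermark has reached
    // (windowEnd - 1) + allowedLateness. All times are in milliseconds.
    static boolean isWindowExpired(long windowEndMs, long allowedLatenessMs, long watermarkMs) {
        long cleanupTimeMs = (windowEndMs - 1) + allowedLatenessMs;
        return cleanupTimeMs <= watermarkMs;
    }

    public static void main(String[] args) {
        long windowEndMs = 60_000L;   // first 60 s tumbling window: [0, 60000)
        long watermarkMs = 300_000L;  // hypothetical watermark when the record arrives

        // First operator, allowedLateness = 180 s: the window is already expired,
        // so the record would go to the side output.
        System.out.println(isWindowExpired(windowEndMs, 180_000L, watermarkMs));

        // Second operator, allowedLateness = 480 s: the window is still open,
        // so the record would be accepted and reduced.
        System.out.println(isWindowExpired(windowEndMs, 480_000L, watermarkMs));
    }
}
```

If this model is right, a record that was already too late for the 180 s
allowance in the first operator would also be too late in the second operator
unless the second allowance is larger, which is why I used 480 s above.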

2. Also, when I collect late data as a side output, would the main reduced
output now contain only data reduced from on-time records, with no late
records included in any subsequent reduced output?

Basically, after this the reduced output would contain only:

[ reducedData(d1, d2, d3) ]

and not anything like:
reducedData(d1, d2, d3, late d4, late d5) or
reducedData(d1, d2, d3, late d4, late d5, late d6)

And the transformation of the late data stream would contain reduced data from:

[ reducedData(late d4, late d5, late d6) ]
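
To check this expectation, here is a small plain-Java model (no Flink
dependency) of where each record of a single window might end up. The routing
rule, the enum, and the arrival watermarks are all my own assumptions for
illustration, not something confirmed in this thread.

```java
class RoutingSketch {

    enum Route { MAIN_ON_TIME, MAIN_LATE_FIRING, SIDE_OUTPUT }

    // Assumed routing for one tumbling window ending at windowEndMs:
    //  - watermark still before the window's last timestamp: normal on-time record
    //  - watermark past that but within the allowed lateness: the record updates
    //    the main window result (late firing)
    //  - watermark at or past last timestamp + allowed lateness: side output
    static Route route(long windowEndMs, long allowedLatenessMs, long watermarkAtArrivalMs) {
        long maxTimestampMs = windowEndMs - 1;
        if (watermarkAtArrivalMs < maxTimestampMs) {
            return Route.MAIN_ON_TIME;
        }
        if (watermarkAtArrivalMs < maxTimestampMs + allowedLatenessMs) {
            return Route.MAIN_LATE_FIRING;
        }
        return Route.SIDE_OUTPUT;
    }

    public static void main(String[] args) {
        long windowEndMs = 60_000L;         // window [0, 60000)
        long allowedLatenessMs = 180_000L;  // as in the first operator above

        // Hypothetical watermark values at the moment each record arrives.
        String[] names = {"d1", "d2", "d3", "d4", "d5", "d6"};
        long[] watermarks = {10_000L, 30_000L, 50_000L, 90_000L, 200_000L, 250_000L};

        for (int i = 0; i < names.length; i++) {
            System.out.println(names[i] + " -> "
                + route(windowEndMs, allowedLatenessMs, watermarks[i]));
        }
    }
}
```

If this model is right, only records that arrive after the 180 s allowance
has passed (d6 in the sketch) would reach the side output, while records that
arrive within the allowance (d4 and d5 here) would still be folded into the
main window's result through late firings.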

Thanks
Sachin


On Wed, Apr 17, 2024 at 4:05 PM Xuyang <xyzhong...@163.com> wrote:

> Hi, Sachin.
>
> IIUC, it is in the second situation you listed, that is:
> [ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late
> d6) ].
> However, because of `table.exec.emit.late-fire.delay`, it could also be
> such as
> [ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5),
> reducedData(d1, d2, d3, late d4, late d5, late d6) ]
>
> Actually, allow-lateness(table.exec.emit.allow-lateness) is used to
> control when it decides not to change the value of the window output, and
> allowing the framework to automatically clear the corresponding state.
>
> > Also if I want the reduced data from late records to not include the
> data emitted within the window bounds, how can I do the same ?
> or if this is handled as default case ?
>
> Maybe side output[1] can help you to collect the late data and re-compute
> them.
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/
>
> --
>     Best!
>     Xuyang
>
>
> At 2024-04-17 16:56:54, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>
> Hi,
>
> Suppose my pipeline is:
>
> data
>     .keyBy(new MyKeySelector())
>     .window(TumblingEventTimeWindows.of(Time.seconds(60)))
>     .allowedLateness(Time.seconds(180))
>     .reduce(new MyDataReducer())
>
>
> So I wanted to know if the final output stream would contain reduced data
> at the end of the window mark and also another reduced data at the end of
> allowed lateness ?
> If that is the case, then the reduced data at the end of allowed lateness
> would also include the data from non late records or it will only include
> reduced data from late records.
>
> Example
>
> If I have data in sequence:
>
> [window start], d1, d2, d3, [window end], late d4, late d5, late d6, [end
> of allowed lateness]
>
> The resultant stream after window and reduce operation would be:
>
> [ reduceData (d1, d2, d3), reducedData(late d4, late d5, late d6) ]
>
> or
> [ reduceData (d1, d2, d3), reducedData(d1, d2, d3, late d4, late d5, late
> d6) ]
>
> or something else ?
>
> Also if I want the reduced data from late records to not include the data
> emitted within the window bounds, how can I do the same ?
> or if this is handled as default case ?
>
> Thanks
> Sachin
>
>
>
>
>
