[ 
https://issues.apache.org/jira/browse/FLINK-13148?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-13148:
-----------------------------------
      Labels: auto-deprioritized-major auto-unassigned  (was: auto-unassigned 
stale-major)
    Priority: Minor  (was: Major)

This issue was labeled "stale-major" 7 ago and has not received any updates so 
it is being deprioritized. If this ticket is actually Major, please raise the 
priority and ask a committer to assign you the issue or revive the public 
discussion.


> Expose WindowedStream.sideOutputLateData() from CoGroupedStreams
> ----------------------------------------------------------------
>
>                 Key: FLINK-13148
>                 URL: https://issues.apache.org/jira/browse/FLINK-13148
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.9.0
>            Reporter: Congxian Qiu
>            Priority: Minor
>              Labels: auto-deprioritized-major, auto-unassigned
>
> As FLINK-10050 supported {{alloedLateness}}, but we can not get the side 
> output containing the late data, this issue wants to fix it.
> For implementation, I want to add an input parameter {{OutputTag}} in 
> {{WithWindow}} as following
> {code:java}
> protected WithWindow(DataStream<T1> input1,
>     DataStream<T2> input2,
>     KeySelector<T1, KEY> keySelector1,
>     KeySelector<T2, KEY> keySelector2,
>     TypeInformation<KEY> keyType,
>     WindowAssigner<? super TaggedUnion<T1, T2>, W> windowAssigner,
>     Trigger<? super TaggedUnion<T1, T2>, ? super W> trigger,
>     Evictor<? super TaggedUnion<T1, T2>, ? super W> evictor,
>     Time allowedLateness,
>     OutputTage<TaggedUnion<T1, T2>> outputTag) {
>       ...
> }
> {code}
>  and add a function sideOutputLateData(OutputTag<T> outputTag) in 
> {{WithWindow}}
> {code:java}
> public WithWindow<T1, T2, KEY, W> 
> sideOutputLateData(OutputTag<TaggedUnion<T1, T2>> outputTag) {
>    ...
> }
> {code}
> In {{WithWindow#apply}} will add outputTag if it is not null
> {code:java}
> public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, 
> TypeInfomation<T> resultType) {
>     ...
>     if (outputTag != null) {
>         windowedStream.sideOutputLateData(outputTag);
>     }
>     ...
> }{code}
> The same will apply to {{JoinedStreams}} 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to