[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17607948#comment-17607948
 ] 

Piotr Nowojski commented on FLINK-18647:
----------------------------------------

[~dkapoor1], is there some other ticket for the
{quote}
Processing Time CEP is broken in minicluster
{quote}
that you are referring to?

Apart of that, I would be afraid that even changing the behaviour of the 
minicluster to waiting for timers before shutdown would be problematic, 
prolonging the tests. Keep in mind that as a workaround in tests, you can keep 
alive your artificial source until some timer fires.

> How to handle processing time timers with bounded input
> -------------------------------------------------------
>
>                 Key: FLINK-18647
>                 URL: https://issues.apache.org/jira/browse/FLINK-18647
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Priority: Not a Priority
>              Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> (most of this description comes from an offline discussion between me, 
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending 
> (untriggered) processing time timers are ignored/dropped. In some cases this 
> is desirable, but for example for {{WindowOperator}} it means that last 
> trailing window will not be triggered, causing an apparent data loss.
> There are a couple of ideas what should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel, 
> wait, trigger immediately. For example by overloading the existing methods: 
> {{ProcessingTimeService#registerTimer}} and 
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback 
> target, TimerAction timerAction);
> enum TimerAction { 
>     CANCEL_ON_END_OF_INPUT, 
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT}
> {code}
> or maybe:
> {code}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean we store additional state with each timer and we 
> need to modify the serialisation format (providing some kind of state 
> migration path) and potentially increase the size foot print of the timers.
> Extra overhead could have been avoided via some kind of {{Map<Timer, 
> TimerAction>}}, with lack of entry meaning some default value.
> 2. 
> Also another way to solve this problem might be let the operator code decide 
> what to do with the given timer. Either ask an operator what should happen 
> with given timer (a), or let the operator iterate and cancel the timers on 
> endOfInput() (b), or just fire the timer with some endOfInput flag (c).
> I think none of the (a), (b), and (c) would require braking API changes, no 
> state changes and no additional overheads. Just the logic what to do with the 
> timer would have to be “hardcoded” in the operator’s code. (which btw might 
> even has an additional benefit of being easier to change in case of some 
> bugs, like a timer was registered with wrong/incorrect {{TimerAction}}).
> This is complicated a bit by a question, how (if at all?) options a), b) or 
> c) should be exposed to UDFs? 
> 3. 
> Maybe we need a combination of both? Pre existing operators could implement 
> some custom handling of this issue (via 2a, 2b or 2c), while UDFs could be 
> handled by 1.? 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to