[ 
https://issues.apache.org/jira/browse/FLINK-18647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17606503#comment-17606503
 ] 

Piotr Nowojski edited comment on FLINK-18647 at 9/19/22 8:59 AM:
-----------------------------------------------------------------

{quote}
I would suggest going for an implementation that only implements Option 1 and 
Option 2 (as Option 1 is roughly the same as Option 3 without the complexity of 
timer cancels). User code can easily convert Option 1 to Option 3 if they so 
desire by skipping the timer body. 
{quote}
I would be very careful with such a change. The current behaviour makes sense for 
some use cases, like code that is supposed to emit something every N seconds, 
or TTLs. Also keep in mind that what you are suggesting would cause a breaking 
change to a stable `@Public` API.

{quote}
logically users usually have a uniform requirements on the pending timers to 
keep the semantics consistent
{quote}
Is that true [~gaoyunhaii]? I've always thought that this is highly operator 
dependent. Apart from the testing purposes pointed out by [~dkapoor1], I can 
see timers from different operators having different use cases:
# mark the end of some windowed aggregation
# handle CEP-style timeouts, like emitting record X if record Y hasn't arrived within 30 seconds after record Z
# handle timeouts when dealing with external systems, something like in an async function or maybe in some sinks: do something if an external system doesn't respond within 30 seconds
# clean up internal Flink state, like some form of TTL

1. should be fired immediately on EOF, or waited for, depending on the business 
logic. Indeed I could see this being correlated throughout the job - most 
likely all windowed operations should behave in the same way.
2. should be either dropped on EOF or fired immediately, depending on the 
business logic. If firing immediately is the correct thing to do, waiting would 
also be correct, but inefficient.
3. most likely can be dropped, as this should have been dealt with by some kind 
of clean-up code. For example, `AsyncWaitOperator` waits for all async 
operations to complete anyway. But theoretically I could see this depending on 
the business logic.
4. ideally should be dropped on EOF. They can also be fired or waited for, but 
either of those two is inefficient. When the TTL is huge (hours, days or 
months), waiting can be impractical (see the sketch below).
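To make 4. a bit more concrete, a minimal sketch of a TTL-style clean-up timer registered from a `KeyedProcessFunction` (the class and state names here are hypothetical job code, but the API calls are the existing DataStream ones); with a bounded input such a timer will typically still be pending at EOF:
{code:java}
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Use case 4: a state clean-up (TTL-style) processing time timer.
// On a bounded input this timer is usually still pending at end of input;
// waiting for it would be impractical when the TTL is hours or days.
public class TtlCleanupFunction extends KeyedProcessFunction<String, String, String> {

    private static final long TTL_MS = 24 * 60 * 60 * 1000L; // e.g. one day

    private transient ValueState<String> lastValue;

    @Override
    public void open(Configuration parameters) {
        lastValue = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-value", String.class));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        lastValue.update(value);
        out.collect(value);
        // schedule the state clean-up far in the future
        ctx.timerService().registerProcessingTimeTimer(
                ctx.timerService().currentProcessingTime() + TTL_MS);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) {
        lastValue.clear();
    }
}
{code}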

I can maybe see that in SQL/Table API we are dealing only with option 1. and 
maybe 4., so a global configuration might be acceptable if the TTL is small. 
But is that always the case? Do we have TTL-like use cases of timers in SQL 
[~gaoyunhaii]? [~dwysakowicz] [~twalthr]?

I could see that allowing users to set this globally might be 
harmful/confusing in the long run. For example, a user has an issue with some 
windowed operator, changes the global setting in order to fix it, but 
inadvertently breaks something else without realising it.

Now that I think about potential solutions, maybe we should allow this to be 
configured via a call like 
`org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#setEndOfInputTimerBehaviour`?
 In SQL we could do this either via some session variable or indeed via a 
global Table API config, if TTL is not an issue. If it is, maybe the SQL 
planner could set the windowed joins/aggregations behaviour to the global 
behaviour (from the session/config variable), but have all TTL timers ignored 
anyway? But I don't like the idea of allowing a global config for DataStream 
users, since setting this per operator is very easy and IMO less error-prone 
there.
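Just to illustrate what I mean, a purely hypothetical sketch (neither `setEndOfInputTimerBehaviour` nor the `EndOfInputTimerBehaviour` enum exist today; `stream` and `MyWindowedAggregation` are placeholders):
{code:java}
// Purely hypothetical API sketch - these names do not exist in Flink today.
enum EndOfInputTimerBehaviour {
    CANCEL,               // drop pending processing time timers on EOF
    TRIGGER_IMMEDIATELY,  // fire them right away on EOF
    WAIT                  // wait until they fire naturally
}

// Per-operator configuration, analogous to existing setters like #setMaxParallelism:
stream
    .keyBy(record -> record.key)
    .process(new MyWindowedAggregation())
    .setEndOfInputTimerBehaviour(EndOfInputTimerBehaviour.TRIGGER_IMMEDIATELY);
{code}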



> How to handle processing time timers with bounded input
> -------------------------------------------------------
>
>                 Key: FLINK-18647
>                 URL: https://issues.apache.org/jira/browse/FLINK-18647
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.11.0
>            Reporter: Piotr Nowojski
>            Priority: Not a Priority
>              Labels: auto-deprioritized-critical, auto-deprioritized-major, 
> stale-minor
>
> (most of this description comes from an offline discussion between me, 
> [~AHeise], [~roman_khachatryan], [~aljoscha] and [~sunhaibotb])
> In case of end of input (for example for bounded sources), all pending 
> (untriggered) processing time timers are ignored/dropped. In some cases this 
> is desirable, but for example for {{WindowOperator}} it means that the last 
> trailing window will not be triggered, causing apparent data loss.
> There are a couple of ideas that should be considered.
> 1. Provide a way for users to decide what to do with such timers: cancel, 
> wait, or trigger immediately. For example, by overloading the existing methods 
> {{ProcessingTimeService#registerTimer}} and 
> {{ProcessingTimeService#scheduleAtFixedRate}} in the following way:
> {code:java}
> ScheduledFuture<?> registerTimer(
>     long timestamp, ProcessingTimeCallback target, TimerAction timerAction);
>
> enum TimerAction {
>     CANCEL_ON_END_OF_INPUT,
>     TRIGGER_ON_END_OF_INPUT,
>     WAIT_ON_END_OF_INPUT
> }
> {code}
> or maybe:
> {code}
> public interface TimerAction {
>     void onEndOfInput(ScheduledFuture<?> timer);
> }
> {code}
> But this would also mean we store additional state with each timer, we 
> need to modify the serialisation format (providing some kind of state 
> migration path), and we potentially increase the size footprint of the timers.
> The extra overhead could be avoided via some kind of {{Map<Timer, 
> TimerAction>}}, with a missing entry meaning some default value.
> 2. 
> Another way to solve this problem might be to let the operator code decide 
> what to do with a given timer. Either ask the operator what should happen 
> with a given timer (a), or let the operator iterate and cancel the timers on 
> endOfInput() (b), or just fire the timer with some endOfInput flag (c).
> I think none of (a), (b), and (c) would require breaking API changes, state 
> changes, or additional overhead. Just the logic of what to do with the 
> timer would have to be "hardcoded" in the operator's code (which btw might 
> even have an additional benefit of being easier to change in case of some 
> bugs, like a timer being registered with a wrong/incorrect {{TimerAction}}).
> This is complicated a bit by the question of how (if at all) options a), b), 
> or c) should be exposed to UDFs?
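> For illustration, option (c) could roughly look like this in an operator 
> (hypothetical sketch; today {{ProcessingTimeCallback#onProcessingTime}} takes 
> only the timestamp, and {{fireTimeout}} stands in for operator-specific logic):
> {code:java}
> // Hypothetical sketch of option (c): the operator hardcodes the end-of-input decision.
> @Override
> public void onProcessingTime(long timestamp, boolean endOfInput) throws Exception {
>     if (endOfInput) {
>         // e.g. a CEP-style timeout that is no longer meaningful once the input has ended
>         return;
>     }
>     fireTimeout(timestamp); // normal firing path
> }
> {code}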
> 3. 
> Maybe we need a combination of both? Pre-existing operators could implement 
> some custom handling of this issue (via 2a, 2b, or 2c), while UDFs could be 
> handled by 1.?



