On Wed, Jan 19, 2022 at 1:11 AM Jan Lukavský <je...@seznam.cz> wrote:
> > One note - some people definitely use timer.withOutputTimestamp as a watermark hold.
>
> Definitely.
>
> > In fact, I do not view a "watermark hold" as a fundamental concept. The act of "set a timer with the intent that I am allowed to produce output with timestamp X" is the fundamental concept, and watermark hold is an implementation detail that should really never have been surfaced as an end-user concept, or really even as an SDK author concept.
>
> Agree that this need not be exposed explicitly, but given the causality-preserving invariant that elements arriving *before* watermark *must not* leave after watermark I think that .withOutputTimestamp actually defines watermark hold implicitly. I think there is no other valid implementation than to hold output watermark not to cross the output timestamp of any active per-key timer (actually, we could distinguish cases when the timer is set for already late elements, there is no need - or possibility - to hold the watermark).
>
> I'd be also supportive for associating any buffer output timestamp with timer, rather than the buffer itself, as that really feels like a better description of what is *really* going to happen.

Is this just a way to connect the state, timer callback, and process element? I wonder how it looks different or what we could do better with this information. (I like these sorts of ideas, but I can't think of how it would be different.)

In the case Reuven described, where the timer callback does nothing, there seems to be a real risk that data is left behind in the buffer when the watermark hold is released. So you could, for example, have a timer callback that always must accept the full contents of the buffer, and where it is obvious to a user that the buffer is cleared after the callback. Like OnWindowExpiration but OnBufferEviction.
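To make the "implicit hold" reading above concrete, here is a minimal sketch in plain Java. It is not Beam code and not how any runner implements it: the class name, the method name, and the use of java.time instead of Beam's Joda-Time types are all assumptions made for illustration. It shows one way to read the rule that the output watermark must not cross the output timestamp of any active timer, and that a timer set for already-late data cannot pull the watermark backwards.

```java
import java.time.Instant;
import java.util.Collection;
import java.util.List;

// Hypothetical illustration only; not part of the Beam SDK or any runner.
public class WatermarkHoldSketch {

    /**
     * Returns the output watermark implied by the input watermark and the
     * output timestamps of all active timers for a key.
     *
     * Assumption: the output watermark is monotonic, so a timer whose output
     * timestamp is already behind the previous output watermark (a timer set
     * for late data) cannot hold anything back.
     */
    static Instant heldOutputWatermark(
            Instant previousOutputWatermark,
            Instant inputWatermark,
            Collection<Instant> activeTimerOutputTimestamps) {
        // Start from the input watermark and lower it to the earliest hold.
        Instant candidate = inputWatermark;
        for (Instant outputTimestamp : activeTimerOutputTimestamps) {
            if (outputTimestamp.isBefore(candidate)) {
                candidate = outputTimestamp;
            }
        }
        // Never move the output watermark backwards.
        return candidate.isBefore(previousOutputWatermark)
                ? previousOutputWatermark
                : candidate;
    }

    public static void main(String[] args) {
        Instant previous = Instant.parse("2022-01-19T11:00:00Z");
        Instant input = Instant.parse("2022-01-19T12:00:00Z");
        List<Instant> timers = List.of(
                Instant.parse("2022-01-19T11:30:00Z"),
                Instant.parse("2022-01-19T13:00:00Z"));
        // The 11:30 timer holds the output watermark below the 12:00 input watermark.
        System.out.println(heldOutputWatermark(previous, input, timers));
    }
}
```

Once the 11:30 timer fires and is removed from the active set, the same function returns the input watermark, which is the "holds are always cleared eventually" property discussed in this thread.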
> This was probably discussed, but I cannot see this in this discussion, what keeps us from setting output timestamp of processing-time timer to something like min(endOfWindow, currentOutputWatermark)? Yes, output watermark is not stable, but anything that is derived from _processing time_ is not stable by definition. For on-time elements, outputWatermark gives an estimation of the current position in event-time, so it makes sense to me to use that. Are there any counter examples?

This seems OK to me. Certainly the hold should never be based on processing time.

Kenn

> Jan
>
> On 1/18/22 21:10, Kenneth Knowles wrote:
>
> Yea, it makes sense. This is an issue for the global window where there isn't automatic cleanup of state. I've had a few user cases where they would like a good way of doing state cleanup in the global window too - something where whenever state gets buffered there is always a finite timer that will fire. There might be an opportunity here, if we attach the hold to that associated timer rather than the state. It sounds similar to what you describe where someone made a timer just to create a watermark hold associated with some state - I assume they actually do need to process and emit that state in some way related to the timer.
>
> On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax <re...@google.com> wrote:
>
>> Correct.
>>
>> IIRC originally we didn't want to add "buffered data timestamps" because it was error prone. Leaking even one record in state holds up the watermark and can cause the entire pipeline to grind to a halt. Associating with a timer guarantees that holds are always cleared eventually.
>>
>> On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> This is an interesting case, and a legitimate counterexample to consider. I'd call it a workaround :-).
>>> The semantic thing they would want/need is "output timestamp" associated with buffered data (also implemented with watermark hold). I do know systems that designed their state with this built in.
>>>
>>> Kenn
>>>
>>> On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> One note - some people definitely use timer.withOutputTimestamp as a watermark hold.
>>>>
>>>> This is a scenario in which one outputs (from processElement) a timestamp behind the current input element timestamp but knows that it is safe because there is already an extant timer with an earlier output timestamp (state can be used for this). In this case I've seen timers set simply for the hold - the actual onTimer never outputs anything.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>
>>>>> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>
>>>>>> > I think this wouldn't be very robust to different situations where processing time and event time may not be that close to each other.
>>>>>>
>>>>>> If you do something like `min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp))` the worst case is that you set a watermark hold for somewhere in the future, right? For example, if the watermark is lagging 3 hours, processing time = 4pm, event input = 1pm, window end = 5pm, the watermark hold/output time is set to 4pm + T. This would make the timestamps "newer" than the input, but shouldn't ever create late data, correct?
>>>>>>
>>>>>> Also, imo, the timestamps really already cross domains now, because the watermark (event time) is held until the (processing time) timer fires.
>>>>>>
>>>>>> The concrete issue that brought this up was a pipeline with some state, and the state was "cleaned up" periodically with a processing time timer that fired every ~hour. The author of the pipeline was confused why the watermark wasn't moving (and thus GBKs firing, etc). The root cause was the watermark being held by the timer.
>>>>>>
>>>>>> > It would just save you .withOutputTimestamp(elementTimestamp) on your calls to setting the event time timer, right?
>>>>>>
>>>>>> Correct, the main thing I'm trying to solve is having to recalculate an output timestamp using the same logic that the timer itself is using to set its firing timestamp.
>>>>>
>>>>> It sounds like the main use case that you are dealing with is the case where the timer doesn't actually produce output (or set further timers that produce output) so it doesn't need (or want) a watermark hold. That makes sense.
>>>>>
>>>>> In fact, I do not view a "watermark hold" as a fundamental concept. The act of "set a timer with the intent that I am allowed to produce output with timestamp X" is the fundamental concept, and watermark hold is an implementation detail that should really never have been surfaced as an end-user concept, or really even as an SDK author concept. This is why in my proposal for adding output timestamps to timers, I called it "withOutputTimestamp", and this is why the design does not include any watermark holds - there is a self-loop on a transform where timers produce an input watermark distinct from the watermark on input elements, and that is enough. There is not now, and never has been, a need for the concept of a hold at the level of the Beam model.
>>>>>
>>>>> I wonder if we can automate this behavior by noticing that there is no OutputReceiver parameter to the timer callback, and also transitively.
>>>>> Or just work around it by saying ".withoutOutput" on the timer.
>>>>>
>>>>> Kenn
>>>>>
>>>>>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <k...@apache.org> wrote:
>>>>>>
>>>>>>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>>>>>
>>>>>>>> If I have a processing time timer, is there any way to automatically set the output timestamp to the timer firing timestamp (similar to how event-time timers work)?
>>>>>>>>
>>>>>>>> A common use case would be to do something like:
>>>>>>>> timer.offset(X).align(Y).setRelative()
>>>>>>>>
>>>>>>>> but have the output timestamp be the firing timestamp. In order to do this now you need to re-calculate the output timestamp (using the same logic as the timer does internally) and manually use withOutputTimestamp.
>>>>>>>
>>>>>>> I think this wouldn't be very robust to different situations where processing time and event time may not be that close to each other. In general I'm skeptical of reusing timestamps across time domains, for just this sort of reason. I wouldn't recommend doing this manually either.
>>>>>>>
>>>>>>>> I'm not sure what the API would look like here, but it would also be nice to allow event-time timers to do the same in reverse (use the element input timestamp rather than the firing timestamp). Maybe something like `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP, ELEMENT_TIMESTAMP?
>>>>>>>
>>>>>>> It would just save you .withOutputTimestamp(elementTimestamp) on your calls to setting the event time timer, right? It doesn't work in general because a timer can be set from other OnTimer methods, where there is no "element" per se, but just the output timestamp of the fired timer.
>>>>>>>
>>>>>>> Kenn
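
For reference, the two candidate output timestamps proposed in this thread, `min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp))` and `min(endOfWindow, currentOutputWatermark)`, can be worked through in plain Java. This is only a sketch under stated assumptions: the class and method names are hypothetical, java.time is used in place of Beam's Joda-Time types, the calendar date is arbitrary, T = 30 minutes is an invented value for the unspecified T, and "watermark lagging 3 hours" is read as an output watermark of 1pm at a processing time of 4pm.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration only; none of these names exist in the Beam SDK.
public class OutputTimestampSketch {

    static Instant min(Instant a, Instant b) {
        return a.isBefore(b) ? a : b;
    }

    static Instant max(Instant a, Instant b) {
        return a.isAfter(b) ? a : b;
    }

    /** min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp)). */
    static Instant clampedFiringTimestamp(
            Instant endOfWindow, Instant eventInputTimestamp, Instant computedFiringTimestamp) {
        return min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp));
    }

    /** min(endOfWindow, currentOutputWatermark). */
    static Instant fromOutputWatermark(Instant endOfWindow, Instant currentOutputWatermark) {
        return min(endOfWindow, currentOutputWatermark);
    }

    public static void main(String[] args) {
        // Numbers from the thread; the date itself is arbitrary.
        Instant processingTime = Instant.parse("2021-12-14T16:00:00Z"); // 4pm
        Instant eventInput = Instant.parse("2021-12-14T13:00:00Z");     // 1pm
        Instant endOfWindow = Instant.parse("2021-12-14T17:00:00Z");    // 5pm
        Duration t = Duration.ofMinutes(30);                            // assumed value of T

        // Firing timestamp in processing time: 4pm + T.
        Instant firing = processingTime.plus(t);
        System.out.println(clampedFiringTimestamp(endOfWindow, eventInput, firing));

        // Output watermark assumed to lag processing time by 3 hours.
        Instant outputWatermark = processingTime.minus(Duration.ofHours(3));
        System.out.println(fromOutputWatermark(endOfWindow, outputWatermark));
    }
}
```

With these inputs the first formula yields 4:30pm, which is ahead of the 1pm watermark and so holds nothing back until the watermark catches up, while the second yields 1pm, the current event-time position. If T were larger than one hour, the first formula would be capped at the 5pm window end.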