Yeah, that makes sense. This is an issue for the global window, where there
is no automatic cleanup of state. I've seen a few user cases where people
would like a good way of doing state cleanup in the global window too -
something where whenever state gets buffered there is always a finite timer
that will fire. There might be an opportunity here if we attach the hold
to that associated timer rather than to the state. It sounds similar to what
you describe, where someone made a timer just to create a watermark hold
associated with some state - I assume they actually do need to process and
emit that state in some way related to the timer.
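
For concreteness, the clamping expression Steve suggests further down the
thread, `min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp))`,
can be sketched in plain Java. This is only an illustration, not Beam code:
the class and method names are hypothetical, it uses java.time.Instant rather
than Beam's own timestamp type, and T = 30 minutes is an assumed value that
the thread leaves unspecified.

```java
import java.time.Duration;
import java.time.Instant;

public class OutputTimestampClamp {

    // Hypothetical helper: min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp)).
    static Instant clampOutputTimestamp(
            Instant endOfWindow, Instant eventInputTimestamp, Instant computedFiringTimestamp) {
        // Never go earlier than the input element's timestamp.
        Instant lower = eventInputTimestamp.isAfter(computedFiringTimestamp)
                ? eventInputTimestamp
                : computedFiringTimestamp;
        // Never go past the end of the window.
        return lower.isBefore(endOfWindow) ? lower : endOfWindow;
    }

    public static void main(String[] args) {
        // Numbers from the example in the thread; the date is arbitrary.
        Instant processingTime = Instant.parse("2022-01-18T16:00:00Z"); // 4pm
        Instant eventInput = Instant.parse("2022-01-18T13:00:00Z");     // 1pm
        Instant windowEnd = Instant.parse("2022-01-18T17:00:00Z");      // 5pm
        Duration t = Duration.ofMinutes(30);                            // assumed value of T

        // The processing-time timer would fire at 4pm + T.
        Instant computedFiring = processingTime.plus(t);

        Instant hold = clampOutputTimestamp(windowEnd, eventInput, computedFiring);
        System.out.println("output timestamp / hold: " + hold);
    }
}
```

With those numbers the result is 4:30pm: newer than the 1pm input, but still
inside the window, which is the worst case described in that message.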

On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax <re...@google.com> wrote:

> Correct.
>
> IIRC originally we didn't want to add "buffered data timestamps"
> because it was error prone. Leaking even one record in state holds up the
> watermark and can cause the entire pipeline to grind to a halt. Associating
> with a timer guarantees that holds are always cleared eventually.
>
> On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles <k...@apache.org> wrote:
>
>> This is an interesting case, and a legitimate counterexample to consider.
>> I'd call it a workaround :-). The semantic thing they would want/need is
>> an "output timestamp" associated with buffered data (also implemented with
>> a watermark hold). I do know systems that designed their state with this
>> built in.
>>
>> Kenn
>>
>> On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com> wrote:
>>
>>> One note - some people definitely use timer.withOutputTimestamp as a
>>> watermark hold.
>>>
>>> This is a scenario in which one outputs (from processElement) a
>>> timestamp behind the current input element timestamp but knows that it is
>>> safe because there is already an extant timer with an earlier
>>> output timestamp (state can be used for this). In this case I've seen
>>> timers set simply for the hold - the actual onTimer never outputs anything.
>>>
>>> Reuven
>>>
>>> On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sniem...@apache.org>
>>>> wrote:
>>>>
>>>>> > I think this wouldn't be very robust to different situations where
>>>>> processing time and event time may not be that close to each other.
>>>>>
>>>>> if you do something like `min(endOfWindow, max(eventInputTimestamp,
>>>>> computedFiringTimestamp))` the worst case is that you set a watermark hold
>>>>> for somewhere in the future, right?  For example, if the watermark is
>>>>> lagging 3 hours, processing time = 4pm, event input = 1pm, window end =
>>>>> 5pm, the watermark hold/output time is set to 4pm + T.  This would make
>>>>> the timestamps "newer" than the input, but shouldn't ever create late
>>>>> data, correct?
>>>>>
>>>>> Also, imo, the timestamps really already cross domains now, because
>>>>> the watermark (event time) is held until the (processing time) timer
>>>>> fires.
>>>>>
>>>>> The concrete issue that brought this up was a pipeline with some
>>>>> state, and the state was "cleaned up" periodically with a processing time
>>>>> timer that fired every ~hour.  The author of the pipeline was confused why
>>>>> the watermark wasn't moving (and thus GBKs firing, etc).  The root cause
>>>>> was the watermark being held by the timer.
>>>>>
>>>>> > It would just save you .withOutputTimestamp(elementTimestamp) on
>>>>> your calls to setting the event time timer, right?
>>>>>
>>>>> Correct, the main thing I'm trying to solve is having to recalculate
>>>>> an output timestamp using the same logic that the timer itself is using to
>>>>> set its firing timestamp.
>>>>>
>>>>
>>>> It sounds like the main use case that you are dealing with is the case
>>>> where the timer doesn't actually produce output (or set further timers that
>>>> produce output) so it doesn't need (or want) a watermark hold. That makes
>>>> sense.
>>>>
>>>> In fact, I do not view a "watermark hold" as a fundamental concept. The
>>>> act of "set a timer with the intent that I am allowed to produce output
>>>> with timestamp X" is the fundamental concept, and watermark hold is an
>>>> implementation detail that should really never have been surfaced as an
>>>> end-user concept, or really even as an SDK author concept. This is why in
>>>> my proposal for adding output timestamps to timers, I called it
>>>> "withOutputTimestamp", and this is why the design does not include any
>>>> watermark holds - there is a self-loop on a transform where timers produce
>>>> an input watermark distinct from the watermark on input elements, and that
>>>> is enough. There is not now, and never has been, a need for the concept of
>>>> a hold at the level of the Beam model.
>>>>
>>>> I wonder if we can automate this behavior by noticing that there are no
>>>> OutputReceiver parameters to the timer callback, and also transitively. Or
>>>> just work around it by saying ".withoutOutput" on the timer.
>>>>
>>>> Kenn
>>>>
>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <k...@apache.org>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sniem...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>> If I have a processing time timer, is there any way to automatically
>>>>>>> set the output timestamp to the timer firing timestamp (similar to how
>>>>>>> event-time timers work)?
>>>>>>>
>>>>>>> A common use case would be to do something like:
>>>>>>> timer.offset(X).align(Y).setRelative()
>>>>>>>
>>>>>>> but have the output timestamp be the firing timestamp.  In order to
>>>>>>> do this now you need to re-calculate the output timestamp (using the
>>>>>>> same logic as the timer does internally) and manually use
>>>>>>> withOutputTimestamp.
>>>>>>>
>>>>>>
>>>>>> I think this wouldn't be very robust to different situations where
>>>>>> processing time and event time may not be that close to each other. In
>>>>>> general I'm skeptical of reusing timestamps across time domains, for just
>>>>>> this sort of reason. I wouldn't recommend doing this manually either.
>>>>>>
>>>>>>
>>>>>>> I'm not sure what the API would look like here, but it would also be
>>>>>>> nice to allow event-time timers to do the same in reverse (use the
>>>>>>> element input timestamp rather than the firing timestamp).  Maybe
>>>>>>> something like `withDefaultOutputTimestampFrom(...)` and an enum of
>>>>>>> FIRING_TIMESTAMP, ELEMENT_TIMESTAMP?
>>>>>>>
>>>>>>
>>>>>> It would just save you .withOutputTimestamp(elementTimestamp) on your
>>>>>> calls to setting the event time timer, right? It doesn't work in general
>>>>>> because a timer can be set from other OnTimer methods, where there is no
>>>>>> "element" per se, but just the output timestamp of the fired timer.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>
