This is an interesting case, and a legitimate counterexample to consider.
I'd call it a workaround :-). The semantic thing they would want/need is
"output timestamp" associated with buffered data (also implemented with
watermark hold). I do know systems that designed their state with this
built in.

Kenn

On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com> wrote:

> One note - some people definitely use timer.withOutputTimestamp as a
> watermark hold.
>

> This is a scenario in which one outputs (from processElement) a timestamp
> behind the current input element timestamp but knows that it is safe
> because there is already an extent timer with an earlier output timestamp
> (state can be used for this). In this case I've seen timers set simply for
> the hold - the actual onTimer never outputs anything.
>
> Reuven
>
> On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles <k...@apache.org> wrote:
>
>>
>>
>> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sniem...@apache.org>
>> wrote:
>>
>>> > I think this wouldn't be very robust to different situations where
>>> processing time and event time may not be that close to each other.
>>>
>>> if you do something like `min(endOfWindow, max(eventInputTimestamp,
>>> computedFiringTimestamp))` the worst case is that you set a watermark hold
>>> for somewhere in the future, right?  For example, if the watermark is
>>> lagging 3 hours, processing time = 4pm, event input = 1pm, window end =
>>> 5pm, the watermark hold/output time is set to 4pm + T.  This would make the
>>> timestamps "newer" than the input, but shouldn't ever create late data,
>>> correct?
>>>
>>> Also, imo, the timestamps really already cross domains now, because the
>>> watermark (event time) is held until the (processing time) timer fires.
>>>
>>> The concrete issue that brought this up was a pipeline with some state,
>>> and the state was "cleaned up" periodically with a processing time timer
>>> that fired every ~hour.  The author of the pipeline was confused why the
>>> watermark wasn't moving (and thus GBKs firing, etc).  The root cause was
>>> the watermark being held by the timer.
>>>
>>> > It would just save you .withOutputTimestamp(elementTimestamp) on your
>>> calls to setting the event time timer, right?
>>>
>>> Correct, the main thing I'm trying to solve is having to recalculate an
>>> output timestamp using the same logic that the timer itself is using to set
>>> its firing timestamp.
>>>
>>
>> It sounds like the main use case that you are dealing with is the case
>> where the timer doesn't actually produce output (or set further timers that
>> produce output) so it doesn't need (or want) a watermark hold. That makes
>> sense.
>>
>> In fact, I do not view a "watermark hold" as a fundamental concept. The
>> act of "set a timer with the intent that I am allowed to produce output
>> with timestamp X" is the fundamental concept, and watermark hold is an
>> implementation detail that should really never have been surfaced as an
>> end-user concept, or really even as an SDK author concept. This is why in
>> my proposal for adding output timestamps to timers, I called it
>> "withOutputTimestamp", and this is why the design does not include any
>> watermark holds - there is a self-loop on a transform where timers produce
>> an input watermark distinct from the watermark on input elements, and that
>> is enough. There is not now, and never has been, a need for the concept of
>> a hold at the level of the Beam model.
>>
>> I wonder if we can automate this behavior by noticing that there is no
>> OutputReceiver parameters to the timer callback, and also transitively. Or
>> just work around it by saying ".withoutOutput" on the timer.
>>
>> Kenn
>>
>>
>>>
>>>
>>>
>>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <k...@apache.org> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sniem...@apache.org>
>>>> wrote:
>>>>
>>>>> If I have a processing time timer, is there any way to automatically
>>>>> set the output timestamp to the timer firing timestamp (similar to how
>>>>> event-time timers work).
>>>>>
>>>>> A common use case would be to do something like:
>>>>> timer.offset(X).align(Y).setRelative()
>>>>>
>>>>
>>>>
>>>> but have the output timestamp be the firing timestamp.  In order to do
>>>>> this now you need to re-calculate the output timestamp (using the same
>>>>> logic as the timer does internally) and manually use withOutputTimestamp.
>>>>
>>>>
>>>> I think this wouldn't be very robust to different situations where
>>>> processing time and event time may not be that close to each other. In
>>>> general I'm skeptical of reusing timestamps across time domains, for just
>>>> this sort of reason. I wouldn't recommend doing this manually either.
>>>>
>>>>
>>>>> I'm not sure what the API would look like here, but it would also be
>>>>> nice to allow event-time timers to do the same in reverse (use the element
>>>>> input timestamp rather than the firing timestamp).  Maybe something like
>>>>> `withDefaultOutputTimestampFrom(...)` and an enum of FIRING_TIMESTAMP,
>>>>> ELEMENT_TIMESTAMP?
>>>>>
>>>>
>>>> It would just save you .withOutputTimestamp(elementTimestamp) on your
>>>> calls to setting the event time timer, right? It doesn't work in general
>>>> because a timer can be set from other OnTimer methods, where there is no
>>>> "element" per se, but just the output timestamp of the fired timer.
>>>>
>>>> Kenn
>>>>
>>>

Reply via email to