On Wed, Jan 19, 2022 at 1:11 AM Jan Lukavský <je...@seznam.cz> wrote:

> > One note - some people definitely use timer.withOutputTimestamp as a
> watermark hold.
>
> Definitely.
>
> > In fact, I do not view a "watermark hold" as a fundamental concept. The
> act of "set a timer with the intent that I am allowed to produce output
> with timestamp X" is the fundamental concept, and watermark hold is an
> implementation detail that should really never have been surfaced as an
> end-user concept, or really even as an SDK author concept.
>
> Agree that this need not be exposed explicitly, but given the
> causality-preserving invariant that elements arriving *before* watermark
> *must not* leave after watermark I think that .withOutputTimestamp actually
> defines watermark hold implicitly. I think there is no other valid
> implementation than to hold output watermark not to cross the output
> timestamp of any active per-key timer (actually, we could distinguish cases
> when the timer is set for already late elements, where there is no need - or
> possibility - to hold the watermark).
>
> I'd also be supportive of associating any buffer output timestamp with
> timer, rather than the buffer itself, as that really feels like a better
> description of what is *really* going to happen.
>
Is this just a way to connect the state, timer callback, and process
element? I wonder how it looks different or what we could do better with
this information. (I like these sorts of ideas, but I can't think of how it
would be different)

In the case Reuven described, where the timer callback does nothing, there
seems to be a real risk that data is left behind in the buffer when the
watermark hold is released. So you could, for example, have a timer
callback that always must accept the full contents of the buffer, and where
it is obvious to a user that the buffer is cleared after the callback. Like
OnWindowExpiration but OnBufferEviction.
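
As a rough illustration of that idea, here is a minimal sketch in plain Java (not the Beam API). `EvictingBufferSketch`, `fireTimer`, and the `onBufferEviction` callback are hypothetical names invented for this example. The only point is that the callback is always handed the full buffer contents, and the buffer is emptied whether or not the callback does anything with them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not part of Beam: a buffer whose contents are always
// handed to a callback and cleared when its associated timer fires.
public class EvictingBufferSketch<T> {
    private final List<T> buffer = new ArrayList<>();
    private final Consumer<List<T>> onBufferEviction;

    public EvictingBufferSketch(Consumer<List<T>> onBufferEviction) {
        this.onBufferEviction = onBufferEviction;
    }

    public void add(T element) {
        buffer.add(element);
    }

    public int size() {
        return buffer.size();
    }

    // Stands in for the timer firing: snapshot the contents, clear the
    // buffer, then pass the snapshot to the callback. Nothing can be left
    // behind, even if the callback ignores its argument.
    public void fireTimer() {
        List<T> contents = Collections.unmodifiableList(new ArrayList<>(buffer));
        buffer.clear();
        onBufferEviction.accept(contents);
    }

    public static void main(String[] args) {
        EvictingBufferSketch<String> sketch =
            new EvictingBufferSketch<>(contents -> System.out.println("evicted " + contents));
        sketch.add("a");
        sketch.add("b");
        sketch.fireTimer();
        System.out.println("remaining: " + sketch.size());
    }
}
```

Clearing before invoking the callback is one possible design choice; it means a no-op callback still cannot leak buffered data past the release of the hold.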

> This was probably discussed, but I cannot see this in this discussion,
> what keeps us from setting output timestamp of processing-time timer to
> something like min(endOfWindow, currentOutputWatermark)? Yes, output
> watermark is not stable, but anything that is derived from _processing
> time_ is not stable by definition. For on-time elements, outputWatermark
> gives an estimation of the current position in event-time, so it makes
> sense to me to use that. Are there any counter examples?
>
This seems OK to me. Certainly the hold should never be based on processing
time.
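
To make the two candidate rules from this thread concrete, here is a minimal sketch of the arithmetic only. It uses `java.time` rather than Beam's own time types so that it runs without Beam on the classpath, and the class and method names are hypothetical. The numbers reuse Steve's example quoted further down (processing time 4pm, event input 1pm, window end 5pm); the date and T = 30 minutes are assumptions, as is reading "lagging 3 hours" as an output watermark of 1pm.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helpers, not part of Beam: they only compute a candidate
// output timestamp for a processing-time timer.
public class OutputTimestampSketch {
    static Instant min(Instant a, Instant b) {
        return a.isBefore(b) ? a : b;
    }

    static Instant max(Instant a, Instant b) {
        return a.isAfter(b) ? a : b;
    }

    // Jan's suggestion: min(endOfWindow, currentOutputWatermark).
    public static Instant fromOutputWatermark(Instant endOfWindow, Instant currentOutputWatermark) {
        return min(endOfWindow, currentOutputWatermark);
    }

    // Steve's suggestion:
    // min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp)).
    public static Instant fromFiringTimestamp(
            Instant endOfWindow, Instant eventInputTimestamp, Instant computedFiringTimestamp) {
        return min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp));
    }

    public static void main(String[] args) {
        Instant eventInput = Instant.parse("2022-01-19T13:00:00Z");     // 1pm
        Instant processingTime = Instant.parse("2022-01-19T16:00:00Z"); // 4pm
        Instant endOfWindow = Instant.parse("2022-01-19T17:00:00Z");    // 5pm
        Duration t = Duration.ofMinutes(30);                            // assumed T

        // Steve's rule: output timestamp lands at 4pm + T, ahead of the input.
        Instant firing = processingTime.plus(t);
        System.out.println(fromFiringTimestamp(endOfWindow, eventInput, firing));

        // Jan's rule: output timestamp tracks the lagging output watermark.
        Instant outputWatermark = processingTime.minus(Duration.ofHours(3));
        System.out.println(fromOutputWatermark(endOfWindow, outputWatermark));
    }
}
```

With these inputs the first rule yields 4:30pm and the second yields 1pm; both are capped at the 5pm end of window if the inner value runs past it.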

Kenn

>  Jan
> On 1/18/22 21:10, Kenneth Knowles wrote:
>
> Yea, it makes sense. This is an issue for the global window where there
> isn't automatic cleanup of state. I've had a few user cases where they
> would like a good way of doing state cleanup in the global window too -
> something where whenever state gets buffered there is always a finite timer
> that will fire. There might be an opportunity here, if we attach the hold
> to that associated timer rather than the state. It sounds similar to what
> you describe where someone made a timer just to create a watermark hold
> associated with some state - I assume they actually do need to process and
> emit that state in some way related to the timer.
>
> On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax <re...@google.com> wrote:
>
>> Correct.
>>
>> IIRC originally we didn't want to add "buffered data timestamps"
>> because it was error prone. Leaking even one record in state holds up the
>> watermark and can cause the entire pipeline to grind to a halt. Associating
>> with a timer guarantees that holds are always cleared eventually.
>>
>> On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> This is an interesting case, and a legitimate counterexample to
>>> consider. I'd call it a workaround :-). The semantic thing they would
>>> want/need is "output timestamp" associated with buffered data (also
>>> implemented with watermark hold). I do know systems that designed their
>>> state with this built in.
>>>
>>> Kenn
>>>
>>> On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com> wrote:
>>>
>>>> One note - some people definitely use timer.withOutputTimestamp as a
>>>> watermark hold.
>>>>
>>>>
>>>> This is a scenario in which one outputs (from processElement) a
>>>> timestamp behind the current input element timestamp but knows that it is
>>>> safe because there is already an extant timer with an earlier
>>>> output timestamp (state can be used for this). In this case I've seen
>>>> timers set simply for the hold - the actual onTimer never outputs anything.
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles <k...@apache.org>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz <sniem...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> > I think this wouldn't be very robust to different situations where
>>>>>> processing time and event time may not be that close to each other.
>>>>>>
>>>>>> if you do something like `min(endOfWindow, max(eventInputTimestamp,
>>>>>> computedFiringTimestamp))` the worst case is that you set a watermark
>>>>>> hold for somewhere in the future, right?  For example, if the watermark is
>>>>>> lagging 3 hours, processing time = 4pm, event input = 1pm, window end =
>>>>>> 5pm, the watermark hold/output time is set to 4pm + T.  This would make
>>>>>> the timestamps "newer" than the input, but shouldn't ever create late data,
>>>>>> correct?
>>>>>>
>>>>>> Also, imo, the timestamps really already cross domains now, because
>>>>>> the watermark (event time) is held until the (processing time) timer
>>>>>> fires.
>>>>>>
>>>>>> The concrete issue that brought this up was a pipeline with some
>>>>>> state, and the state was "cleaned up" periodically with a processing time
>>>>>> timer that fired every ~hour.  The author of the pipeline was confused
>>>>>> why the watermark wasn't moving (and thus why GBKs weren't firing, etc).
>>>>>> The root cause was the watermark being held by the timer.
>>>>>>
>>>>>> > It would just save you .withOutputTimestamp(elementTimestamp) on
>>>>>> your calls to setting the event time timer, right?
>>>>>>
>>>>>> Correct, the main thing I'm trying to solve is having to recalculate
>>>>>> an output timestamp using the same logic that the timer itself is using
>>>>>> to set its firing timestamp.
>>>>>>
>>>>>
>>>>> It sounds like the main use case that you are dealing with is the case
>>>>> where the timer doesn't actually produce output (or set further timers
>>>>> that produce output) so it doesn't need (or want) a watermark hold. That
>>>>> makes sense.
>>>>>
>>>>> In fact, I do not view a "watermark hold" as a fundamental concept.
>>>>> The act of "set a timer with the intent that I am allowed to produce
>>>>> output with timestamp X" is the fundamental concept, and watermark hold is an
>>>>> implementation detail that should really never have been surfaced as an
>>>>> end-user concept, or really even as an SDK author concept. This is why in
>>>>> my proposal for adding output timestamps to timers, I called it
>>>>> "withOutputTimestamp", and this is why the design does not include any
>>>>> watermark holds - there is a self-loop on a transform where timers produce
>>>>> an input watermark distinct from the watermark on input elements, and that
>>>>> is enough. There is not now, and never has been, a need for the concept of
>>>>> a hold at the level of the Beam model.
>>>>>
>>>>> I wonder if we can automate this behavior by noticing that there are no
>>>>> OutputReceiver parameters to the timer callback, and also transitively. Or
>>>>> just work around it by saying ".withoutOutput" on the timer.
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles <k...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz <sniem...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> If I have a processing time timer, is there any way to
>>>>>>>> automatically set the output timestamp to the timer firing timestamp
>>>>>>>> (similar to how event-time timers work)?
>>>>>>>>
>>>>>>>> A common use case would be to do something like:
>>>>>>>> timer.offset(X).align(Y).setRelative()
>>>>>>>>
>>>>>>>> but have the output timestamp be the firing timestamp.  In order to
>>>>>>>> do this now you need to re-calculate the output timestamp (using the
>>>>>>>> same logic as the timer does internally) and manually use
>>>>>>>> withOutputTimestamp.
>>>>>>>
>>>>>>>
>>>>>>> I think this wouldn't be very robust to different situations where
>>>>>>> processing time and event time may not be that close to each other. In
>>>>>>> general I'm skeptical of reusing timestamps across time domains, for
>>>>>>> just this sort of reason. I wouldn't recommend doing this manually either.
>>>>>>>
>>>>>>>
>>>>>>>> I'm not sure what the API would look like here, but it would also
>>>>>>>> be nice to allow event-time timers to do the same in reverse (use the
>>>>>>>> element input timestamp rather than the firing timestamp).  Maybe
>>>>>>>> something like `withDefaultOutputTimestampFrom(...)` and an enum of
>>>>>>>> FIRING_TIMESTAMP, ELEMENT_TIMESTAMP?
>>>>>>>>
>>>>>>>
>>>>>>> It would just save you .withOutputTimestamp(elementTimestamp) on
>>>>>>> your calls to setting the event time timer, right? It doesn't work in
>>>>>>> general because a timer can be set from other OnTimer methods, where
>>>>>>> there is no "element" per se, but just the output timestamp of the fired
>>>>>>> timer.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>
