> One note - some people definitely use timer.withOutputTimestamp as a
> watermark hold.
Definitely.
> In fact, I do not view a "watermark hold" as a fundamental concept.
> The act of "set a timer with the intent that I am allowed to produce
> output with timestamp X" is the fundamental concept, and watermark hold
> is an implementation detail that should really never have been surfaced
> as an end-user concept, or really even as an SDK author concept.
Agree that this need not be exposed explicitly, but given the
causality-preserving invariant that elements arriving *before* the
watermark *must not* leave after the watermark, I think that
.withOutputTimestamp actually defines a watermark hold implicitly. I
think there is no other valid implementation than to hold the output
watermark so that it does not cross the output timestamp of any active
per-key timer (actually, we could distinguish the case where the timer
is set for already-late elements: there is no need - or possibility -
to hold the watermark then).
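The invariant above can be sketched as a small per-key model. This is a hypothetical illustration, not Beam runner code: each active timer registers its output timestamp as a hold, the output watermark is the minimum of the input watermark and all registered holds, and a timer whose output timestamp is already behind the output watermark is treated as late and registers no hold. The class and method names are assumptions, and `java.time.Instant` stands in for Beam's Joda-based `Instant`.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-key model of the watermark hold implied by timer output timestamps.
class TimerHoldModel {
  private Instant inputWatermark;
  // Output timestamp of every active (set, not yet fired) timer that holds the watermark.
  private final Map<String, Instant> holdsByTimerId = new HashMap<>();

  TimerHoldModel(Instant initialInputWatermark) {
    this.inputWatermark = initialInputWatermark;
  }

  // The output watermark may not pass the output timestamp of any active timer.
  Instant outputWatermark() {
    Instant wm = inputWatermark;
    for (Instant hold : holdsByTimerId.values()) {
      if (hold.isBefore(wm)) {
        wm = hold;
      }
    }
    return wm;
  }

  // Sets (or resets) a timer; returns true if its output timestamp now holds the watermark.
  boolean setTimer(String timerId, Instant outputTimestamp) {
    Instant current = outputWatermark();
    holdsByTimerId.remove(timerId);
    if (outputTimestamp.isBefore(current)) {
      // Late timer: the watermark is already past this timestamp, so there is nothing to hold.
      return false;
    }
    holdsByTimerId.put(timerId, outputTimestamp);
    return true;
  }

  // Firing a timer releases its hold.
  void fireTimer(String timerId) {
    holdsByTimerId.remove(timerId);
  }

  // The input watermark only moves forward.
  void advanceInputWatermark(Instant newInputWatermark) {
    if (newInputWatermark.isAfter(inputWatermark)) {
      inputWatermark = newInputWatermark;
    }
  }
}
```

In this model the output watermark stays at a timer's output timestamp even after the input watermark has moved on, and jumps forward as soon as that timer fires.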
I'd also be supportive of associating any buffer output timestamp with
a timer, rather than with the buffer itself, as that feels like a better
description of what is *really* going to happen.
This was probably discussed already, but I cannot find it in this
thread: what keeps us from setting the output timestamp of a
processing-time timer to something like
min(endOfWindow, currentOutputWatermark)? Yes, the output watermark is
not stable, but anything derived from _processing time_ is unstable by
definition. For on-time elements, the output watermark gives an
estimate of the current position in event time, so it makes sense to
me to use it. Are there any counterexamples?
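The rule proposed above can be written as a one-line helper. This is a hypothetical sketch, not an existing Beam API, using `java.time.Instant` in place of Beam's Joda `Instant`. Because it reads the output watermark at the moment the timer is set, calling it at two different processing times can yield two different output timestamps, which is the instability mentioned above.

```java
import java.time.Instant;

// Hypothetical helper for min(endOfWindow, currentOutputWatermark); not part of Beam.
class WatermarkBasedOutputTimestamp {
  // For on-time data this tracks the current event-time position without passing the
  // end of the window. If the watermark has already passed the end of the window, the
  // result is endOfWindow, which is itself behind the watermark (the data is late).
  static Instant forProcessingTimeTimer(Instant endOfWindow, Instant currentOutputWatermark) {
    return currentOutputWatermark.isBefore(endOfWindow) ? currentOutputWatermark : endOfWindow;
  }
}
```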
Jan
On 1/18/22 21:10, Kenneth Knowles wrote:
Yea, it makes sense. This is an issue for the global window where
there isn't automatic cleanup of state. I've had a few user cases
where they would like a good way of doing state cleanup in the global
window too - something where whenever state gets buffered there is
always a finite timer that will fire. There might be an opportunity
here, if we attach the hold to that associated timer rather than the
state. It sounds similar to what you describe where someone made a
timer just to create a watermark hold associated with some state - I
assume they actually do need to process and emit that state in some
way related to the timer.
On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax <re...@google.com> wrote:
Correct.
IIRC originally we didn't want to add "buffered data timestamps"
because it was error prone. Leaking even one record in state
holds up the watermark and can cause the entire pipeline to grind
to a halt. Associating with a timer guarantees that holds are
always cleared eventually.
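The idea of attaching the hold to a timer associated with buffered state, rather than to the state itself, might look like the following. This is a hypothetical sketch: `TimerBackedBuffer`, `flushAt`, and the callback name are illustrative assumptions, and the timer's time domain is left abstract. Buffering requires a finite flush time, and firing the timer clears the buffer and the hold together, so the hold cannot outlive the timer.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical per-key buffer whose watermark hold lives on a timer, not on the state.
class TimerBackedBuffer {
  private final List<String> buffered = new ArrayList<>();
  private Instant timerFireAt;          // when the (finite) flush timer fires
  private Instant timerOutputTimestamp; // the timer's output timestamp, i.e. the hold

  // Buffering requires a finite flush time, so every hold is tied to a timer that will fire.
  void add(String value, Instant elementTimestamp, Instant flushAt) {
    buffered.add(value);
    if (timerOutputTimestamp == null || elementTimestamp.isBefore(timerOutputTimestamp)) {
      timerOutputTimestamp = elementTimestamp; // hold at the earliest buffered timestamp
    }
    if (timerFireAt == null || flushAt.isBefore(timerFireAt)) {
      timerFireAt = flushAt; // keep the earliest requested flush time
    }
  }

  Optional<Instant> hold() {
    return Optional.ofNullable(timerOutputTimestamp);
  }

  Optional<Instant> nextFiring() {
    return Optional.ofNullable(timerFireAt);
  }

  // Timer callback: emits the buffer and clears state, timer and hold together.
  List<String> onTimer() {
    List<String> out = new ArrayList<>(buffered);
    buffered.clear();
    timerFireAt = null;
    timerOutputTimestamp = null;
    return out;
  }
}
```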
On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles <k...@apache.org>
wrote:
This is an interesting case, and a legitimate counterexample
to consider. I'd call it a workaround :-). The semantic thing
they would want/need is "output timestamp" associated with
buffered data (also implemented with watermark hold). I do
know systems that designed their state with this built in.
Kenn
On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com>
wrote:
One note - some people definitely use
timer.withOutputTimestamp as a watermark hold.
This is a scenario in which one outputs (from
processElement) an element with a timestamp behind the
current input element's timestamp, but knows that it is
safe because there is already an extant timer with an
earlier output timestamp (state can be used to track
this). In this case I've seen timers set simply for the
hold - the actual onTimer never outputs anything.
Reuven
On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles
<k...@apache.org> wrote:
On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz
<sniem...@apache.org> wrote:
> I think this wouldn't be very robust to
> different situations where processing time and
> event time may not be that close to each other.
If you do something like `min(endOfWindow,
max(eventInputTimestamp,
computedFiringTimestamp))`, the worst case is that
you set a watermark hold somewhere in the
future, right? For example, if the watermark is
lagging by 3 hours, processing time = 4pm, event
input = 1pm, and window end = 5pm, the watermark
hold/output time is set to 4pm + T (or 5pm, if T
exceeds an hour). This would make the timestamps
"newer" than the input, but shouldn't ever create
late data, correct?
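The formula and the worked example above can be checked with a small helper. This is a hypothetical sketch, not a Beam API; `processingTime.plus(offset)` is assumed to be the computed firing timestamp, ignoring any alignment, and `java.time` types stand in for Beam's Joda types. With processing time 4pm, event input 1pm and window end 5pm, an offset of 30 minutes gives 4:30pm, while an offset of 2 hours is capped at 5pm.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper for min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp)).
class ClampedFiringOutputTimestamp {
  static Instant outputTimestamp(
      Instant endOfWindow, Instant eventInputTimestamp, Instant computedFiringTimestamp) {
    // Never earlier than the input element...
    Instant later = eventInputTimestamp.isAfter(computedFiringTimestamp)
        ? eventInputTimestamp
        : computedFiringTimestamp;
    // ...and never past the end of the window.
    return later.isBefore(endOfWindow) ? later : endOfWindow;
  }

  // Assumes the firing timestamp is simply processing time plus an offset T (no alignment).
  static Instant outputTimestampForOffset(
      Instant endOfWindow, Instant eventInputTimestamp, Instant processingTime, Duration offset) {
    return outputTimestamp(endOfWindow, eventInputTimestamp, processingTime.plus(offset));
  }
}
```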
Also, IMO, the timestamps already cross
domains today, because the watermark (event time) is
held until the (processing-time) timer fires.
The concrete issue that brought this up was a
pipeline with some state, and the state was
"cleaned up" periodically with a processing time
timer that fired every ~hour. The author of the
pipeline was confused why the watermark wasn't
moving (and thus GBKs firing, etc). The root
cause was the watermark being held by the timer.
> It would just save you
> .withOutputTimestamp(elementTimestamp) on your
> calls to setting the event time timer, right?
Correct, the main thing I'm trying to solve is
having to recalculate an output timestamp using
the same logic that the timer itself is using to
set its firing timestamp.
It sounds like the main use case that you are dealing
with is the case where the timer doesn't actually
produce output (or set further timers that produce
output) so it doesn't need (or want) a watermark hold.
That makes sense.
In fact, I do not view a "watermark hold" as a
fundamental concept. The act of "set a timer with the
intent that I am allowed to produce output with
timestamp X" is the fundamental concept, and watermark
hold is an implementation detail that should really
never have been surfaced as an end-user concept, or
really even as an SDK author concept. This is why in
my proposal for adding output timestamps to timers, I
called it "withOutputTimestamp", and this is why the
design does not include any watermark holds - there is
a self-loop on a transform where timers produce an
input watermark distinct from the watermark on input
elements, and that is enough. There is not now, and
never has been, a need for the concept of a hold at
the level of the Beam model.
I wonder if we can automate this behavior by noticing
that there are no OutputReceiver parameters on the
timer callback (and checking the same transitively for
any timers it sets). Or just work around it by saying
".withoutOutput" on the timer.
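The automatic detection idea could start from a reflective check of the callback's declared parameters. This is a hypothetical sketch: `NoOutputDetector`, its nested `OutputReceiver` stand-in, and `ExampleTimerCallbacks` are illustrative names, not Beam's DoFn signature analysis. It only performs the direct check; the transitive case (a callback that sets other timers that do produce output) would need further analysis.

```java
import java.lang.reflect.Method;
import java.util.Arrays;

class NoOutputDetector {
  // Hypothetical stand-in for Beam's OutputReceiver parameter type.
  interface OutputReceiver<T> {
    void output(T value);
  }

  // Direct check only: does the callback declare an OutputReceiver parameter?
  static boolean mayProduceOutput(Method callback) {
    return Arrays.stream(callback.getParameterTypes())
        .anyMatch(OutputReceiver.class::isAssignableFrom);
  }

  static boolean mayProduceOutput(Class<?> callbacks, String methodName) {
    for (Method m : callbacks.getDeclaredMethods()) {
      if (m.getName().equals(methodName)) {
        return mayProduceOutput(m);
      }
    }
    throw new IllegalArgumentException("no method named " + methodName);
  }
}

// Example timer callbacks (hypothetical, not a real DoFn).
class ExampleTimerCallbacks {
  // Only cleans up state: no output parameter, so no hold would be needed.
  void onCleanup(String key) {}

  // May emit output, so it still needs an output timestamp (and hold).
  void onFlush(String key, NoOutputDetector.OutputReceiver<String> out) {}
}
```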
Kenn
On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles
<k...@apache.org> wrote:
On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz
<sniem...@apache.org> wrote:
If I have a processing-time timer, is
there any way to automatically set the
output timestamp to the timer's firing
timestamp (similar to how event-time
timers work)?
A common use case would be to do something
like:
timer.offset(X).align(Y).setRelative()
but have the output timestamp be the
firing timestamp. In order to do this now
you need to re-calculate the output
timestamp (using the same logic as the
timer does internally) and manually use
withOutputTimestamp.
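The duplication described above amounts to re-deriving the firing time in user code and passing it to withOutputTimestamp. The sketch below assumes, purely for illustration, that aligning rounds now + offset up to the next multiple of the period measured from the epoch; Beam's internal rounding rule may differ, which is exactly why keeping a second copy of it in user code is fragile. The class and method names are hypothetical, and `java.time` types replace Beam's Joda types.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical re-derivation of an offset/aligned firing time in user code.
class AlignedTimerMath {
  // Assumed rule (illustrative only): round (now + offset) up to the next multiple of
  // period since the epoch; a value already on the boundary is kept as-is.
  static Instant alignedFiringTimestamp(Instant now, Duration offset, Duration period) {
    long shifted = now.plus(offset).toEpochMilli();
    long p = period.toMillis();
    long remainder = Math.floorMod(shifted, p);
    long aligned = remainder == 0 ? shifted : shifted + (p - remainder);
    return Instant.ofEpochMilli(aligned);
  }
}
```

The result is the value a caller would then hand to withOutputTimestamp alongside the timer's own offset/align call, and it silently drifts if the two rules ever disagree.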
I think this wouldn't be very robust to
different situations where processing time and
event time may not be that close to each
other. In general I'm skeptical of reusing
timestamps across time domains, for just this
sort of reason. I wouldn't recommend doing
this manually either.
I'm not sure what the API would look like
here, but it would also be nice to allow
event-time timers to do the same in
reverse (use the element input timestamp
rather than the firing timestamp). Maybe
something like
`withDefaultOutputTimestampFrom(...)` and
an enum of FIRING_TIMESTAMP,
ELEMENT_TIMESTAMP?
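One way the proposed enum might be resolved is sketched below. Everything here is hypothetical: `DefaultOutputTimestampSource`, `DefaultOutputTimestamps.resolve`, and especially the fallback are assumptions, not Beam APIs. It also shows the gap raised in the reply: when a timer is set from an OnTimer callback there is no input element, so ELEMENT_TIMESTAMP needs some substitute, and this sketch assumes the output timestamp of the timer that fired.

```java
import java.time.Instant;
import java.util.Optional;

// Hypothetical enum from the proposal; not part of Beam.
enum DefaultOutputTimestampSource { FIRING_TIMESTAMP, ELEMENT_TIMESTAMP }

final class DefaultOutputTimestamps {
  private DefaultOutputTimestamps() {}

  // elementTimestamp is present when the timer is set while processing an element;
  // firedTimerOutputTimestamp is present when it is set from a timer callback.
  static Instant resolve(
      DefaultOutputTimestampSource source,
      Instant firingTimestamp,
      Optional<Instant> elementTimestamp,
      Optional<Instant> firedTimerOutputTimestamp) {
    switch (source) {
      case FIRING_TIMESTAMP:
        return firingTimestamp;
      case ELEMENT_TIMESTAMP:
        // Assumed fallback: with no input element, use the fired timer's output timestamp.
        return elementTimestamp
            .or(() -> firedTimerOutputTimestamp)
            .orElseThrow(() -> new IllegalStateException("no element or fired-timer timestamp"));
      default:
        throw new AssertionError(source);
    }
  }
}
```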
It would just save you
.withOutputTimestamp(elementTimestamp) on your
calls to setting the event time timer, right?
It doesn't work in general because a timer can
be set from other OnTimer methods, where there
is no "element" per se, but just the output
timestamp of the fired timer.
Kenn