On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský wrote:
Hi Reuven,
> I would be cautious changing this. Being able to put
> multiple timers in the same bundle saves a lot, and if we
> force them to all run separately through ReduceFnRunner we risk
> regressing performance of some pipelines.
I understand your point. The issue here is that the current
behavior is, to say the least, unexpected. There are two
conceptually different ways to approach it:
a) If a bundle contains timers for several distinct
timestamps (say T1 and T2), then timer T1 is effectively
fired not at time T1 but at time T2, because, logically,
time hopped discretely from some previous time T0 to T2
without "stopping by" T1. Hence, it should be invalid to set
up a timer for any time lower than T2.
But that is exactly how time advances. Watermarks often don't
move smoothly, as a single old element can hold up the watermark.
When that element is finished, the watermark can jump forward in
time, triggering many timers.
b) Time moves smoothly (or at least smoothly at millisecond
precision), but that implies that a single bundle cannot
contain timers for more than one distinct timestamp.
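To make the contrast between a) and b) concrete, here is a minimal sketch in plain Java. It is not Beam's API: `TimerBundling` and its methods are hypothetical, and each timer is reduced to its timestamp. When the watermark jumps to a new value, view a) delivers every due timer in one bundle, so a timer for T1 is observed at T2. View b) delivers one bundle per distinct timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;

/** Hypothetical model: pending event-time timers are represented only by their timestamps. */
final class TimerBundling {

  /** View a): every timer due at the new watermark is delivered in one bundle. */
  static List<List<Long>> batched(SortedSet<Long> pending, long newWatermark) {
    List<List<Long>> bundles = new ArrayList<>();
    // headSet(newWatermark + 1) selects all timestamps <= newWatermark.
    List<Long> due = new ArrayList<>(pending.headSet(newWatermark + 1));
    if (!due.isEmpty()) {
      bundles.add(due);
    }
    return bundles;
  }

  /** View b): one bundle per distinct timestamp, so time "stops by" each of them. */
  static List<List<Long>> perTimestamp(SortedSet<Long> pending, long newWatermark) {
    List<List<Long>> bundles = new ArrayList<>();
    for (long ts : pending.headSet(newWatermark + 1)) {
      bundles.add(List.of(ts));
    }
    return bundles;
  }
}
```

Suppose timers are pending at 1, 2 and 5 and the watermark jumps to 2. `batched` returns `[[1, 2]]`, while `perTimestamp` returns `[[1], [2]]`. In both cases the timer at 5 stays pending.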
If we don't want to take path b), we are probably left with
path a), as doing nothing seems wrong: it breaks the
invariant that time can only move forward.
I'm not sure how this breaks that invariant. The input watermark
has only moved forward, as should be true for the output
watermark. The output watermark is held up by watermark holds in
the step, which usually means that the output watermark is
already being held to the earliest pending timer.
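Here is a minimal sketch of the relationship Reuven describes. It assumes a simplified rule: a step's output watermark is the minimum of its input watermark and its active watermark holds, with each pending timer modelled as a hold at its timestamp. `OutputWatermark` is a hypothetical class, not Beam's internal watermark bookkeeping.

```java
import java.util.Collection;

/** Hypothetical model of one step's output watermark; not Beam's internal implementation. */
final class OutputWatermark {
  private long current = Long.MIN_VALUE;

  /**
   * Recomputes the output watermark as min(input watermark, earliest hold).
   * The result never moves backwards. A pending timer is modelled as a hold at its timestamp.
   */
  long advance(long inputWatermark, Collection<Long> holds) {
    long candidate = inputWatermark;
    for (long hold : holds) {
      candidate = Math.min(candidate, hold);
    }
    current = Math.max(current, candidate);
    return current;
  }
}
```

For example, with input watermark 10 and a pending timer holding at 4, the output watermark stays at 4. It only advances as the hold moves or is released.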
Option a) can be done: we might add something like
`getInputWatermark()` and `getOutputWatermark()` to
`DoFn.OnTimerContext`, and throw an exception when the user tries
to set up a timer for a time before the input watermark. That way
we effectively let the user know that their timer was set for
time T1 but was fired at T2. Unfortunately, that seems to be a
breaking change.
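Option a) could look roughly like the following sketch. `getInputWatermark()` is only the accessor proposed above; it is not an existing `DoFn.OnTimerContext` method. `ValidatingTimerContext` is a hypothetical stand-in that rejects timers set behind the input watermark, using plain `long` timestamps instead of Joda `Instant`s.

```java
/**
 * Hypothetical stand-in for a timer callback context exposing the proposed
 * getInputWatermark(); this is not the real DoFn.OnTimerContext API.
 */
final class ValidatingTimerContext {
  private final long timerTimestamp; // the time the fired timer was set for (e.g. T1)
  private final long inputWatermark; // the watermark at which it is actually delivered (e.g. T2)
  private Long nextTimer;            // timer set from this callback, or null if none

  ValidatingTimerContext(long timerTimestamp, long inputWatermark) {
    this.timerTimestamp = timerTimestamp;
    this.inputWatermark = inputWatermark;
  }

  long timestamp() { return timerTimestamp; }

  long getInputWatermark() { return inputWatermark; }

  /** Sets an event-time timer, rejecting timestamps the input watermark has already passed. */
  void setTimer(long timestamp) {
    if (timestamp < inputWatermark) {
      throw new IllegalArgumentException(
          "timer at " + timestamp + " is before input watermark " + inputWatermark);
    }
    nextTimer = timestamp;
  }

  Long nextTimer() { return nextTimer; }
}
```

Consider a callback fired for T1 at watermark T2. It can still set a timer for T2 or later. An attempt to set one for T1 fails loudly instead of silently firing late.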
What do you think?
Jan
On 6/20/19 5:29 PM, Reuven Lax wrote:
On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský wrote:
Hi,
this problem seems to be harder than I thought. I have
somewhat working code in [1], but some tests are still
failing (now tests for ReduceFnRunner), and I'm not sure
whether the problem is in the tests, i.e. whether my
current behavior is actually correct. Let me explain the
problem:
- let's have a fixed window with allowed lateness of 1 ms
- let's add two elements into the window (on time), no
late elements
- now, ReduceFnRunner with default trigger will set
*two* timers - one for window.maxTimestamp() and second
for window.maxTimestamp() + allowedLateness
- the previous implementation fired *both* timers at
once (within a single call to ReduceFnRunner#onTimers),
but now they fire separately - once for the first timer
and a second time for the other
I would be cautious changing this. Being able to put
multiple timers in the same bundle saves a lot, and if we
force them to all run separately through ReduceFnRunner we
risk regressing performance of some pipelines.
- the result of this is that although in both cases
only a single pane is emitted, in my branch the fired pane
doesn't have the `isLast` flag set (because the window is
not yet garbage collected - it is waiting for late data -
and when the second timer fires, nothing is emitted,
because no late data arrived)
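The two outcomes Jan describes can be reproduced with a deliberately simplified model. `IsLastModel` is hypothetical and is not `ReduceFnRunner`. The model rests on two assumptions. First, a pane is emitted only if elements are buffered. Second, a pane is marked last when the watermark has reached the window's GC time (maxTimestamp + allowedLateness).

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the isLast behaviour described above; not ReduceFnRunner. */
final class IsLastModel {
  static final class Pane {
    final int size;
    final boolean isLast;

    Pane(int size, boolean isLast) {
      this.size = size;
      this.isLast = isLast;
    }
  }

  private final long maxTimestamp;
  private final long gcTime; // maxTimestamp + allowedLateness
  private int buffered;      // on-time elements not yet emitted
  final List<Pane> emitted = new ArrayList<>();

  IsLastModel(long maxTimestamp, long allowedLateness) {
    this.maxTimestamp = maxTimestamp;
    this.gcTime = maxTimestamp + allowedLateness;
  }

  void addElement() { buffered++; }

  /** One onTimers call: emits at most one pane, marked last if the window is expiring. */
  void onTimers(List<Long> timers, long watermark) {
    boolean endOfWindowReached = timers.stream().anyMatch(t -> t >= maxTimestamp);
    boolean expired = watermark >= gcTime;
    // Assumed fire-if-non-empty policy: nothing is emitted when no data is buffered.
    if (endOfWindowReached && buffered > 0) {
      emitted.add(new Pane(buffered, expired));
      buffered = 0;
    }
  }
}
```

Take a window with maxTimestamp 9 and allowed lateness 1 ms. A single `onTimers` call with both timers at watermark 10 yields one pane with `isLast == true`. Two separate calls, at watermark 9 and then at watermark 10, yield one pane with `isLast == false`, and nothing is emitted when the GC timer fires.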
Would anyone know what the correct behavior regarding
PaneInfo.isLast actually is? I suppose there are only
two options - either two panes can come with the isLast
flag (both end-of-window and late), or it is possible
that no pane will be marked with this flag (because no
late pane is fired).
Jan
[1] https://github.com/apache/beam/pull/8815
On 6/10/19 6:26 PM, Jan Lukavský wrote:
It seems to me that a watermark hold cannot change that
(currently), because in the current implementation
timers fire according to the input watermark, but
watermark holds apply to the output watermark. Unless I
missed something.
On 10. 6. 2019 18:15, Lukasz Cwik wrote:
I see. Is there a missing watermark hold for timers
less than T2?
On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský wrote:
Yes, there is no difference between GC and user
timers in this case. I think the problem is
simply that when the watermark moves from time T1
to T2, DirectRunner fires all timers that are due
up to T2, but firing them can create new timers
for times between T1 and T2, and these will be
fired later, although they should have been fired
before T2.
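This failure mode can be reproduced with a toy timer queue. `TimerQueue` is written in plain Java with hypothetical names and is not DirectRunner code. The naive version takes a snapshot of every timer due at the new watermark T2 and fires them. A timer that a callback sets between T1 and T2 misses the snapshot and is only delivered on a later advance. The alternative drains a priority queue and re-checks it after every callback, so newly set timers still fire in timestamp order before the advance completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;

/** Toy per-key event-time timer queue; hypothetical, not DirectRunner's implementation. */
final class TimerQueue {
  private final PriorityQueue<Long> pending = new PriorityQueue<>();
  final List<Long> firedOrder = new ArrayList<>();

  void set(long timestamp) { pending.add(timestamp); }

  Long peekPending() { return pending.peek(); }

  /** Naive: snapshot the timers due at the new watermark; timers set by callbacks must wait. */
  void advanceSnapshot(long watermark, BiConsumer<Long, TimerQueue> onTimer) {
    List<Long> due = new ArrayList<>();
    while (!pending.isEmpty() && pending.peek() <= watermark) {
      due.add(pending.poll());
    }
    for (long ts : due) {
      firedOrder.add(ts);
      onTimer.accept(ts, this);
    }
  }

  /** Drains in timestamp order, re-checking the queue so newly set due timers fire too. */
  void advanceDraining(long watermark, BiConsumer<Long, TimerQueue> onTimer) {
    while (!pending.isEmpty() && pending.peek() <= watermark) {
      long ts = pending.poll();
      firedOrder.add(ts);
      onTimer.accept(ts, this);
    }
  }
}
```

In this setup, timers are pending at 2 and 8, the watermark advances to 10, and the callback for 2 sets a timer at 5. The snapshot version fires 2 and 8 and leaves 5 pending, so it fires out of order later. The draining version fires 2, 5, 8. The draining loop still handles everything up to the new watermark in one call, so by itself it does not force one bundle per timer.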
Jan
On 6/10/19 5:48 PM, Kenneth Knowles wrote:
Reading your Jira, I believe this problem
will manifest even without the interaction
of user timers and GC. Interesting case. It
hinges on whether a runner makes a timer
available, or fires it, before the bundle
is committed.
I have commented elsewhere about this part,
quoting the Jira:
> have experimented with this a little and
have not yet figured out what the correct
solution should be. What I tried:
> 1) hold input watermark for min(setup timers)
> 2) fire timers based not on input
watermark, but on output watermark (output
watermark is held by min timer stamp)
Neither of these quite works. What we need
is a separate "element input watermark" and
"timer input watermark". The overall input
watermark that drives GC is the min of
these. The output watermark is also held to
this overall input watermark. User timers
fire according to the element input watermark.
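Kenn's proposal can be written down as a small calculation. This sketch assumes that the timer input watermark is the earliest pending user timer, or +infinity when none is pending. It also assumes that a timer is due once the relevant watermark reaches its timestamp. `SplitWatermarks` is hypothetical and does not correspond to an existing Beam class.

```java
import java.util.Collection;

/**
 * Sketch of split input watermarks. Assumption: the timer input watermark is the earliest
 * pending user timer (Long.MAX_VALUE when none is pending). Names are hypothetical.
 */
final class SplitWatermarks {
  static long timerInputWatermark(Collection<Long> pendingUserTimers) {
    long wm = Long.MAX_VALUE;
    for (long ts : pendingUserTimers) {
      wm = Math.min(wm, ts);
    }
    return wm;
  }

  /** The overall input watermark (drives GC and holds the output watermark) is the min. */
  static long overallInputWatermark(long elementInputWatermark, long timerInputWatermark) {
    return Math.min(elementInputWatermark, timerInputWatermark);
  }

  /** User timers fire according to the element input watermark. */
  static boolean userTimerDue(long timerTs, long elementInputWatermark) {
    return timerTs <= elementInputWatermark;
  }

  /** The GC timer fires according to the overall input watermark. */
  static boolean gcTimerDue(long gcTs, long overallInputWatermark) {
    return gcTs <= overallInputWatermark;
  }
}
```

Suppose the element input watermark is 10, a user timer is pending at 8, and GC is due at 9. The user timer is due, but the overall input watermark is held at 8, so GC cannot overtake the pending timer. Once that timer has fired and been removed, the overall input watermark returns to 10 and GC becomes due.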
Kenn
On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik wrote:
Jan, are you editing the implementation
of how timers work within the
DirectRunner, or are you trying to build
support for time-sorted input on top of
the Beam model for timers?
Because I think you will need to do the
former.
On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský wrote:
Hm, that would probably work, thanks!
But should the timers behave like
that? I'm trying to fix this by
introducing a sequence of watermarks
input watermark -> timer
watermark -> output watermark
as suggested in the JIRA, and it
actually seems to be working as
expected. It even cleans up some code
paths, but I'm debugging some
strange behavior this exposed:
`WatermarkHold.watermarkHoldTagForTimestampCombiner`
seems to have stopped clearing
itself after this change, and some
pipelines therefore stopped
working. I'm a little lost as to why
this happened. I can push the code I
have if anyone is interested.
Jan
On 6/10/19 5:32 PM, Lukasz Cwik wrote:
We hit an instance of this
problem before and solved it
by rescheduling the GC timer
if there was a conflicting
timer that was also meant to fire.
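Lukasz's workaround can be sketched as follows. When the GC timer for a key comes up while another timer for the same key is also due, the GC timer is re-queued behind it instead of clearing state. Only `__StatefulParDoGcTimerId` comes from the thread. The flush timer ID `flushSorted` and the class `GcRescheduling` are hypothetical, and "rescheduling" is simplified here to re-queuing within the same firing round.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Sketch of the workaround: if the GC timer is reached while another timer for the same key
 * is still due, re-queue the GC timer behind it instead of clearing state right away.
 */
final class GcRescheduling {
  static final String GC_TIMER = "__StatefulParDoGcTimerId";

  static List<String> fireDue(Deque<String> due, List<String> state) {
    List<String> log = new ArrayList<>();
    while (!due.isEmpty()) {
      String timerId = due.pollFirst();
      if (timerId.equals(GC_TIMER)) {
        boolean conflict = due.stream().anyMatch(id -> !id.equals(GC_TIMER));
        if (conflict) {
          due.addLast(timerId); // reschedule GC after the conflicting timer(s)
          log.add("rescheduled " + timerId);
        } else {
          state.clear(); // garbage-collect the key's state
          log.add("cleared state");
        }
      } else {
        // Any other timer, e.g. a hypothetical "flushSorted" timer, emits the buffered state.
        log.add("flushed " + state + " via " + timerId);
      }
    }
    return log;
  }
}
```

Suppose the GC timer happens to be delivered first, with `flushSorted` due in the same round. The GC timer is pushed back, the buffered elements are flushed, and only then is the state cleared, which avoids the data loss described below.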
On Mon, Jun 10, 2019 at 8:17 AM
Jan Lukavský wrote:
For a single key. I'm
running into a collision
between timerId
`__StatefulParDoGcTimerId`
(StatefulDoFnRunner) and my
timerId for flushing sorted
elements in the
implementation of
@RequiresTimeSortedInput.
The timers get swapped at
the end of input (but it can
happen anywhere near the end
of a window), which results
in state being cleared
before it gets flushed,
which means data loss.
Jan
On 6/10/19 5:08 PM, Reuven
Lax wrote:
Do you mean for a
single key or across keys?
On Mon, Jun 10, 2019,
5:11 AM Jan Lukavský
wrote:
Hi,
I have come across
issue [1], and I'm
not sure how to
solve it in the
most elegant way.
Any suggestions?
Thanks,
Jan
[1]
https://issues.apache.org/jira/browse/BEAM-7520