On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský <[email protected]> wrote:

> Hi,
>
> this problem seems to be harder than I thought. I have somewhat working
> code in [1], but some tests are still failing (currently the tests for
> ReduceFnRunner). I'm not sure whether the problem is in the tests
> themselves, in which case my current behavior would actually be correct.
> Let me explain the problem:
>
>  - let's have a fixed window with allowed lateness of 1 ms
>
>  - let's add two elements into the window (on time), no late elements
>
>  - now, ReduceFnRunner with the default trigger will set *two* timers: one
> for window.maxTimestamp() and a second for window.maxTimestamp() +
> allowedLateness
>
>  - the previous implementation fired *both* timers at once (within a single
> call to ReduceFnRunner#onTimers), but now onTimers is called twice: once
> for the first timer and once for the second
>

I would be cautious about changing this. Being able to put multiple timers in
the same bundle saves a lot, and if we force them all to run separately through
ReduceFnRunner we risk regressing the performance of some pipelines.
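
To make the scenario concrete, here is a minimal sketch of the two timers
described above. It is not ReduceFnRunner code: the class and method names are
hypothetical, timestamps are plain milliseconds, and treating a timer as due
when its timestamp is at or before the watermark is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: models the two timers of one fixed window.
final class WindowTimersSketch {

  // Last timestamp that belongs to a window [start, end), in milliseconds.
  static long maxTimestamp(long windowEndMillis) {
    return windowEndMillis - 1;
  }

  // End-of-window timer followed by the garbage-collection timer.
  static long[] timersFor(long windowEndMillis, long allowedLatenessMillis) {
    long endOfWindow = maxTimestamp(windowEndMillis);
    return new long[] {endOfWindow, endOfWindow + allowedLatenessMillis};
  }

  // Assumption of this sketch: a timer is due once timestamp <= watermark.
  static List<Long> dueTimers(long[] timers, long watermarkMillis) {
    List<Long> due = new ArrayList<>();
    for (long timer : timers) {
      if (timer <= watermarkMillis) {
        due.add(timer);
      }
    }
    return due;
  }

  public static void main(String[] args) {
    long[] timers = timersFor(1000L, 1L);
    System.out.println(dueTimers(timers, 999L));  // only the end-of-window timer
    System.out.println(dueTimers(timers, 1000L)); // both timers
  }
}
```

With an allowed lateness of 1 ms, any watermark advance that reaches the
second timestamp makes both timers due together, which is the case where they
can share a bundle.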


>  - the result is that, although only a single pane is emitted in both
> cases, in my branch the fired pane doesn't have the `isLast` flag set.
> That is because the window is not yet garbage collected when the pane
> fires (it is still waiting for late data), and the window does not fire a
> second time, because no late data arrived.
>
> Does anyone know what the correct behavior is regarding
> PaneInfo.isLast? I suppose there are only two options: either two panes
> can carry the isLast flag (both end-of-window and late), or it is
> possible that no pane is marked with this flag (because no late pane is
> fired).
>
> Jan
>
>  [1] https://github.com/apache/beam/pull/8815
>
>
> On 6/10/19 6:26 PM, Jan Lukavský wrote:
>
> It seems to me that a watermark hold cannot change this (currently),
> because in the current implementation timers fire according to the input
> watermark, but watermark holds apply to the output watermark. Unless I
> missed something.
>
> On 6/10/19 6:15 PM, Lukasz Cwik <[email protected]> wrote:
>
> I see. Is there a missing watermark hold for timers less than T2?
>
> On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <[email protected]> wrote:
>
> Yes, there is no difference between GC and user timers in this case. I
> think the problem is simply that when the watermark moves from time T1 to
> T2, DirectRunner fires all timers set for times up to T2, but firing them
> can create new timers for times between T1 and T2. These are fired later,
> although they should have been fired before T2.
>
> Jan
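
The ordering problem described above can be reproduced in a few lines. The
sketch below is not DirectRunner code: the class, both method names and the
`onTimer` callback (which returns the timestamps of any timers set while
firing) are hypothetical, and timestamps are plain longs.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.LongFunction;

// Hypothetical simulation, not Beam code: compares two ways of firing timers up to T2.
final class TimerOrderSketch {

  // Snapshot style: take every pending timer due at T2, fire the whole batch,
  // and only afterwards look at timers that were set while firing.
  static List<Long> fireBySnapshot(
      Collection<Long> pending, long t2, LongFunction<List<Long>> onTimer) {
    List<Long> fired = new ArrayList<>();
    List<Long> batch = new ArrayList<>(pending);
    while (!batch.isEmpty()) {
      batch.sort(null); // natural (ascending) order within one batch
      List<Long> next = new ArrayList<>();
      for (long ts : batch) {
        if (ts > t2) {
          continue; // not due yet; this sketch simply ignores it
        }
        fired.add(ts);
        next.addAll(onTimer.apply(ts));
      }
      batch = next;
    }
    return fired;
  }

  // In-order style: always fire the earliest due timer, so a timer set while
  // firing is considered before any later timer that was already pending.
  static List<Long> fireInOrder(
      Collection<Long> pending, long t2, LongFunction<List<Long>> onTimer) {
    PriorityQueue<Long> queue = new PriorityQueue<>(pending);
    List<Long> fired = new ArrayList<>();
    while (!queue.isEmpty() && queue.peek() <= t2) {
      long ts = queue.poll();
      fired.add(ts);
      queue.addAll(onTimer.apply(ts));
    }
    return fired;
  }
}
```

With pending timers at 10 and 30, T2 = 40, and a callback that sets a timer
for 20 when the timer at 10 fires, the snapshot style fires 10, 30, 20, while
the in-order style fires 10, 20, 30.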
> On 6/10/19 5:48 PM, Kenneth Knowles wrote:
>
> Reading your Jira, I believe this problem will manifest without the
> interaction of user timers and GC. Interesting case. It comes down to
> whether a runner makes a timer available or fires it prior to the bundle
> being committed.
>
> I have commented elsewhere about this part, quoting the Jira:
>
> > have experimented with this a little and have not yet figured out what
> the correct solution should be. What I tried:
> > 1) hold input watermark for min(setup timers)
> > 2) fire timers based not on input watermark, but on output watermark
> (output watermark is held by min timer stamp)
>
> Neither of these quite works. What we need is a separate "element input
> watermark" and "timer input watermark". The overall input watermark that
> drives GC is the min of these. The output watermark is also held to this
> overall input watermark. User timers fire according to the element input
> watermark.
>
> Kenn
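
The relationships proposed above can be written down directly. This is only a
sketch under stated assumptions: the class and method names are hypothetical,
the thread does not say how the "timer input watermark" itself is computed
(so it is passed in as a parameter), the extra `holdMillis` argument stands
for any other watermark hold, and "due" is taken to mean timestamp <=
watermark.

```java
// Hypothetical model of the proposed watermarks, not Beam code.
final class WatermarkSketch {

  // The overall input watermark is the min of the element and timer input watermarks.
  static long overallInputWatermark(long elementInputWm, long timerInputWm) {
    return Math.min(elementInputWm, timerInputWm);
  }

  // The output watermark is held to the overall input watermark
  // (and, as an assumption here, to any other hold).
  static long outputWatermark(long overallInputWm, long holdMillis) {
    return Math.min(overallInputWm, holdMillis);
  }

  // User timers fire according to the element input watermark.
  static boolean userTimerDue(long timerTimestamp, long elementInputWm) {
    return timerTimestamp <= elementInputWm;
  }

  // GC is driven by the overall input watermark.
  static boolean gcDue(long gcTimestamp, long overallInputWm) {
    return gcTimestamp <= overallInputWm;
  }
}
```

For example, with an element input watermark of 40 and a timer input
watermark of 20, the overall input watermark is 20: a user timer at 30 is
due, while GC at 30 is not.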
>
> On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik <[email protected]> wrote:
>
> Jan, are you editing the implementation of how timers work within the
> DirectRunner, or are you trying to build support for time-sorted input on
> top of the Beam model for timers?
> Because I think you will need to do the former.
>
> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <[email protected]> wrote:
>
> Hm, that would probably work, thanks!
>
> But should the timers behave like that? I'm trying to fix this by
> introducing a sequence of watermarks
>
>  input watermark -> timer watermark -> output watermark
>
> as suggested in the JIRA, and it actually seems to be working as expected.
> It even cleans up some code paths, but I'm debugging some strange behavior
> this exposed: `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems
> to have stopped clearing itself after this change, and some Pipelines
> therefore stopped working. I'm a little lost as to why this happened. I can
> push the code I have if anyone is interested.
>
> Jan
> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>
> We hit an instance of this problem before and solved it by rescheduling the
> GC timer if there was a conflicting timer that was also meant to fire.
>
> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <[email protected]> wrote:
>
> For a single key. I'm running into a collision between the timerId
> `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for flushing
> sorted elements in the implementation of @RequiresTimeSortedInput. The
> timers are being swapped at the end of input (but it can happen anywhere
> near the end of a window), which results in state being cleared before it
> gets flushed, which means data loss.
>
>  Jan
> On 6/10/19 5:08 PM, Reuven Lax wrote:
>
> Do you mean for a single key or across keys?
>
> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <[email protected]> wrote:
>
> Hi,
>
> I have come across issue [1], and I'm not sure how to solve it in the
> most elegant way.
>
> Any suggestions?
>
> Thanks,
>
>   Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7520
>
>
