On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský <[email protected]> wrote:
> Hi,
>
> this problem seems to be harder than I thought. I have somewhat working
> code in [1], but some tests are still failing (now the tests for
> ReduceFnRunner), and I'm not sure whether the problem is actually in the
> tests, so that my current behavior is in fact correct. Let me explain the
> problem:
>
>  - let's have a fixed window with allowed lateness of 1 ms
>
>  - let's add two elements into the window (on time), no late elements
>
>  - now, ReduceFnRunner with the default trigger will set *two* timers - one
>    for window.maxTimestamp() and a second for window.maxTimestamp() +
>    allowedLateness
>
>  - the previous implementation fired *both* timers at once (within a single
>    call to ReduceFnRunner#onTimers), but now it fires twice - once for the
>    first timer and once for the other

I would be cautious changing this. Being able to put multiple timers in the
same bundle saves a lot, and if we force them all to run separately through
ReduceFnRunner we risk regressing the performance of some pipelines.

>  - the result of this is that although in both cases only a single pane is
>    emitted, in my branch the fired pane doesn't have the `isLast` flag set
>    (that is because the window is not yet garbage collected - it is waiting
>    for late data - but it does not fire a second time, because no late data
>    arrived)
>
> Would anyone know what the correct behavior regarding PaneInfo.isLast
> actually is? I suppose there are only two options - either two panes can
> come with the isLast flag (both end-of-window and late), or it might be
> possible that no pane is marked with this flag (because no late pane is
> fired).
>
> Jan
>
> [1] https://github.com/apache/beam/pull/8815
>
>
> On 6/10/19 6:26 PM, Jan Lukavský wrote:
>
> It seems to me that a watermark hold cannot change it (currently), because
> in the current implementation timers fire according to the input watermark,
> but watermark holds apply to the output watermark. If I didn't miss anything.
>
> On 10 Jun 2019 at 18:15, Lukasz Cwik <[email protected]> wrote:
>
> I see. Is there a missing watermark hold for timers less than T2?
>
> On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <[email protected]> wrote:
>
> Yes, there is no difference between GC and user timers in this case. I
> think the problem is simply that when the watermark moves from time T1 to
> T2, DirectRunner fires all timers that fire until T2, but that can create
> new timers for times between T1 and T2, and these will be fired later,
> although they should have been fired before T2.
>
> Jan
>
> On 6/10/19 5:48 PM, Kenneth Knowles wrote:
>
> Reading your Jira, I believe this problem will manifest without the
> interaction of user timers and GC. Interesting case. It revolves around
> whether a runner makes a timer available or fires it prior to the bundle
> being committed.
>
> I have commented elsewhere about this part, quoting the Jira:
>
> > have experimented with this a little and have not yet figured out what
> > the correct solution should be. What I tried:
> >
> > 1) hold input watermark for min(setup timers)
> >
> > 2) fire timers based not on input watermark, but on output watermark
> >    (output watermark is held by min timer stamp)
>
> Neither of these quite works. What we need is a separate "element input
> watermark" and "timer input watermark". The overall input watermark that
> drives GC is the min of these. The output watermark is also held to this
> overall input watermark. User timers fire according to the element input
> watermark.
>
> Kenn
>
> On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik <[email protected]> wrote:
>
> Jan, are you editing the implementation of how timers work within the
> DirectRunner, or are you trying to build support for time-sorted input on
> top of the Beam model for timers?
> Because I think you will need to do the former.
>
> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <[email protected]> wrote:
>
> Hm, that would probably work, thanks!
>
> But, should the timers behave like that? I'm trying to fix this by
> introducing a sequence of watermarks
>
> input watermark -> timer watermark -> output watermark
>
> as suggested in the JIRA, and it actually seems to be working as expected.
> It even cleans up some code paths, but I'm debugging some strange behavior
> this exposed - `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems
> to have stopped clearing itself after this change and some Pipelines
> therefore stopped working. I'm a little lost as to why this happened. I can
> push the code I have if anyone is interested.
>
> Jan
>
> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>
> We hit an instance of this problem before and solved it by rescheduling the
> GC timer again if there was a conflicting timer that was also meant to fire.
>
> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <[email protected]> wrote:
>
> For a single key. I'm running into a collision between the timerId
> `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for flushing
> sorted elements in the implementation of @RequiresTimeSortedInput. The
> timers are being swapped at the end of input (but it can happen anywhere
> near the end of the window), which results in state being cleared before it
> gets flushed, which means data loss.
>
> Jan
>
> On 6/10/19 5:08 PM, Reuven Lax wrote:
>
> Do you mean for a single key or across keys?
>
> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <[email protected]> wrote:
>
> Hi,
>
> I have come across issue [1], where I'm not sure how to solve this in the
> most elegant way.
>
> Any suggestions?
>
> Thanks,
>
> Jan
>
> [1] https://issues.apache.org/jira/browse/BEAM-7520
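
To illustrate the ordering problem described in the quoted thread (the watermark moves from T1 to T2, and a timer fired in that step sets a new timer between T1 and T2), here is a minimal standalone simulation in plain Python. It does not use any Beam API: `fire_batch`, `fire_in_order`, and the `on_timer` callback are hypothetical names, and the timestamps are made up. Firing strictly in timestamp order is only one possible way to restore ordering, and is not necessarily what the DirectRunner or the PR does.

```python
import heapq


def fire_batch(timers, new_watermark, on_timer):
    """Batch strategy: collect every timer eligible at new_watermark, fire the
    whole batch, and only afterwards look at timers set by the callbacks."""
    pending = sorted(timers)
    fired = []
    while True:
        batch = [t for t in pending if t <= new_watermark]
        if not batch:
            break
        pending = [t for t in pending if t > new_watermark]
        for t in batch:
            fired.append(t)
            # Timers created here are not seen until the next batch.
            pending.extend(on_timer(t))
        pending.sort()
    return fired


def fire_in_order(timers, new_watermark, on_timer):
    """Ordered strategy: always fire the earliest eligible timer next, so a
    newly set timer is considered before any later, already pending timer."""
    heap = list(timers)
    heapq.heapify(heap)
    fired = []
    while heap and heap[0] <= new_watermark:
        t = heapq.heappop(heap)
        fired.append(t)
        for new_timer in on_timer(t):
            heapq.heappush(heap, new_timer)
    return fired


if __name__ == "__main__":
    # Hypothetical scenario: a "flush" timer at 5 and a "GC" timer at 10.
    # Firing the timer at 5 sets a follow-up timer at 7.
    def on_timer(t):
        return [7] if t == 5 else []

    print(fire_batch([5, 10], 10, on_timer))
    print(fire_in_order([5, 10], 10, on_timer))
```

With timers at 5 and 10 and a watermark advancing to 10, the batch strategy fires 5, 10, 7 (the timer at 7 runs after the one at 10, which in the thread's case means state is cleared before it is flushed), while the ordered strategy fires 5, 7, 10. The ordered variant handles timers one at a time, which is the kind of per-timer processing the reply above cautions could cost performance.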
