On Thu, Jun 20, 2019 at 9:35 PM Jan Lukavský <je...@seznam.cz> wrote:
>
> On 6/20/19 9:30 PM, Reuven Lax wrote:
>
> On Thu, Jun 20, 2019 at 8:54 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> > But that is exactly how time advances. Watermarks often don't move
>> > smoothly, as a single old element can hold up the watermark. When that
>> > element is finished, the watermark can jump forward in time, triggering
>> > many timers.
>>
>> Sure. Absolutely agree. But the move from time T1 to T2 can be viewed
>> either as a discrete jump or as a smooth move, so that when you fire a
>> timer, any internal timings are set to the actual timestamp of the timer.
>> I believe that is how Flink works. And this might be related to the fact
>> that Flink lacks the concept of bundles.
>>
>> > I'm not sure how this breaks that invariant. The input watermark has
>> > only moved forward, as should be true for the output watermark. The
>> > output watermark is held up by watermark holds in the step, which
>> > usually means that the output watermark is already being held to the
>> > earliest pending timer.
>>
>> The problem was stated at the beginning of this thread. I can restate it:
>>
>> - let's have four times - T0 < T1 < T2 < T3
>>
>> - let's have two timers A and B, set for times T1 and T3, respectively
>>
>> - the watermark moves time from T0 to T3
>>
>> - that move fires both timers A and B (in this order), *but* timer A is
>> free to set more timers; let's suppose it sets a timer for T2
>>
>> - the second instance of timer A (set for T2) will fire *after* timer B
>> (set for T3), breaking the time invariant
>>
> Ah, by time invariant you mean the in-order firing of timers?
>
> Yes, sorry, I meant "time monotonicity invariant with relation to timers".
> Basically that timers should be fired in timestamp order, because otherwise
> it might cause unpredictable results.
>
I think there were similar issues with resetting timers. For example, you
might reset a timer to a different timestamp while a firing of that timer is
already in the bundle at the old timestamp.
I believe that either choice (modify the bundle or allow the timer to fire)
can lead to consistency problems. Kenn might remember the details here.
>
>> Jan
>> On 6/20/19 8:43 PM, Reuven Lax wrote:
>>
>> On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Reuven,
>>>
>>> > I would be cautious changing this. Being able to put multiple timers
>>> > in the same bundle saves a lot, and if we force them to all run
>>> > separately through ReduceFnRunner we risk regressing performance of
>>> > some pipelines.
>>>
>>> I understand your point. The issue here is that the current behavior is
>>> at least ... unexpected. There might be one different conceptual approach
>>> to that:
>>>
>>> a) if a bundle contains timers for several distinct timestamps (say T1
>>> and T2), then it implies that timer T1 is effectively not fired at time
>>> T1, but at time T2 - that is due to the fact that, logically, the time
>>> hopped discretely from some previous time T0 to T2 without any "stopping
>>> by". Hence, it should be invalid to set up a timer for any time lower
>>> than T2.
>>>
>>
>> But that is exactly how time advances. Watermarks often don't move
>> smoothly, as a single old element can hold up the watermark. When that
>> element is finished, the watermark can jump forward in time, triggering
>> many timers.
>>
>>> b) the time will move smoothly (or, millisecond precision smoothly), but
>>> that implies that there cannot be multiple distinct timers inside a
>>> single bundle.
>>>
>>> If we don't want to take path b), we are probably left with path a) (as
>>> doing nothing seems weird, because it breaks one invariant, that time can
>>> only move forward).
>>>
>> I'm not sure how this breaks that invariant. The input watermark has only
>> moved forward, as should be true for the output watermark.
>> The output watermark is held up by watermark holds in the step, which
>> usually means that the output watermark is already being held to the
>> earliest pending timer.
>>
>>> Option a) can be done - we might add something like
>>> `getInputWatermark()` and `getOutputWatermark()` to `DoFn.OnTimerContext`,
>>> and throw an exception when the user tries to set up a timer for a time
>>> before the input watermark. Effectively, that way we will let the user
>>> know that the timer was set to time T1, but was fired at T2. But that
>>> seems to be a breaking change, unfortunately.
>>>
>>> What do you think?
>>>
>>> Jan
>>> On 6/20/19 5:29 PM, Reuven Lax wrote:
>>>
>>> On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský <je...@seznam.cz> wrote:
>>>
>>>> Hi,
>>>>
>>>> this problem seems to be harder than I thought. I have somewhat
>>>> working code in [1], but some tests are still failing (now tests for
>>>> ReduceFnRunner), and I'm not sure whether the problem is in the tests,
>>>> so that my current behavior is actually correct. Let me explain the
>>>> problem:
>>>>
>>>> - let's have a fixed window with allowed lateness of 1 ms
>>>>
>>>> - let's add two elements into the window (on time), no late elements
>>>>
>>>> - now, ReduceFnRunner with the default trigger will set *two* timers -
>>>> one for window.maxTimestamp() and a second for window.maxTimestamp() +
>>>> allowedLateness
>>>>
>>>> - the previous implementation fired *both* timers at once (within a
>>>> single call to ReduceFnRunner#onTimers), but now it fires twice - once
>>>> for the first timer and a second time for the other
>>>>
>>>
>>> I would be cautious changing this. Being able to put multiple timers in
>>> the same bundle saves a lot, and if we force them to all run separately
>>> through ReduceFnRunner we risk regressing performance of some pipelines.
>>>
>>>> - the result of this is that although in both cases only a single pane
>>>> is emitted, in my branch the fired pane doesn't have the `isLast` flag
>>>> set (that is because the window is not yet garbage collected - it is
>>>> waiting for late data - but the second time it is not fired, because no
>>>> late data arrived)
>>>>
>>>> Would anyone know what is actually the correct behavior regarding
>>>> PaneInfo.isLast? I suppose there are only two options - either two panes
>>>> can come with the isLast flag (both end-of-window and late), or it might
>>>> be possible that no pane will be marked with this flag (because no late
>>>> pane is fired).
>>>>
>>>> Jan
>>>>
>>>> [1] https://github.com/apache/beam/pull/8815
>>>>
>>>>
>>>> On 6/10/19 6:26 PM, Jan Lukavský wrote:
>>>>
>>>> It seems to me that a watermark hold cannot change it (currently),
>>>> because in the current implementation timers fire according to the input
>>>> watermark, but watermark holds apply to the output watermark. If I
>>>> didn't miss anything.
>>>>
>>>> On 10 Jun 2019 at 18:15, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>> I see. Is there a missing watermark hold for timers less than T2?
>>>>
>>>> On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Yes, there is no difference between GC and user timers in this case. I
>>>> think the problem is simply that when the watermark moves from time T1
>>>> to T2, DirectRunner fires all timers that fire until T2, but that can
>>>> create new timers for times between T1 and T2, and these will be fired
>>>> later, although they should have been fired before T2.
>>>>
>>>> Jan
>>>> On 6/10/19 5:48 PM, Kenneth Knowles wrote:
>>>>
>>>> Reading your Jira, I believe this problem will manifest without the
>>>> interaction of user timers and GC. Interesting case. It surrounds whether
>>>> a runner makes a timer available or fires it prior to the bundle being
>>>> committed.
>>>>
>>>> I have commented elsewhere about this part, quoting the Jira:
>>>>
>>>> > have experimented with this a little and have not yet figured out
>>>> > what the correct solution should be. What I tried:
>>>> > 1) hold input watermark for min(setup timers)
>>>> > 2) fire timers based not on input watermark, but on output watermark
>>>> > (output watermark is held by min timer stamp)
>>>>
>>>> Neither of these quite works. What we need is a separate "element input
>>>> watermark" and "timer input watermark". The overall input watermark that
>>>> drives GC is the min of these. The output watermark is also held to this
>>>> overall input watermark. User timers fire according to the element input
>>>> watermark.
>>>>
>>>> Kenn
>>>>
>>>> On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>> Jan, are you editing the implementation of how timers work within the
>>>> DirectRunner, or are you trying to build support for time sorted input
>>>> on top of the Beam model for timers?
>>>> Because I think you will need to do the former.
>>>>
>>>> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hm, that would probably work, thanks!
>>>>
>>>> But, should the timers behave like that? I'm trying to fix this by
>>>> introducing a sequence of watermarks
>>>>
>>>> input watermark -> timer watermark -> output watermark
>>>>
>>>> as suggested in the JIRA, and it actually seems to be working as
>>>> expected. It even cleans up some code paths, but I'm debugging some
>>>> strange behavior this exposed -
>>>> `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems to have
>>>> stopped clearing itself after this change and some Pipelines therefore
>>>> stopped working. I'm a little lost as to why this happened. I can push
>>>> the code I have if anyone is interested.
>>>>
>>>> Jan
>>>> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>>>>
>>>> We hit an instance of this problem before and solved it by rescheduling
>>>> the GC timer again if there was a conflicting timer that was also meant
>>>> to fire.
>>>>
>>>> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> For a single key. I'm getting a collision of timerId
>>>> `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for
>>>> flushing sorted elements in the implementation of
>>>> @RequiresTimeSortedInput. The timers are being swapped at the end of
>>>> input (but it can happen anywhere near the end of a window), which
>>>> results in state being cleared before it gets flushed, which means data
>>>> loss.
>>>>
>>>> Jan
>>>> On 6/10/19 5:08 PM, Reuven Lax wrote:
>>>>
>>>> Do you mean for a single key or across keys?
>>>>
>>>> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <je...@seznam.cz> wrote:
>>>>
>>>> Hi,
>>>>
>>>> I have come across issue [1], where I'm not sure how to solve this in
>>>> the most elegant way.
>>>>
>>>> Any suggestions?
>>>>
>>>> Thanks,
>>>>
>>>> Jan
>>>>
>>>> [1] https://issues.apache.org/jira/browse/BEAM-7520
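
The firing order discussed in this thread (timer A at T1, timer B at T3, and A setting itself again for T2 while the watermark jumps from T0 to T3) can be reproduced with a small toy model. This is only a sketch and not Beam or DirectRunner code: the class `TimerOrderSketch`, the `Timer` type, and the two strategies `fireSnapshot` and `fireOneByOne` are hypothetical names made up for illustration, and the timestamps 1, 2, 3 stand in for T1, T2, T3. The first strategy extracts every eligible timer before running any callback, which is roughly the behavior described as problematic; the second re-reads the pending queue after each callback, which is one possible way to keep timestamp order, at the cost of firing timers one at a time.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of the timer-ordering problem; hypothetical, not Beam code. */
final class TimerOrderSketch {

  /** A timer with an id and a firing timestamp (illustrative type). */
  static final class Timer {
    final String id;
    final long timestamp;

    Timer(String id, long timestamp) {
      this.id = id;
      this.timestamp = timestamp;
    }

    @Override
    public String toString() {
      return id + "@" + timestamp;
    }
  }

  /** Pending timers ordered by timestamp: A at T1 = 1 and B at T3 = 3. */
  private static PriorityQueue<Timer> initialTimers() {
    PriorityQueue<Timer> pending =
        new PriorityQueue<>(Comparator.comparingLong((Timer t) -> t.timestamp));
    pending.add(new Timer("A", 1));
    pending.add(new Timer("B", 3));
    return pending;
  }

  /** Timer callback: when A fires at T1 it sets itself again for T2 = 2. */
  private static void onTimer(Timer fired, PriorityQueue<Timer> pending) {
    if (fired.id.equals("A") && fired.timestamp == 1) {
      pending.add(new Timer("A", 2));
    }
  }

  /** Extracts all eligible timers first, then fires the whole batch. */
  static List<String> fireSnapshot(long watermark) {
    PriorityQueue<Timer> pending = initialTimers();
    List<String> fired = new ArrayList<>();
    while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
      List<Timer> batch = new ArrayList<>();
      while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
        batch.add(pending.poll());
      }
      for (Timer t : batch) {
        fired.add(t.toString());
        onTimer(t, pending); // timers set here only fire in the next batch
      }
    }
    return fired;
  }

  /** Fires one timer at a time, re-reading the queue after each callback. */
  static List<String> fireOneByOne(long watermark) {
    PriorityQueue<Timer> pending = initialTimers();
    List<String> fired = new ArrayList<>();
    while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
      Timer t = pending.poll();
      fired.add(t.toString());
      onTimer(t, pending); // a newly set earlier timer is picked up next
    }
    return fired;
  }

  public static void main(String[] args) {
    System.out.println(fireSnapshot(3)); // prints [A@1, B@3, A@2]
    System.out.println(fireOneByOne(3)); // prints [A@1, A@2, B@3]
  }
}
```

In this model the batch strategy fires the second instance of A after B, while the one-at-a-time strategy keeps timestamp order. It does not model bundles, watermark holds, or the performance concern about running timers separately through ReduceFnRunner.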