Does BEAM-2535 provide more context?

On Thu, Jun 20, 2019 at 12:44 PM Reuven Lax wrote:
>
> On Thu, Jun 20, 2019 at 9:35 PM Jan Lukavský wrote:
>
>> On 6/20/19 9:30 PM, Reuven Lax wrote:
>>
>> On Thu, Jun 20, 2019 at 8:54 PM Jan Lukavský wrote:
>>
>>> > But that is exactly how time advances. Watermarks often don't move smoothly, as a single old element can hold up the watermark. When that element is finished, the watermark can jump forward in time, triggering many timers.
>>>
>>> Sure. Absolutely agree. But the move from time T1 to T2 can be viewed either as a discrete jump or as a smooth move, so that when you fire a timer, any internal timings are set to the actual timestamp of the timer. I believe that is how Flink works. And this might be related to the fact that Flink lacks the concept of bundles.
>>>
>>> > I'm not sure how this breaks that invariant. The input watermark has only moved forward, as should be true for the output watermark. The output watermark is held up by watermark holds in the step, which usually means that the output watermark is already being held to the earliest pending timer.
>>>
>>> The problem was stated at the beginning of this thread. I can restate it:
>>>
>>> - let's have four times: T0 < T1 < T2 < T3
>>> - let's have two timers, A and B, set for times T1 and T3, respectively
>>> - the watermark moves time from T0 to T3
>>> - that move fires both timers A and B (in this order), *but* timer A is free to set more timers; let's suppose it sets a timer for T2
>>> - the second instance of timer A (set for T2) will fire *after* timer B (set for T3), breaking the time invariant
>>
>> Ah, by time invariant you mean the in-order firing of timers?
>>
>> Yes, sorry, I meant the "time monotonicity invariant with relation to timers". Basically, timers should be fired in timestamp order, because otherwise it might cause unpredictable results.
>
> I think there were similar issues with resetting timers. Suppose you reset a timer to a different timestamp, but a firing of that timer at the old timestamp is already in the bundle. I believe that either choice (modify the bundle, or allow the timer to fire) can lead to consistency problems. Kenn might remember the details here.
>
>>> Jan
>>> On 6/20/19 8:43 PM, Reuven Lax wrote:
>>>
>>> On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> > I would be cautious changing this. Being able to put multiple timers in the same bundle saves a lot, and if we force them all to run separately through ReduceFnRunner we risk regressing the performance of some pipelines.
>>>>
>>>> I understand your point. The issue here is that the current behavior is at least ... unexpected. There are two conceptually different approaches to that:
>>>>
>>>> a) if a bundle contains timers for several distinct timestamps (say T1 and T2), then timer T1 is effectively fired not at time T1 but at time T2, because, logically, time hopped discretely from some previous time T0 to T2 without "stopping by" in between. Hence, it should be invalid to set up a timer for any time lower than T2.
>>>
>>> But that is exactly how time advances. Watermarks often don't move smoothly, as a single old element can hold up the watermark. When that element is finished, the watermark can jump forward in time, triggering many timers.
>>>
>>>> b) time will move smoothly (or, with millisecond precision, smoothly), but that implies that a single bundle cannot contain timers for more than one distinct timestamp.
>>>>
>>>> If we don't want to take path b), we are probably left with path a) (doing nothing seems weird, because it breaks one invariant: that time can only move forward).
>>>
>>> I'm not sure how this breaks that invariant. The input watermark has only moved forward, as should be true for the output watermark. The output watermark is held up by watermark holds in the step, which usually means that the output watermark is already being held to the earliest pending timer.
>>>
>>>> Option a) can be done: we might add something like `getInputWatermark()` and `getOutputWatermark()` to `DoFn.OnTimerContext`, and throw an exception when the user tries to set up a timer for a time before the input watermark. That way, we would effectively let the user know that their timer was set for time T1 but was fired at T2. Unfortunately, that seems to be a breaking change.
>>>>
>>>> What do you think?
>>>>
>>>> Jan
>>>> On 6/20/19 5:29 PM, Reuven Lax wrote:
>>>>
>>>> On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> this problem seems to be harder than I thought. I have somewhat working code in [1], but some tests are still failing (now tests for ReduceFnRunner), and I'm not sure whether the problem is actually in the tests, meaning my current behavior is correct. Let me explain the problem:
>>>>>
>>>>> - let's have a fixed window with an allowed lateness of 1 ms
>>>>> - let's add two elements into the window (on time), no late elements
>>>>> - now, ReduceFnRunner with the default trigger will set *two* timers: one for window.maxTimestamp() and a second for window.maxTimestamp() + allowedLateness
>>>>> - the previous implementation fired *both* timers at once (within a single call to ReduceFnRunner#onTimers), but now it fires twice: once for the first timer and once for the other
>>>>
>>>> I would be cautious changing this. Being able to put multiple timers in the same bundle saves a lot, and if we force them all to run separately through ReduceFnRunner we risk regressing the performance of some pipelines.
>>>>
>>>>> - the result of this is that, although in both cases only a single pane is emitted, in my branch the fired pane doesn't have the `isLast` flag set. That is because the window is not yet garbage collected (it is waiting for late data), but no pane is fired the second time, because no late data arrived.
>>>>>
>>>>> Would anyone know what the correct behavior regarding PaneInfo.isLast actually is? I suppose there are only two options: either two panes can come with the isLast flag (both the end-of-window and the late pane), or it is possible that no pane will be marked with this flag (because no late pane is fired).
>>>>>
>>>>> Jan
>>>>>
>>>>> [1] https://github.com/apache/beam/pull/8815
>>>>>
>>>>> On 6/10/19 6:26 PM, Jan Lukavský wrote:
>>>>>
>>>>> It seems to me that a watermark hold cannot change it (currently), because in the current implementation timers fire according to the input watermark, but watermark holds apply to the output watermark. Unless I missed something.
>>>>>
>>>>> On 10. 6. 2019 18:15, Lukasz Cwik wrote:
>>>>>
>>>>> I see. Is there a missing watermark hold for timers less than T2?
>>>>>
>>>>> On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský wrote:
>>>>>
>>>>> Yes, there is no difference between GC and user timers in this case. I think the problem is simply that when the watermark moves from time T1 to T2, DirectRunner fires all timers that fire until T2, but that can create new timers for times between T1 and T2, and these will be fired later, although they should have been fired before T2.
>>>>>
>>>>> Jan
>>>>> On 6/10/19 5:48 PM, Kenneth Knowles wrote:
>>>>>
>>>>> Reading your Jira, I believe this problem will manifest without the interaction of user timers and GC. Interesting case. It concerns whether a runner makes a timer available, or fires it, before the bundle is committed.
>>>>>
>>>>> I have commented elsewhere about this part, quoting the Jira:
>>>>>
>>>>> > I have experimented with this a little and have not yet figured out what the correct solution should be. What I tried:
>>>>> > 1) hold input watermark for min(setup timers)
>>>>> > 2) fire timers based not on input watermark, but on output watermark (output watermark is held by min timer stamp)
>>>>>
>>>>> Neither of these quite works. What we need is a separate "element input watermark" and "timer input watermark". The overall input watermark that drives GC is the min of these. The output watermark is also held to this overall input watermark. User timers fire according to the element input watermark.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik wrote:
>>>>>
>>>>> Jan, are you editing the implementation of how timers work within the DirectRunner, or are you trying to build support for time-sorted input on top of the Beam model for timers? Because I think you will need to do the former.
>>>>>
>>>>> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský wrote:
>>>>>
>>>>> Hm, that would probably work, thanks!
>>>>>
>>>>> But should the timers behave like that? I'm trying to fix this by introducing a sequence of watermarks
>>>>>
>>>>>     input watermark -> timer watermark -> output watermark
>>>>>
>>>>> as suggested in the JIRA, and it actually seems to be working as expected. It even cleans up some code paths, but I'm debugging some strange behavior this exposed: `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems to have stopped clearing itself after this change, and some pipelines therefore stopped working. I'm a little lost as to why this happened. I can push the code I have if anyone is interested.
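Kenn's proposal (user timers gated by the element input watermark, GC gated by the minimum of the element and timer input watermarks) can be illustrated with a toy, single-key model of bundle delivery. This is a hypothetical Python sketch, not Beam or DirectRunner code; `run_bundles`, `flush_in_two_steps` and the `use_timer_input_wm` switch are illustrative names. It assumes that the timer input watermark is the earliest pending user timer, that a timer is due once the relevant watermark reaches its timestamp, and that timers set while a bundle runs only become visible in the next bundle.

```python
import math

INF = math.inf


def run_bundles(element_input_wm, user_timers, gc_timer, on_user_timer,
                use_timer_input_wm=True):
    """Deliver the timers of one key in bundles; return the list of bundles.

    Hypothetical model (not Beam/DirectRunner code):
    - user timers are due once the element input watermark reaches them;
    - assumption: the timer input watermark is the earliest pending user
      timer (infinity when none is pending);
    - the GC timer is due once the overall input watermark,
      min(element input wm, timer input wm), reaches it;
    - timers set during a bundle are only visible to the next bundle.
    With use_timer_input_wm=False, GC is gated by the element input
    watermark alone, for comparison.
    """
    pending = sorted(user_timers)
    gc_pending = True
    bundles = []
    while True:
        timer_input_wm = pending[0] if pending else INF
        overall_input_wm = min(element_input_wm, timer_input_wm)
        gc_wm = overall_input_wm if use_timer_input_wm else element_input_wm
        # Everything due at the start of the bundle is delivered together;
        # the order inside a bundle should not be relied upon.
        due = [("user", ts) for ts in pending if ts <= element_input_wm]
        if gc_pending and gc_timer <= gc_wm:
            due.append(("gc", gc_timer))
            gc_pending = False
        if not due:
            return bundles
        pending = [ts for ts in pending if ts > element_input_wm]
        newly_set = []
        for kind, ts in due:
            if kind == "user":
                newly_set.extend(on_user_timer(ts))
        # New timers become pending only after the bundle "commits".
        pending = sorted(pending + newly_set)
        bundles.append(due)


def flush_in_two_steps(ts):
    # Hypothetical flush callback: the flush at 8 schedules a follow-up at 9.
    return [9] if ts == 8 else []


# End of input: the element input watermark jumps to +infinity.
print(run_bundles(INF, [8], gc_timer=10, on_user_timer=flush_in_two_steps))
# [[('user', 8)], [('user', 9)], [('gc', 10)]]
print(run_bundles(INF, [8], gc_timer=10, on_user_timer=flush_in_two_steps,
                  use_timer_input_wm=False))
# [[('user', 8), ('gc', 10)], [('user', 9)]]
```

With the switch off, the GC timer is delivered in the same bundle as the first flush and the follow-up flush runs only after GC, which mirrors the state-cleared-before-flush data loss Jan describes for `@RequiresTimeSortedInput`. With the timer input watermark, GC waits until no earlier user timer is pending. The model deliberately ignores watermark holds and the output watermark.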
>>>>>
>>>>> Jan
>>>>> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>>>>>
>>>>> We hit an instance of this problem before and solved it by rescheduling the GC timer if there was a conflicting timer that was also meant to fire.
>>>>>
>>>>> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský wrote:
>>>>>
>>>>> For a single key. I'm running into a collision between the timerId `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for flushing sorted elements in the implementation of @RequiresTimeSortedInput. The timers are being swapped at the end of input (but it can happen anywhere near the end of a window), which results in state being cleared before it gets flushed, which means data loss.
>>>>>
>>>>> Jan
>>>>> On 6/10/19 5:08 PM, Reuven Lax wrote:
>>>>>
>>>>> Do you mean for a single key or across keys?
>>>>>
>>>>> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> I have come across issue [1], and I'm not sure how to solve it in the most elegant way.
>>>>>
>>>>> Any suggestions?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jan
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/BEAM-7520
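The ordering problem at the heart of this thread (Jan's T0 < T1 < T2 < T3 example, and the DirectRunner behavior he describes for BEAM-7520) can be reproduced with a toy model. This is a hypothetical Python sketch, not the DirectRunner implementation: `fire_in_batches` assumes that every timer due at the new watermark is taken as one batch and that timers set while that batch fires wait for a later batch, while `fire_in_order` re-checks a priority queue after every single firing. Timestamps are plain integers.

```python
import heapq

# Hypothetical callback shape: on_timer(ts, name) returns a list of new
# (timestamp, name) timers set by that firing.


def fire_in_batches(timers, watermark, on_timer):
    """Take every timer due at `watermark` as one batch; timers set while
    the batch fires are only picked up by a later batch."""
    pending = list(timers)
    fired = []
    while True:
        batch = sorted(t for t in pending if t[0] <= watermark)
        if not batch:
            return fired
        pending = [t for t in pending if t[0] > watermark]
        for ts, name in batch:
            fired.append((ts, name))
            pending.extend(on_timer(ts, name))


def fire_in_order(timers, watermark, on_timer):
    """Always fire the earliest due timer next, including timers that were
    set by a previous firing."""
    pending = list(timers)
    heapq.heapify(pending)
    fired = []
    while pending and pending[0][0] <= watermark:
        ts, name = heapq.heappop(pending)
        fired.append((ts, name))
        for new_timer in on_timer(ts, name):
            heapq.heappush(pending, new_timer)
    return fired


T0, T1, T2, T3 = 0, 1, 2, 3


def timer_a_resets_itself(ts, name):
    # Timer A, firing at T1, sets a second instance of itself for T2.
    return [(T2, "A")] if (ts, name) == (T1, "A") else []


timers = [(T1, "A"), (T3, "B")]  # the watermark then jumps from T0 to T3
print(fire_in_batches(timers, T3, timer_a_resets_itself))
# [(1, 'A'), (3, 'B'), (2, 'A')]  (the T2 timer fires after T3)
print(fire_in_order(timers, T3, timer_a_resets_itself))
# [(1, 'A'), (2, 'A'), (3, 'B')]
```

Firing strictly one timer at a time gives up batching timers with distinct timestamps, which is the performance concern Reuven raises; a variant could still deliver timers that share a timestamp together.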
