I think BEAM-2535 is independent.

On Thu, Jun 20, 2019 at 9:47 PM Lukasz Cwik <[email protected]> wrote:
> Does BEAM-2535 provide more context?
>
> On Thu, Jun 20, 2019 at 12:44 PM Reuven Lax <[email protected]> wrote:
>
>> On Thu, Jun 20, 2019 at 9:35 PM Jan Lukavský <[email protected]> wrote:
>>
>>> On 6/20/19 9:30 PM, Reuven Lax wrote:
>>>
>>> On Thu, Jun 20, 2019 at 8:54 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> > But that is exactly how time advances. Watermarks often don't move
>>>> > smoothly, as a single old element can hold up the watermark. When
>>>> > that element is finished, the watermark can jump forward in time,
>>>> > triggering many timers.
>>>>
>>>> Sure. Absolutely agree. But the move from time T1 to T2 can be viewed
>>>> as a discrete jump or as a smooth move, so that when you fire a timer,
>>>> any internal timings are set to the actual timestamp of the timer. I
>>>> believe that is how Flink works. And this might be related to the fact
>>>> that Flink lacks the concept of bundles.
>>>>
>>>> > I'm not sure how this breaks that invariant. The input watermark has
>>>> > only moved forward, as should be true for the output watermark. The
>>>> > output watermark is held up by watermark holds in the step, which
>>>> > usually means that the output watermark is already being held to the
>>>> > earliest pending timer.
>>>>
>>>> The problem was stated at the beginning of this thread. I can restate
>>>> it:
>>>>
>>>> - let's have four times - T0 < T1 < T2 < T3
>>>>
>>>> - let's have two timers A and B, set for time T1 and T3, respectively
>>>>
>>>> - watermark moves time from T0 to T3
>>>>
>>>> - that move fires both timers A and B (in this order), *but* timer A is
>>>> free to set more timers, let's suppose it sets a timer for T2
>>>>
>>>> - the second instance of timer A (set for T2) will fire *after* timer B
>>>> (set for T3), breaking the time invariant
>>>>
>>> Ah, by time invariant you mean the in-order firing of timers?
>>>
>>> Yes, sorry, I meant "time monotonicity invariant with relation to
>>> timers".
>>> Basically that timers should be fired in timestamp order, because
>>> otherwise it might cause unpredictable results.
>>>
>>
>> I think there were similar issues with resetting timers. If you reset a
>> timer to a different timestamp, a firing of that timer may already be in
>> the bundle at the old timestamp. I believe that either choice (modify the
>> bundle or allow the timer to fire) can lead to consistency problems. Kenn
>> might remember the details here.
>>
>>>
>>>> Jan
>>>>
>>>> On 6/20/19 8:43 PM, Reuven Lax wrote:
>>>>
>>>> On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> Hi Reuven,
>>>>>
>>>>> > I would be cautious changing this. Being able to put multiple
>>>>> > timers in the same bundle saves a lot, and if we force them to all
>>>>> > run separately through ReduceFnRunner we risk regressing
>>>>> > performance of some pipelines.
>>>>>
>>>>> I understand your point. The issue here is that the current behavior
>>>>> is at least ... unexpected. There might be one different conceptual
>>>>> approach to that:
>>>>>
>>>>> a) if a bundle contains timers for several distinct timestamps (say
>>>>> T1 and T2), then it implies that timer T1 is effectively not fired at
>>>>> time T1, but at time T2 - that is due to the fact that, logically,
>>>>> the time hopped discretely from some previous time T0 to T2 without
>>>>> any "stopping by". Hence, it should be invalid to set up a timer for
>>>>> any time lower than T2.
>>>>>
>>>>
>>>> But that is exactly how time advances. Watermarks often don't move
>>>> smoothly, as a single old element can hold up the watermark. When that
>>>> element is finished, the watermark can jump forward in time, triggering
>>>> many timers.
>>>>
>>>>> b) the time will move smoothly (or, millisecond precision smoothly),
>>>>> but that implies that there cannot be more distinct timers inside a
>>>>> single bundle.
>>>>>
>>>>> If we don't want to take path b), we are probably left with path a)
>>>>> (as doing nothing seems weird, because it breaks one invariant, that
>>>>> time can only move forward).
>>>>>
>>>> I'm not sure how this breaks that invariant. The input watermark has
>>>> only moved forward, as should be true for the output watermark. The
>>>> output watermark is held up by watermark holds in the step, which
>>>> usually means that the output watermark is already being held to the
>>>> earliest pending timer.
>>>>
>>>>> Option a) can be done - we might add something like
>>>>> `getInputWatermark()` and `getOutputWatermark()` to
>>>>> `DoFn.OnTimerContext`, and throw an exception when the user tries to
>>>>> set up a timer for a time before the input watermark. Effectively,
>>>>> that way we will let the user know that their timer was set to time
>>>>> T1, but was fired at T2. But that seems to be a breaking change,
>>>>> unfortunately.
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Jan
>>>>>
>>>>> On 6/20/19 5:29 PM, Reuven Lax wrote:
>>>>>
>>>>> On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> this problem seems to be harder than I thought. I have somewhat
>>>>>> working code in [1], but some tests are still failing (now tests
>>>>>> for ReduceFnRunner), and I'm not sure whether the problem is
>>>>>> actually in the tests, so that my current behavior is correct.
>>>>>> Let me explain the problem:
>>>>>>
>>>>>> - let's have a fixed window with allowed lateness of 1 ms
>>>>>>
>>>>>> - let's add two elements into the window (on time), no late elements
>>>>>>
>>>>>> - now, ReduceFnRunner with default trigger will set *two* timers -
>>>>>> one for window.maxTimestamp() and second for window.maxTimestamp() +
>>>>>> allowedLateness
>>>>>>
>>>>>> - the previous implementation fired *both* timers at once (within a
>>>>>> single call to ReduceFnRunner#onTimers), but now it fires twice -
>>>>>> once for the first timer and a second time for the other
>>>>>>
>>>>>
>>>>> I would be cautious changing this. Being able to put multiple timers
>>>>> in the same bundle saves a lot, and if we force them to all run
>>>>> separately through ReduceFnRunner we risk regressing performance of
>>>>> some pipelines.
>>>>>
>>>>>> - the result of this is that although in both cases only a single
>>>>>> pane is emitted, in my branch the fired pane doesn't have the
>>>>>> `isLast` flag set (that is because the window is not yet garbage
>>>>>> collected - waiting for late data - but the second time it is not
>>>>>> fired, because no late data arrived)
>>>>>>
>>>>>> Would anyone know what is actually the correct behavior regarding
>>>>>> PaneInfo.isLast? I suppose there are only two options - either two
>>>>>> panes can come with the isLast flag (both end-of-window and late),
>>>>>> or it might be possible that no pane will be marked with this flag
>>>>>> (because no late pane is fired).
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> [1] https://github.com/apache/beam/pull/8815
>>>>>>
>>>>>> On 6/10/19 6:26 PM, Jan Lukavský wrote:
>>>>>>
>>>>>> It seems to me that watermark hold cannot change it (currently),
>>>>>> because in the current implementation timers fire according to input
>>>>>> watermark, but watermark holds apply to output watermark. If I
>>>>>> didn't miss anything.
>>>>>>
>>>>>> On Jun 10, 2019 at 18:15, Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>> I see. Is there a missing watermark hold for timers less than T2?
>>>>>>
>>>>>> On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>> Yes, there is no difference between GC and user timers in this case.
>>>>>> I think the problem is simply that when watermark moves from time T1
>>>>>> to T2, DirectRunner fires all timers that fire until T2, but that
>>>>>> can create new timers for time between T1 and T2, and these will be
>>>>>> fired later, although they should have been fired before T2.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 6/10/19 5:48 PM, Kenneth Knowles wrote:
>>>>>>
>>>>>> Reading your Jira, I believe this problem will manifest without the
>>>>>> interaction of user timers and GC. Interesting case. It surrounds
>>>>>> whether a runner makes a timer available or fires it prior to the
>>>>>> bundle being committed.
>>>>>>
>>>>>> I have commented elsewhere about this part, quoting the Jira:
>>>>>>
>>>>>> > have experimented with this a little and have not yet figured out
>>>>>> > what the correct solution should be. What I tried:
>>>>>> > 1) hold input watermark for min(setup timers)
>>>>>> > 2) fire timers based not on input watermark, but on output
>>>>>> > watermark (output watermark is held by min timer stamp)
>>>>>>
>>>>>> Neither of these quite works. What we need is a separate "element
>>>>>> input watermark" and "timer input watermark". The overall input
>>>>>> watermark that drives GC is the min of these. The output watermark
>>>>>> is also held to this overall input watermark. User timers fire
>>>>>> according to the element input watermark.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>> Jan, are you editing the implementation of how timers work within
>>>>>> the DirectRunner or are you trying to build support for time sorted
>>>>>> input on top of the Beam model for timers?
>>>>>> Because I think you will need to do the former.
>>>>>>
>>>>>> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>> Hm, that would probably work, thanks!
>>>>>>
>>>>>> But, should the timers behave like that? I'm trying to fix this by
>>>>>> introducing a sequence of watermarks
>>>>>>
>>>>>> input watermark -> timer watermark -> output watermark
>>>>>>
>>>>>> as suggested in the JIRA, and it actually seems to be working as
>>>>>> expected. It even cleans some code paths, but I'm debugging some
>>>>>> strange behavior this exposed -
>>>>>> `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems to have
>>>>>> stopped clearing itself after this change and some Pipelines
>>>>>> therefore stopped working. I'm a little lost why this happened. I
>>>>>> can push the code I have if anyone is interested.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>>>>>>
>>>>>> We hit an instance of this problem before and solved it by
>>>>>> rescheduling the GC timer again if there was a conflicting timer
>>>>>> that was also meant to fire.
>>>>>>
>>>>>> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>> For a single key. I'm getting into a collision of timerId
>>>>>> `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for
>>>>>> flushing sorted elements in the implementation of
>>>>>> @RequiresTimeSortedInput. The timers are being swapped at the end of
>>>>>> input (but it can happen anywhere near the end of a window), which
>>>>>> results in state being cleared before it gets flushed, which means
>>>>>> data loss.
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> On 6/10/19 5:08 PM, Reuven Lax wrote:
>>>>>>
>>>>>> Do you mean for a single key or across keys?
>>>>>>
>>>>>> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have come across issue [1], where I'm not sure how to solve this
>>>>>> in the most elegant way.
>>>>>>
>>>>>> Any suggestions?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-7520
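
For anyone reading the archive later, the T0 < T1 < T2 < T3 scenario from
the thread can be reproduced with a small toy model. This is only a sketch
under stated assumptions: it is plain Python, it does not use any Beam API,
and the names fire_batch, fire_ordered, timer_a_sets_t2 and INITIAL are
made up for illustration. fire_batch mimics "collect every eligible timer
into one bundle, then fire them"; fire_ordered is one possible alternative
that always re-checks for the earliest pending timer, not necessarily what
any runner does.

```python
import heapq

# Timestamps from the thread: T0 < T1 < T2 < T3 (T0 is the old watermark).
T1, T2, T3 = 1, 2, 3

# Two timers, A set for T1 and B set for T3, as (timestamp, name) pairs.
INITIAL = [(T1, "A"), (T3, "B")]


def timer_a_sets_t2(ts, name):
    """Hypothetical callback: when A fires at T1 it sets a new timer for T2."""
    if name == "A" and ts == T1:
        return [(T2, "A")]
    return []


def fire_batch(timers, watermark, on_timer):
    """Fire eligible timers as pre-collected batches (toy 'bundle' model).

    Timers set by a callback are only picked up in the next batch, so they
    can fire after timers that carry a later timestamp.
    """
    pending = list(timers)
    fired = []
    while True:
        eligible = sorted(t for t in pending if t[0] <= watermark)
        if not eligible:
            return fired
        pending = [t for t in pending if t[0] > watermark]
        for ts, name in eligible:
            fired.append((ts, name))
            pending.extend(on_timer(ts, name))


def fire_ordered(timers, watermark, on_timer):
    """Fire one timer at a time, always taking the earliest pending one."""
    heap = list(timers)
    heapq.heapify(heap)
    fired = []
    while heap and heap[0][0] <= watermark:
        ts, name = heapq.heappop(heap)
        fired.append((ts, name))
        for new_timer in on_timer(ts, name):
            heapq.heappush(heap, new_timer)
    return fired


if __name__ == "__main__":
    # The watermark jumps from T0 straight to T3.
    print("batch:  ", fire_batch(INITIAL, T3, timer_a_sets_t2))
    print("ordered:", fire_ordered(INITIAL, T3, timer_a_sets_t2))
```

In this toy model the batch variant fires A@T1, B@T3, A@T2 (the order
described in the thread), while the ordered variant fires A@T1, A@T2, B@T3.
The ordered variant fires timers one by one, which is the cost Reuven's
performance concern about bundling refers to.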
