I think BEAM-2535 is independent.

On Thu, Jun 20, 2019 at 9:47 PM Lukasz Cwik <[email protected]> wrote:

> Does BEAM-2535 provide more context?
>
> On Thu, Jun 20, 2019 at 12:44 PM Reuven Lax <[email protected]> wrote:
>
>>
>>
>> On Thu, Jun 20, 2019 at 9:35 PM Jan Lukavský <[email protected]> wrote:
>>
>>>
>>> On 6/20/19 9:30 PM, Reuven Lax wrote:
>>>
>>>
>>>
>>> On Thu, Jun 20, 2019 at 8:54 PM Jan Lukavský <[email protected]> wrote:
>>>
>>>> > But that is exactly how time advances. Watermarks often don't move
>>>> smoothly, as a single old element can hold up the watermark. When that
>>>> element is finished, the watermark can jump forward in time, triggering
>>>> many timers.
>>>>
>>>> Sure. Absolutely agree. But the move from time T1 to T2 can be viewed
>>>> as a discrete jump or as a smooth move, where firing a timer sets any
>>>> internal timings to the actual timestamp of that timer. I believe that
>>>> is how Flink works, and this might be related to the fact that Flink
>>>> lacks the concept of bundles.
>>>>
>>>> > I'm not sure how this breaks that invariant. The input watermark has
>>>> only moved forward, as should be true for the output watermark. The output
>>>> watermark is held up by watermark holds in the step, which usually means
>>>> that the output watermark is already being held to the earliest pending
>>>> timer.
>>>>
>>>> The problem was stated at the beginning of this thread. I can restate
>>>> it:
>>>>
>>>> - let's have four times - T0 < T1 < T2 < T3
>>>>
>>>> - let's have two timers A and B, set for times T1 and T3, respectively
>>>>
>>>> - watermark moves time from T0 to T3
>>>>
>>>> - that move fires both timers A and B (in this order), *but* timer A is
>>>> free to set more timers; let's suppose it sets a timer for T2
>>>>
>>>> - the second instance of timer A (set for T2) will fire *after* timer B
>>>> (set for T3), breaking the time invariant
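The scenario above can be reproduced with a toy model. This is only a sketch under stated assumptions: `fire_in_batches`, `fire_in_order` and `on_timer` are hypothetical names, timestamps are plain integers, and none of this is DirectRunner code. The first function collects every timer eligible at the new watermark before firing any of them; the second re-checks the queue after every firing.

```python
import heapq

T0, T1, T2, T3 = 0, 1, 2, 3


def on_timer(ts, name):
    """Hypothetical callback: timer A firing at T1 sets timer A again for T2."""
    return [(T2, "A")] if (ts, name) == (T1, "A") else []


def fire_in_batches(timers, watermark, callback):
    """Snapshot all eligible timers first, fire them, then look for new ones."""
    pending = list(timers)
    fired = []
    while True:
        eligible = sorted(t for t in pending if t[0] <= watermark)
        if not eligible:
            return fired
        pending = [t for t in pending if t[0] > watermark]
        for ts, name in eligible:
            fired.append((ts, name))
            # Timers set here are only seen in the next round.
            pending.extend(callback(ts, name))


def fire_in_order(timers, watermark, callback):
    """Fire one timer at a time, always the earliest one currently set."""
    heap = list(timers)
    heapq.heapify(heap)
    fired = []
    while heap and heap[0][0] <= watermark:
        ts, name = heapq.heappop(heap)
        fired.append((ts, name))
        for new_timer in callback(ts, name):
            heapq.heappush(heap, new_timer)
    return fired


initial = [(T1, "A"), (T3, "B")]
batched = fire_in_batches(initial, T3, on_timer)
ordered = fire_in_order(initial, T3, on_timer)
print(batched)
print(ordered)
```

In the batched variant the timer set for T2 fires after the timer set for T3; in the one-at-a-time variant the firings stay in timestamp order, at the cost of never handling more than one timer per step.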
>>>>
>>> Ah, by time invariant you mean the in-order firing of timers?
>>>
>>> Yes, sorry, I meant "time monotonicity invariant with relation to
>>> timers". Basically that timers should be fired in timestamp order, because
>>> otherwise it might cause unpredictable results.
>>>
>>
>> I think there were similar issues with resetting timers: you reset a
>> timer to a different timestamp, but a firing of that timer is already in
>> the bundle at the old timestamp. I believe that either choice (modify the
>> bundle or allow the timer to fire) can lead to consistency problems. Kenn
>> might remember the details here.
>>
>>
>>
>>>
>>>
>>>> Jan
>>>> On 6/20/19 8:43 PM, Reuven Lax wrote:
>>>>
>>>>
>>>>
>>>> On Thu, Jun 20, 2019 at 8:03 PM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> Hi Reuven,
>>>>>
>>>>> > I would be cautious changing this. Being able to put multiple timers
>>>>> in the same bundle saves a lot, and if we force them to all run separately
>>>>> through ReduceFnRunner we risk regressing performance of some pipelines.
>>>>>
>>>>> I understand your point. The issue here is that the current behavior
>>>>> is at least ... unexpected. There might be a different conceptual
>>>>> approach to that:
>>>>>
>>>>>  a) if a bundle contains timers for several distinct timestamps (say
>>>>> T1 and T2), then it implies that timer T1 is effectively not fired at
>>>>> time T1, but at time T2. That is because, logically, the time hopped
>>>>> discretely from some previous time T0 to T2 without any "stopping by".
>>>>> Hence, it should be invalid to set up a timer for any time lower than T2.
>>>>>
>>>>
>>>> But that is exactly how time advances. Watermarks often don't move
>>>> smoothly, as a single old element can hold up the watermark. When that
>>>> element is finished, the watermark can jump forward in time, triggering
>>>> many timers.
>>>>
>>>>
>>>>> b) the time will move smoothly (or smoothly at millisecond precision),
>>>>> but that implies that there cannot be several distinct timers inside a
>>>>> single bundle.
>>>>>
>>>>> If we don't want to take path b), we are probably left with path a)
>>>>> (as doing nothing seems weird, because it breaks one invariant: that time
>>>>> can only move forward).
>>>>>
>>>> I'm not sure how this breaks that invariant. The input watermark has
>>>> only moved forward, as should be true for the output watermark. The output
>>>> watermark is held up by watermark holds in the step, which usually means
>>>> that the output watermark is already being held to the earliest pending
>>>> timer.
>>>>
>>>>
>>>>> Option a) can be done - we might add something like
>>>>> `getInputWatermark()` and `getOutputWatermark()` to `DoFn.OnTimerContext`,
>>>>> and throw an exception when the user tries to set up a timer for a time
>>>>> before the input watermark. Effectively, that way we will let the user
>>>>> know that their timer was set to time T1, but was fired at T2. But that
>>>>> seems to be a breaking change, unfortunately.
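A minimal sketch of the check proposed in option a), assuming integer timestamps. `TimerContext`, `set_timer` and `input_watermark` are hypothetical stand-ins for illustration only; they are not the Beam API, and the accessors named above are a proposal in this thread rather than existing methods.

```python
class TimerContext:
    """Hypothetical timer context that exposes the input watermark."""

    def __init__(self, input_watermark):
        self.input_watermark = input_watermark
        self.timers = {}

    def set_timer(self, timer_id, timestamp):
        # Reject timers that would logically fire in the past: the time has
        # already hopped to input_watermark without stopping in between.
        if timestamp < self.input_watermark:
            raise ValueError(
                f"timer {timer_id!r} set for {timestamp}, "
                f"but input watermark is already {self.input_watermark}"
            )
        self.timers[timer_id] = timestamp


# The watermark jumped from T0 to T3 = 3; timer A fires and tries to set T2 = 2.
ctx = TimerContext(input_watermark=3)
ctx.set_timer("B", 3)  # allowed: not before the input watermark
try:
    ctx.set_timer("A", 2)
    rejected = False
except ValueError:
    rejected = True
print(rejected, ctx.timers)
```

With a check like this the user learns immediately that a timer set for T1 was in fact fired at a later time, which is also why it would change behavior for existing pipelines.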
>>>>>
>>>>> What do you think?
>>>>>
>>>>> Jan
>>>>> On 6/20/19 5:29 PM, Reuven Lax wrote:
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jun 20, 2019 at 3:08 PM Jan Lukavský <[email protected]> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> this problem seems to be harder than I thought. I have somewhat
>>>>>> working code in [1], but some tests are still failing (now tests for
>>>>>> ReduceFnRunner), and I'm not sure whether the problem is in the tests, so
>>>>>> that my current behavior is actually correct. Let me explain the problem:
>>>>>>
>>>>>>  - let's have a fixed window with allowed lateness of 1 ms
>>>>>>
>>>>>>  - let's add two elements into the window (on time), no late elements
>>>>>>
>>>>>>  - now, ReduceFnRunner with the default trigger will set *two* timers -
>>>>>> one for window.maxTimestamp() and a second for window.maxTimestamp() +
>>>>>> allowedLateness
>>>>>>
>>>>>>  - the previous implementation fired *both* timers at once (within a
>>>>>> single call to ReduceFnRunner#onTimers), but now it fires twice - once
>>>>>> for the first timer and a second time for the other
>>>>>>
>>>>>
>>>>> I would be cautious changing this. Being able to put multiple timers
>>>>> in the same bundle saves a lot, and if we force them to all run separately
>>>>> through ReduceFnRunner we risk regressing performance of some pipelines.
>>>>>
>>>>>
>>>>>>  - the result of this is that although in both cases only a single pane
>>>>>> is emitted, in my branch the fired pane doesn't have the `isLast` flag
>>>>>> set (that is because the window is not yet garbage collected - it is
>>>>>> waiting for late data - but the second time it is not fired, because no
>>>>>> late data arrived)
>>>>>>
>>>>>> Would anyone know what is actually the correct behavior regarding
>>>>>> PaneInfo.isLast? I suppose there are only two options - either two panes
>>>>>> can come with the isLast flag (both end-of-window and late), or it might
>>>>>> be possible that no pane will be marked with this flag (because no late
>>>>>> pane is fired).
>>>>>>
>>>>>> Jan
>>>>>>
>>>>>>  [1] https://github.com/apache/beam/pull/8815
>>>>>>
>>>>>>
>>>>>> On 6/10/19 6:26 PM, Jan Lukavský wrote:
>>>>>>
>>>>>> It seems to me that a watermark hold cannot change it (currently),
>>>>>> because in the current implementation timers fire according to the input
>>>>>> watermark, but watermark holds apply to the output watermark. Unless I
>>>>>> missed something.
>>>>>>
>>>>>> On 10 Jun 2019 at 18:15, Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>> I see. Is there a missing watermark hold for timers less than T2?
>>>>>>
>>>>>> On Mon, Jun 10, 2019 at 9:08 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>> Yes, there is no difference between GC and user timers in this case.
>>>>>> I think the problem is simply that when the watermark moves from time T1
>>>>>> to T2, DirectRunner fires all timers that fire until T2, but that can
>>>>>> create new timers for times between T1 and T2, and these will be fired
>>>>>> later, although they should have been fired before T2.
>>>>>>
>>>>>> Jan
>>>>>> On 6/10/19 5:48 PM, Kenneth Knowles wrote:
>>>>>>
>>>>>> Reading your Jira, I believe this problem will manifest without the
>>>>>> interaction of user timers and GC. Interesting case. It surrounds
>>>>>> whether a runner makes a timer available or fires it prior to the bundle
>>>>>> being committed.
>>>>>>
>>>>>> I have commented elsewhere about this part, quoting the Jira:
>>>>>>
>>>>>> > have experimented with this a little and have not yet figured out
>>>>>> what the correct solution should be. What I tried:
>>>>>> > 1) hold input watermark for min(setup timers)
>>>>>> > 2) fire timers based not on input watermark, but on output
>>>>>> watermark (output watermark is held by min timer stamp)
>>>>>>
>>>>>> Neither of these quite works. What we need is a separate "element
>>>>>> input watermark" and "timer input watermark". The overall input watermark
>>>>>> that drives GC is the min of these. The output watermark is also held to
>>>>>> this overall input watermark. User timers fire according to the element
>>>>>> input watermark.
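The relationships described above can be written down as a small sketch. Assumptions: timestamps are integers, `StepWatermarks` and its field names are hypothetical, and how the "timer input watermark" itself is derived is not specified in this thread, so the caller simply supplies it.

```python
from dataclasses import dataclass


@dataclass
class StepWatermarks:
    """Hypothetical per-step watermark bookkeeping (not Beam code)."""

    element_input: int   # "element input watermark"
    timer_input: int     # "timer input watermark", supplied by the caller
    holds: tuple = ()    # explicit watermark holds set in the step

    @property
    def overall_input(self):
        # The overall input watermark, which drives GC, is the min of the two.
        return min(self.element_input, self.timer_input)

    @property
    def output(self):
        # The output watermark is held to the overall input watermark
        # and to any explicit holds.
        return min((self.overall_input, *self.holds))

    def user_timer_eligible(self, timestamp):
        # User timers fire according to the element input watermark.
        return timestamp <= self.element_input


w = StepWatermarks(element_input=3, timer_input=1)
print(w.overall_input, w.output, w.user_timer_eligible(2))
held = StepWatermarks(element_input=3, timer_input=1, holds=(0,))
print(held.output)
```

In this model GC, which follows the overall input watermark, cannot run ahead of a user timer that is still pending behind the timer input watermark.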
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Mon, Jun 10, 2019 at 8:44 AM Lukasz Cwik <[email protected]> wrote:
>>>>>>
>>>>>> Jan, are you editing the implementation of how timers work within the
>>>>>> DirectRunner, or are you trying to build support for time-sorted input on
>>>>>> top of the Beam model for timers?
>>>>>> Because I think you will need to do the former.
>>>>>>
>>>>>> On Mon, Jun 10, 2019 at 8:41 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>> Hm, that would probably work, thanks!
>>>>>>
>>>>>> But, should the timers behave like that? I'm trying to fix this by
>>>>>> introducing a sequence of watermarks
>>>>>>
>>>>>>  input watermark -> timer watermark -> output watermark
>>>>>>
>>>>>> as suggested in the JIRA, and it actually seems to be working as
>>>>>> expected. It even cleans up some code paths, but I'm debugging some
>>>>>> strange behavior this exposed -
>>>>>> `WatermarkHold.watermarkHoldTagForTimestampCombiner` seems to have
>>>>>> stopped clearing itself after this change and some Pipelines therefore
>>>>>> stopped working. I'm a little lost as to why this happened. I can push
>>>>>> the code I have if anyone is interested.
>>>>>>
>>>>>> Jan
>>>>>> On 6/10/19 5:32 PM, Lukasz Cwik wrote:
>>>>>>
>>>>>> We hit an instance of this problem before and solved it by rescheduling
>>>>>> the GC timer again if there was a conflicting timer that was also meant
>>>>>> to fire.
>>>>>>
>>>>>> On Mon, Jun 10, 2019 at 8:17 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>> For a single key. I'm getting a collision between timerId
>>>>>> `__StatefulParDoGcTimerId` (StatefulDoFnRunner) and my timerId for
>>>>>> flushing sorted elements in the implementation of
>>>>>> @RequiresTimeSortedInput. The timers are being swapped at the end of
>>>>>> input (but it can happen anywhere near the end of a window), which
>>>>>> results in state being cleared before it gets flushed, which means data
>>>>>> loss.
>>>>>>
>>>>>>  Jan
>>>>>> On 6/10/19 5:08 PM, Reuven Lax wrote:
>>>>>>
>>>>>> Do you mean for a single key or across keys?
>>>>>>
>>>>>> On Mon, Jun 10, 2019, 5:11 AM Jan Lukavský <[email protected]> wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> I have come across issue [1], and I'm not sure how to solve it in the
>>>>>> most elegant way.
>>>>>>
>>>>>> Any suggestions?
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>>   Jan
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/BEAM-7520
>>>>>>
>>>>>>
>>>>>>
