Correct. This API is merged into Beam, so it should be included in the next
Beam release.

On Mon, Jan 13, 2020 at 4:00 AM Aaron Dixon <[email protected]> wrote:

> Reuven, thank you very much for your help and the clarity here; it's very
> helpful.
>
> Per your solution #2: this approach makes sense, seems semantically
> right, and is something I'll explore once timer.withOutputTimestamp(t) is
> released. Just for clarity: until this API is released, there is no other
> way in Beam (mid-pipeline/post-Source) for me to hold back the watermark,
> correct?
>
> On Mon, Jan 13, 2020 at 1:22 AM Reuven Lax <[email protected]> wrote:
>
>> Semantically though, since you want the CalendarWindow aggregation to be
>> based on login timestamps, the watermark should be tracking the login
>> timestamps. The watermark is a way for the CalendarWindow to know that as
>> far as the system knows, there will be no more events that fall into that
>> window. You say that long sessions are holding back the watermark, but
>> that's exactly because those long sessions mean that there is still data
>> pending for that CalendarWindow, so it is still incomplete! The above
>> techniques might appear to solve this, but do so at the expense of somewhat
>> randomly causing data to be late or, worse, dropped.
>>
>> There are a couple of ways I would address this:
>>
>> 1. The simplest would be to allow the watermark to track the login
>> window, but put a trigger on the CalendarWindow (e.g. trigger every 10
>> seconds). That way whenever the trigger fires you can update the results so
>> far for that window. This means that the majority of sessions that are
>> complete can be output without needing to wait for the long sessions, yet
>> the window will remain open waiting for those long sessions to complete.
>>
>> 2. Another possibility is to explicitly identify those extra-long
>> sessions, and handle them differently. This I think is a better solution
>> than the above timestampSkew solution, because it's deterministic: you know
>> exactly which sessions you are handling differently. I would do this by
>> using the state+timers API to calculate the sessions, instead of the
>> sessions WindowFn. When a session is overly long, then you can stop setting
>> the watermark hold for the login time, essentially removing that long
>> session from the watermark calculation.
>>
>> One possibility for how to handle the long sessions "differently" would
>> still involve using withAllowedTimestampSkew. This still risks losing some
>> of these (if the skew ever happens to be larger than the static value you
>> set, you won't be able to output the session). However, now you know
>> you're limiting the skewed output to only those specific long sessions
>> you've chosen, which is much better than emitting all records with skew and
>> hoping that things work out.
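Option 2 can be illustrated with a minimal plain-Java model. This is not Beam API. The model makes two assumptions. First, each open session holds the output watermark at its login time (in Beam, that hold would come from setting a timer with withOutputTimestamp). Second, a session counts as "overly long" once it has been open for more than a fixed event-time threshold. `SessionHoldModel` and its methods are hypothetical names used only for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model (not Beam API): every open session holds the output
 * watermark at its login time, unless it has been open longer than
 * maxSessionMillis, in which case its hold is released.
 */
final class SessionHoldModel {
    private final long maxSessionMillis;
    // user -> login timestamp (epoch millis) of that user's open session
    private final Map<String, Long> openSessions = new HashMap<>();

    SessionHoldModel(long maxSessionMillis) {
        this.maxSessionMillis = maxSessionMillis;
    }

    void login(String user, long loginMillis) {
        openSessions.put(user, loginMillis);
    }

    void logout(String user) {
        openSessions.remove(user);
    }

    /** Input watermark, held back by the earliest login of any session that still holds. */
    long outputWatermark(long inputWatermark) {
        long result = inputWatermark;
        for (long login : openSessions.values()) {
            boolean overlyLong = inputWatermark - login > maxSessionMillis;
            if (!overlyLong) {
                result = Math.min(result, login);
            }
        }
        return result;
    }
}
```

Consider a 4-hour threshold and a session that opens at 10:00. That session stops holding the watermark once the input watermark passes 14:00. After that point, downstream CalendarWindow panes are no longer blocked by it, and the long session can be handled separately.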
>>
>> Reuven
>>
>> On Sun, Jan 12, 2020 at 12:07 PM Aaron Dixon <[email protected]> wrote:
>>
>>> Reuven thanks -- I understand each point although I'm trying to grapple
>>> with your concerns expressed in #3; they don't seem avoidable even w/o the
>>> allowedSkew feature.
>>>
>>> Considering your response I see a revision to my solution that omits
>>> using the allowed skew configuration but as far as I can tell still has the
>>> concerns from #3 (i.e., difficulty in reasoning about which events may be
>>> dropped.)
>>>
>>> My pipeline using the skew config looks like this:
>>>
>>> (1) CustomSessionWindow
>>> emits -> (user, login, logout) @ <logout-time>
>>> (2) ParDo
>>> -> re-emits same tuple but w/ *login* timestamp
>>>     (requires custom allowed-skew)
>>> (3) CalendarWindow
>>> -> <places in window based on **event** timestamp, which is the *login*
>>> timestamp>
>>>
>>> Instead, I can write a CustomCalendarWindow that places the tuple
>>> element in the right window based on the *login* timestamp, avoiding the
>>> need for the middle/skewing ParDo:
>>>
>>> (1) CustomSessionWindow
>>> -> (user, login, logout) @ <logout-time>
>>> (2) CustomCalendarWindow
>>> -> <*explicitly* places element in window based on the **login**
>>> timestamp>
>>>
>>> So the use of the ParDo was simply a way to avoid having to write a
>>> custom window; it essentially ensures the CalendarWindow windows based on
>>> login time.
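The window-assignment rule behind the CustomCalendarWindow described above can be sketched in plain Java. The session tuple's element timestamp is the logout time, but the calendar day is taken from the login time. `Session` and `LoginDayAssigner` are illustrative names, not Beam classes, and the UTC time zone is an assumption.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;

/** Hypothetical session tuple; in the pipeline its element timestamp is the logout time. */
final class Session {
    final String user;
    final Instant login;
    final Instant logout;

    Session(String user, Instant login, Instant logout) {
        this.user = user;
        this.login = login;
        this.logout = logout;
    }
}

/** Assigns a session to the calendar day of its LOGIN, ignoring the element (logout) timestamp. */
final class LoginDayAssigner {
    private final ZoneId zone;

    LoginDayAssigner(ZoneId zone) {
        this.zone = zone;
    }

    static LoginDayAssigner utc() {
        return new LoginDayAssigner(ZoneOffset.UTC);
    }

    /** Day the custom calendar window would use. */
    LocalDate windowFor(Session s) {
        return LocalDate.ofInstant(s.login, zone);
    }

    /** Day a plain calendar window would pick from the element timestamp, for comparison. */
    LocalDate elementTimestampDay(Session s) {
        return LocalDate.ofInstant(s.logout, zone);
    }
}
```

Take a session from 22:00 on Jan 12 to 02:00 on Jan 13 (UTC). `windowFor` returns Jan 12, while the element timestamp falls on Jan 13. This is exactly the case where the output can land in a window that may already be closed.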
>>>
>>> But I don't see how your concerns in #3 are obviated by this revision.
>>> Elements going into the calendar window may already be late... this is
>>> something that any (multi-stage) Beam pipeline has to contend with, even
>>> without the deprecated allowedSkew facility, no?
>>>
>>> In other words both of these pipelines are semantically, behaviorally
>>> identical. The former just had the benefit of not requiring a custom window
>>> implementation.
>>>
>>> On Sun, Jan 12, 2020 at 12:12 PM Reuven Lax <[email protected]> wrote:
>>>
>>>> A few comments:
>>>>
>>>> 1. Yes, this already works on Dataflow (at Beam head). Flink support is
>>>> pending at pr/10534.
>>>>
>>>> 2. Just to make sure we're on the same page: getAllowedTimestampSkew is
>>>> _not_ about outputting behind the watermark. Rather, it's about outputting a
>>>> timestamp that's less than the current input timestamp. If, for example, the
>>>> watermark is 12:00 and the current input element has a timestamp of 11:00
>>>> (because it's late), then you can output an element at 11:00 with no need
>>>> to set this parameter. It appears that the JavaDoc is somewhat confusing on
>>>> this method.
>>>>
>>>> 3. The reason for this parameter is that the watermark only correctly
>>>> tracks timestamps internal to the pipeline if your code doesn't make
>>>> timestamps travel back in time, e.g. a ParDo taking an element with a
>>>> timestamp of 12:00 and outputting another element with an earlier
>>>> timestamp such as 11:00. If you use getAllowedTimestampSkew, the elements
>>>> you produce might not be tracked by the watermark and will show up late
>>>> (even if the source element is on time).
>>>> What's worse, there's a chance that the elements will be older than
>>>> allowedLateness and will get dropped altogether (this can happen even if
>>>> allowedTimestampSkew < maxAllowedLateness, because the input element might
>>>> already be late and you'll then output an element that has an even earlier
>>>> timestamp).
>>>>
>>>> 4. It sounds like you both want and don't want a watermark. You want
>>>> the watermark to not be held up by your input (so that your aggregations
>>>> keep triggering), but you then want to output old data which might prevent
>>>> the watermark from working properly, and might cause data to be dropped.
>>>> Have you considered instead using either triggers or timers to trigger your
>>>> aggregations? That way you don't need to wait for the watermark to advance
>>>> to the end of the window to trigger the aggregation, but the end-of-window
>>>> aggregation will still be correct.
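Points 2 and 3 can be checked with a small plain-Java model (not Beam API). The model assumes two rules:

- An output is accepted only if its timestamp is no earlier than the input timestamp minus the allowed skew.
- An element becomes droppable once its window's last timestamp plus the allowed lateness is behind the watermark.

For readability, timestamps are plain minutes and windows are fixed-size. `SkewAndLatenessModel` is a hypothetical name.

```java
/**
 * Hypothetical model (not Beam API) of the timestamp-skew check and of
 * late-data dropping, using fixed windows and plain long timestamps.
 */
final class SkewAndLatenessModel {
    /** Output accepted only if no earlier than the input timestamp minus the allowed skew. */
    static boolean outputAllowed(long inputTs, long outputTs, long allowedSkew) {
        return outputTs >= inputTs - allowedSkew;
    }

    /** Last timestamp inside the fixed window of the given size that contains ts. */
    static long windowMaxTimestamp(long ts, long windowSize) {
        return Math.floorDiv(ts, windowSize) * windowSize + windowSize - 1;
    }

    /** Droppable once the window's last timestamp plus allowed lateness is behind the watermark. */
    static boolean droppable(long ts, long windowSize, long allowedLateness, long watermark) {
        return windowMaxTimestamp(ts, windowSize) + allowedLateness < watermark;
    }
}
```

Suppose windows are hourly, allowed lateness is 30 minutes, and the watermark is at 12:00 (minute 720). An input at 11:05 is late but still kept. Now output it 25 minutes earlier, at 10:40. That output passes the skew check, yet it lands in the 10:00 window, which is already past its lateness horizon. So it would be dropped even though the skew (25) is smaller than the allowed lateness (30).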
>>>>
>>>> Reuven
>>>>
>>>> On Sun, Jan 12, 2020 at 8:23 AM Aaron Dixon <[email protected]> wrote:
>>>>
>>>>> Reuven thanks for your insights so far. Just wanted to press a little
>>>>> more on the deprecation question as I'm still (so far) convinced that my
>>>>> use case is quite a straightforward justification (I'm looking for
>>>>> confirmation or correction to my thinking here.) I've simplified my use
>>>>> case a bit if it helps things:
>>>>>
>>>>> Use case: "For users that login on a given calendar day, what is the
>>>>> average login time?"
>>>>>
>>>>> So I have two event types LOGIN and LOGOUT. I capture a user login
>>>>> session (using custom windowing or state api, doesn't matter) and I use the
>>>>> default TimestampCombiner/END_OF_WINDOW because I want my aggregations to
>>>>> not be delayed.
>>>>>
>>>>> However per my use case requirements I must window using the LOGIN
>>>>> time. So I use outputWithTimestamp plus skew configuration to this end.
>>>>>
>>>>> Since most of my users login and logout within the same calendar day,
>>>>> I get my per-day aggregations right on time, in real time.
>>>>>
>>>>> Only for the few users that logout after the day that they login will
>>>>> I see actual late aggregations produced in which case I can leverage Beam's
>>>>> various lateness configuration levers to trade completeness for storage,
>>>>> etc.
>>>>>
>>>>> This to me seems a *very* straightforward justification for my use of
>>>>> DoFn#getAllowedTimestampSkew. Should this justify not deprecating that
>>>>> facility?
>>>>>
>>>>> I realize there are other various solutions, now and coming soon, that
>>>>> involve holding the watermark -- but any solution that requires holding the
>>>>> watermark means that I have to give up getting on-time aggregations at the
>>>>> very end of the calendar day (window). I would much rather (and reasonably
>>>>> so?) get on-time aggregations covering the majority of my users and be
>>>>> happy to refine these averages when my few latent users logout in a later
>>>>> day.
>>>>>
>>>>> In some Beam documentation [1] there is the idea of "unobservably late
>>>>> data". That is, I have specific elements that are output late (behind the
>>>>> watermark), but because they are guaranteed to land *within the window*,
>>>>> they are promoted to be on-time. This conceptualization of things
>>>>> seems very well-suited to my simple use case, but I'm definitely open to a
>>>>> different way of thinking in my approach.
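The "unobservably late" idea can be modelled for UTC calendar-day windows. An element behind the watermark only becomes observably late once the watermark has also passed the end of its day. `DayWindowLateness` is a hypothetical plain-Java sketch. It ignores allowed lateness and triggers, and it does not describe how a runner actually implements this.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Hypothetical model of on-time vs. late classification for UTC calendar-day windows. */
final class DayWindowLateness {
    enum Status { ON_TIME, BEHIND_WATERMARK_BUT_ON_TIME, LATE }

    /** Last millisecond of the UTC calendar day containing ts. */
    static Instant dayMaxTimestamp(Instant ts) {
        LocalDate day = LocalDate.ofInstant(ts, ZoneOffset.UTC);
        return day.plusDays(1).atStartOfDay(ZoneOffset.UTC).toInstant().minusMillis(1);
    }

    static Status classify(Instant ts, Instant watermark) {
        if (!ts.isBefore(watermark)) {
            return Status.ON_TIME;
        }
        // Behind the watermark, but only observably late once the day window itself has closed.
        return dayMaxTimestamp(ts).isBefore(watermark)
                ? Status.LATE
                : Status.BEHIND_WATERMARK_BUT_ON_TIME;
    }
}
```

Two cases show the difference:

- A login at 09:00 emitted when the watermark is at 17:00 on the same day is behind the watermark but still on time.
- A login at 22:00 emitted after the watermark has reached 02:00 the next day is late.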
>>>>>
>>>>> My main concern is that my pipeline will be leveraging a Deprecated
>>>>> facility (DoFn#getAllowedTimestampSkew) but I don't see other viable
>>>>> options (within Beam) yet.
>>>>>
>>>>> (Hope I'm not pressing too hard on this question here. I think this
>>>>> use case is interesting because it ...seems... to be a rather
>>>>> simple/distilled justification for being able to output data behind the
>>>>> watermark mid-stream.)
>>>>>
>>>>> [1] https://beam.apache.org/blog/2016/10/20/test-stream.html
>>>>>
>>>>>
>>>>> On Sat, Jan 11, 2020 at 10:10 PM Aaron Dixon <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Oh nice, that will be great; I'll look forward to this one! Any idea if
>>>>>> Dataflow will support it?
>>>>>>
>>>>>> On Sat, Jan 11, 2020 at 9:07 PM Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> There is now (as of last week) a way to hold back the watermark with
>>>>>>> the state API (though not yet in a released version of Beam). If you set a
>>>>>>> timer using withOutputTimestamp(t), the watermark will be held to t.
>>>>>>>
>>>>>>> On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Reuven, thanks for your quick reply.
>>>>>>>>
>>>>>>>> I've tried that, but the drag it puts on the watermark was too
>>>>>>>> intrusive. For example, even if just a single user among many decided to
>>>>>>>> remain logged in for a few days, the watermark holds everything else
>>>>>>>> back.
>>>>>>>>
>>>>>>>> This was when using a custom session window. I've recently been
>>>>>>>> using the State API to do my custom session tracking to avoid issues with
>>>>>>>> downward merging of windows (see earlier mailing list thread). With the
>>>>>>>> State API, I'm not able to hold the watermark back (I think), but in
>>>>>>>> any case I prefer the behavior where the watermark moves forward with the
>>>>>>>> upstream events, and to deal with the very few straggler users through a
>>>>>>>> lateness configuration.
>>>>>>>>
>>>>>>>> Does that make sense? So far this seems very reasonable to me (to
>>>>>>>> want to keep the watermark moving and to deal with the few late events
>>>>>>>> that actually fall out of the window using explicit lateness
>>>>>>>> configuration).
>>>>>>>>
>>>>>>>> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Have you looked at using
>>>>>>>>> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
>>>>>>>>> downstream watermark back to the beginning of the window (presumably the
>>>>>>>>> timestamp of the LOGIN event), so you can call outputWithTimestamp using
>>>>>>>>> the CLICK GREEN timestamp without needing to set the allowed timestamp
>>>>>>>>> skew.
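Below is a plain-Java model (not Beam API) of the output timestamp that each TimestampCombiner choice gives a merged session pane. It assumes the session contains the LOGIN, CLICK GREEN and LOGOUT events, and that the session window ends one gap after LOGOUT. With EARLIEST, the pane's timestamp (and so the hold described above) sits at the LOGIN time. With END_OF_WINDOW, it sits at the end of the session. `CombinerModel` is a hypothetical name.

```java
import java.util.List;

/** Hypothetical model of how a TimestampCombiner picks a pane's output timestamp. */
final class CombinerModel {
    enum Combiner { EARLIEST, LATEST, END_OF_WINDOW }

    /**
     * @param elementTimestamps  timestamps (millis) of the elements in the pane; must be non-empty
     * @param windowMaxTimestamp last timestamp (millis) inside the session window
     */
    static long outputTimestamp(Combiner combiner, List<Long> elementTimestamps, long windowMaxTimestamp) {
        switch (combiner) {
            case EARLIEST:
                // Earliest element, e.g. the LOGIN event.
                return elementTimestamps.stream().mapToLong(Long::longValue).min().getAsLong();
            case LATEST:
                // Latest element, e.g. the LOGOUT event.
                return elementTimestamps.stream().mapToLong(Long::longValue).max().getAsLong();
            default:
                // END_OF_WINDOW: the window's own last timestamp.
                return windowMaxTimestamp;
        }
    }
}
```

The trade-off Aaron describes follows directly from the model:

- EARLIEST keeps the hold as far back as the earliest LOGIN still in a pane.
- END_OF_WINDOW keeps the hold at the end of the session, which lets the watermark advance sooner.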
>>>>>>>>>
>>>>>>>>> Reuven
>>>>>>>>>
>>>>>>>>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> I've just built a pipeline in Beam and, after exploring several
>>>>>>>>>> options for my use case, I've ended up relying on the deprecated
>>>>>>>>>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
>>>>>>>>>> quite valid use case. So I suppose this is a vote for un-deprecating this
>>>>>>>>>> API (or a teachable moment in which I could be pointed to a more suitable
>>>>>>>>>> non-deprecated approach).
>>>>>>>>>>
>>>>>>>>>> I'll stick with a previous simplification of my use case:
>>>>>>>>>>
>>>>>>>>>> I get these events from my users:
>>>>>>>>>>     LOGIN
>>>>>>>>>>     CLICK GREEN BUTTON
>>>>>>>>>>     LOGOUT
>>>>>>>>>>
>>>>>>>>>> I capture user session duration (logout time *minus* login time)
>>>>>>>>>> and I want to perform a PER DAY average (i.e., my window is on
>>>>>>>>>> CalendarDays) BUT where the aggregation's timestamp is the time of the
>>>>>>>>>> CLICK GREEN event.
>>>>>>>>>>
>>>>>>>>>> So once I calculate and emit a single user's session duration I
>>>>>>>>>> need to .outputWithTimestamp using the CLICK GREEN event's timestamp. This
>>>>>>>>>> involves, of course, outputting with a timestamp *before* the watermark.
>>>>>>>>>>
>>>>>>>>>> In most cases my users LOGOUT on the same day as the CLICK GREEN
>>>>>>>>>> BUTTON event, so even though I'm typically outputting a timestamp before
>>>>>>>>>> the watermark, the CalendarDay window is not yet closed, and so most user
>>>>>>>>>> session durations do not produce a late aggregation for that CalendarDay.
>>>>>>>>>>
>>>>>>>>>> Only when a LOGOUT occurs on a day later than the CLICK GREEN
>>>>>>>>>> event do I have to contend with potentially late data contributing back to
>>>>>>>>>> a prior CalendarDay.
>>>>>>>>>>
>>>>>>>>>> In any case, I have .withAllowedLateness to allow me to make a
>>>>>>>>>> call here about what I'm willing to trade off (keeping windows open vs.
>>>>>>>>>> dropping data for users with overly long sessions), etc.
>>>>>>>>>>
>>>>>>>>>> This seems to be a simple scenario (it is effectively my real-world
>>>>>>>>>> scenario), and .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem
>>>>>>>>>> to cover it in a straightforward, effective way.
>>>>>>>>>>
>>>>>>>>>> However of course I don't like building production code on
>>>>>>>>>> deprecated capabilities -- so advice on alternatives (or perhaps a
>>>>>>>>>> reconsideration of this deprecation :) ) would be appreciated.
>>>>>>>>>>
>>>>>>>>>>
