There is now (as of last week) a way to hold back the watermark with the State API (though it is not yet in a released version of Beam). If you set a timer using withOutputTimestamp(t), the watermark will be held at t.
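A hold like this affects every key, not just the one that set it. The following minimal, hypothetical model in plain Java illustrates why. It is not Beam code, and the class and method names are invented. It assumes the output watermark of a stateful DoFn is the minimum of its input watermark and every pending hold. In Beam itself, the hold would come from something like `timer.withOutputTimestamp(t).set(...)` on an event-time `Timer`.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model (plain Java, not Beam code) of how per-key
// watermark holds, like the one created by Timer.withOutputTimestamp(t),
// constrain the output watermark of a stateful DoFn.
final class WatermarkHoldModel {
    // Output timestamp of each key's pending timer; keys without a pending
    // timer have no entry and therefore hold nothing back.
    private final Map<String, Instant> holds = new HashMap<>();

    // Setting (or re-setting) the key's single timer replaces its previous hold.
    void setTimer(String key, Instant outputTimestamp) {
        holds.put(key, outputTimestamp);
    }

    // Firing or clearing the timer releases the key's hold.
    void releaseTimer(String key) {
        holds.remove(key);
    }

    // Assumed rule: output watermark = min(input watermark, earliest hold of ANY key).
    Instant outputWatermark(Instant inputWatermark) {
        Instant result = inputWatermark;
        for (Instant hold : holds.values()) {
            if (hold.isBefore(result)) {
                result = hold;
            }
        }
        return result;
    }
}
```

Suppose one long-lived session's hold is left pending, for example with `setTimer("longSessionUser", loginTime)`. That pins `outputWatermark(...)` at `loginTime` no matter how far the input watermark advances. This matches the concern in the quoted reply about a single user holding everything else back.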
On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon wrote:
> Hi Reuven, thanks for your quick reply.
>
> I've tried that, but the drag it puts on the watermark was too intrusive.
> For example, even if just a single user among many decided to remain
> logged in for a few days, the watermark holds everything else back.
>
> This was when using a custom session window. I've recently been using the
> State API to do my custom session tracking, to avoid issues with downward
> merging of windows (see the earlier mailing list thread). With the State API
> I'm not able to hold the watermark back (I think), but in any case I
> prefer the behavior where the watermark moves forward with the upstream
> events, and I deal with the very few straggler users through a lateness
> configuration.
>
> Does that make sense? So far this seems very reasonable to me: keep the
> watermark moving, and handle the few late events that actually fall out of
> the window with an explicit lateness configuration.
>
> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax wrote:
>
>> Have you looked at using
>> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
>> downstream watermark back to the beginning of the window (presumably the
>> timestamp of the LOGIN event), so you can call .outputWithTimestamp using
>> the CLICK GREEN timestamp without needing to set the allowed timestamp skew.
>>
>> Reuven
>>
>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon wrote:
>>
>>> I've just built a pipeline in Beam, and after exploring several options
>>> for my use case, I've ended up relying on the deprecated
>>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
>>> quite valid use case. So I suppose this is a vote for un-deprecating this
>>> API (or a teachable moment in which I could be pointed to a more suitable,
>>> non-deprecated approach).
>>>
>>> I'll stick with the simplified version of my use case from before:
>>>
>>> I get these events from my users:
>>> LOGIN
>>> CLICK GREEN BUTTON
>>> LOGOUT
>>>
>>> I capture user session duration (logout time *minus* login time), and I
>>> want to compute a PER DAY average (i.e., my window is on CalendarDays), BUT
>>> where the aggregation's timestamp is the time of the CLICK GREEN event.
>>>
>>> So once I calculate and emit a single user's session duration, I need to
>>> .outputWithTimestamp using the CLICK GREEN event's timestamp. This
>>> involves, of course, outputting with a timestamp *before* the watermark.
>>>
>>> In most cases my users LOGOUT on the same day as the CLICK GREEN BUTTON
>>> event, so even though I'm typically outputting a timestamp before the
>>> watermark, the CalendarDay window is not yet closed, and so most user
>>> session durations do not contribute to a late aggregation for that
>>> CalendarDay.
>>>
>>> Only when a LOGOUT occurs on a later day than the CLICK GREEN event do I
>>> have to contend with potentially late data contributing back to a prior
>>> CalendarDay.
>>>
>>> In any case, .withAllowedLateness lets me make a call here about what
>>> tradeoff I'm willing to make (keeping windows open vs. dropping data for
>>> users with overly long sessions), etc.
>>>
>>> This seems to be a simple scenario (it is effectively my real-world
>>> scenario), and .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem
>>> to cover it in a straightforward, effective way.
>>>
>>> However, of course I don't like building production code on deprecated
>>> capabilities, so advice on alternatives (or perhaps a reconsideration of
>>> this deprecation :) ) would be appreciated.
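The following hypothetical plain-Java model makes the tradeoff in the original use case concrete. It is not Beam code, and its names are invented for illustration. It covers the two checks involved:

- The first mirrors the rule that `getAllowedTimestampSkew()` relaxes: an output timestamp may be at most the allowed skew earlier than the timestamp of the element being processed (here, the LOGOUT event).
- The second is an assumed, simplified version of the late-data rule controlled by `.withAllowedLateness`, for UTC calendar-day windows.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

// Hypothetical model (plain Java, not Beam code) of the two checks that apply
// when a session duration computed at LOGOUT is re-stamped with the earlier
// CLICK GREEN timestamp and then windowed into UTC calendar days.
final class SessionRestampModel {
    // Skew check: the output timestamp may be at most allowedSkew earlier than
    // the timestamp of the element being processed (the LOGOUT event).
    static boolean skewAllows(Instant inputTimestamp, Instant outputTimestamp, Duration allowedSkew) {
        return !outputTimestamp.isBefore(inputTimestamp.minus(allowedSkew));
    }

    // Start of the UTC calendar-day window [start, start + 1 day) containing ts.
    static Instant dayWindowStart(Instant ts) {
        return ts.truncatedTo(ChronoUnit.DAYS);
    }

    // Simplified lateness rule (assumption): the element is dropped once the
    // window's last instant plus allowedLateness is behind the watermark.
    static boolean isDroppedAsLate(Instant outputTimestamp, Instant watermark, Duration allowedLateness) {
        Instant windowMax = dayWindowStart(outputTimestamp).plus(Duration.ofDays(1)).minusMillis(1);
        return windowMax.plus(allowedLateness).isBefore(watermark);
    }
}
```

Take a CLICK GREEN at 23:00 and a LOGOUT at 01:00 the next day, and assume the watermark has reached the LOGOUT time. The skew must then be at least two hours. With zero allowed lateness, the previous day's window is already behind the watermark, so the session would be dropped. One day of allowed lateness keeps it. A same-day session lands in a window that is still open either way.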
