There is now (as of last week) a way to hold back the watermark with the
state API (though not yet in a released version of Beam). If you set a
timer using withOutputTimestamp(t), the watermark will be held to t.
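To make the hold concrete, conceptually each timer set with withOutputTimestamp(t) pins a hold at t. The stage's output watermark then cannot advance past the earliest pending hold. Below is a minimal JDK-only model of that rule, not Beam code. The class and method names (WatermarkHoldSketch, outputWatermark) are hypothetical.

```java
import java.time.Instant;
import java.util.Collection;

// Hypothetical, simplified model of a watermark hold; not Beam's implementation.
class WatermarkHoldSketch {

    // The output watermark is the input watermark, held back to the
    // earliest pending hold (e.g. a timer's output timestamp), if any.
    static Instant outputWatermark(Instant inputWatermark, Collection<Instant> holds) {
        Instant result = inputWatermark;
        for (Instant hold : holds) {
            if (hold.isBefore(result)) {
                result = hold;
            }
        }
        return result;
    }
}
```

With one hold per open session, a single session that stays open for days keeps the output watermark pinned at its hold for that whole time. This is the drag described in the reply quoted below.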

On Sat, Jan 11, 2020 at 4:15 PM Aaron Dixon <[email protected]> wrote:

> Hi Reuven thanks for your quick reply
>
> I've tried that, but the drag it puts on the watermark was too intrusive.
> For example, even if just a single user among many decides to stay
> logged in for a few days, the watermark holds everything else back.
>
> This was when using a custom session window. I've recently been using the
> State API to do my custom session tracking to avoid issues with downward
> merging of windows (see earlier mailing list thread). With the State API
> I'm not able to hold the watermark back (I think), but in any case I
> prefer to let the watermark move forward with the upstream events and to
> deal with the very few straggler users via a lateness configuration.
>
> Does that make sense? So far this seems very reasonable to me: keep the
> watermark moving, and handle the few late events that actually fall
> outside the window with an explicit lateness configuration.
>
> On Sat, Jan 11, 2020 at 4:57 PM Reuven Lax <[email protected]> wrote:
>
>> Have you looked at using
>> withTimestampCombiner(TimestampCombiner.EARLIEST)? This will hold the
>> downstream watermark back to the beginning of the window (presumably the
>> timestamp of the LOGIN event), so you can call .outputWithTimestamp using
>> the CLICK GREEN timestamp without needing to set the allowed timestamp skew.
>>
>> Reuven
>>
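For reference, TimestampCombiner.EARLIEST (set with Window#withTimestampCombiner) means a window's output carries the earliest timestamp among the elements that contributed to it. The JDK-only sketch below models just that combining rule for one session. It assumes the three events carry the example timestamps used in the test. The names (EarliestCombinerSketch, combineEarliest, needsSkew) are hypothetical.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.List;

// Hypothetical model of an EARLIEST timestamp combiner; not Beam code.
class EarliestCombinerSketch {

    // EARLIEST: the combined (pane) timestamp is the minimum input timestamp.
    // Collections.min throws NoSuchElementException for an empty list.
    static Instant combineEarliest(List<Instant> timestamps) {
        return Collections.min(timestamps);
    }

    // An output timestamp at or after the element's own timestamp needs no skew.
    static boolean needsSkew(Instant elementTimestamp, Instant outputTimestamp) {
        return outputTimestamp.isBefore(elementTimestamp);
    }
}
```

Because LOGIN is the earliest event, the combined element is stamped with the LOGIN time. Emitting at the later CLICK GREEN time from it needs no skew. Emitting the same CLICK GREEN time from an element stamped with the LOGOUT time does.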
>> On Sat, Jan 11, 2020 at 1:50 PM Aaron Dixon <[email protected]> wrote:
>>
>>> I've just built a pipeline in Beam and after exploring several options
>>> for my use case, I've ended up relying on the deprecated
>>> .outputWithTimestamp() + DoFn#getAllowedTimestampSkew in what seems to me a
>>> quite valid use case. So I suppose this is a vote for un-deprecating this
>>> API (or a teachable moment in which I could be pointed to a more suitable
>>> non-deprecated approach.)
>>>
>>> I'll stick with a previous simplification of my use case:
>>>
>>> I get these events from my users:
>>>     LOGIN
>>>     CLICK GREEN BUTTON
>>>     LOGOUT
>>>
>>> I capture user session duration (logout time *minus* login time) and I
>>> want to perform a PER DAY average (i.e., my window is on CalendarDays) BUT
>>> where the aggregation's timestamp is the time of the CLICK GREEN event.
>>>
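The computation described above can be sketched in plain Java. The sketch assumes each session's LOGIN, CLICK GREEN BUTTON and LOGOUT timestamps are already known. It also assumes a CalendarDay is taken in UTC (Beam's CalendarWindows.days(1) defaults to UTC, and withTimeZone changes that). The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.List;

// Hypothetical helpers for the LOGIN / CLICK GREEN BUTTON / LOGOUT example.
class SessionDurationSketch {

    // Session duration is logout time minus login time.
    static Duration sessionDuration(Instant login, Instant logout) {
        return Duration.between(login, logout);
    }

    // The day a session counts toward comes from its CLICK GREEN timestamp (UTC).
    static LocalDate calendarDay(Instant clickGreen) {
        return clickGreen.atZone(ZoneOffset.UTC).toLocalDate();
    }

    // Average duration, in seconds, of the sessions assigned to one day.
    static double averageSeconds(List<Duration> durations) {
        return durations.stream().mapToLong(Duration::getSeconds).average().orElse(0.0);
    }
}
```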
>>> So once I calculate and emit a single user's session duration, I need to
>>> call .outputWithTimestamp with the CLICK GREEN event's timestamp. This
>>> involves, of course, outputting with a timestamp *before* the watermark.
>>>
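This is where the skew setting matters. When a DoFn calls outputWithTimestamp, Beam rejects, with an exception, any timestamp earlier than the input element's timestamp minus getAllowedTimestampSkew(), which defaults to zero. The JDK-only sketch below models that check in simplified form. The class and method names are hypothetical, and the timestamps in the test are made-up example values.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical, simplified model of the output-timestamp check; not Beam code.
class TimestampSkewSketch {

    // An output timestamp is accepted unless it is earlier than
    // (input element timestamp - allowed skew).
    static boolean isAllowed(Instant inputTimestamp, Instant outputTimestamp, Duration allowedSkew) {
        return !outputTimestamp.isBefore(inputTimestamp.minus(allowedSkew));
    }

    // The smallest skew that admits an output at outputTimestamp.
    static Duration requiredSkew(Instant inputTimestamp, Instant outputTimestamp) {
        Duration gap = Duration.between(outputTimestamp, inputTimestamp);
        return gap.isNegative() ? Duration.ZERO : gap;
    }
}
```

Here the input element is the LOGOUT event and the output timestamp is the CLICK GREEN time. The skew therefore has to be at least as long as the longest click-to-logout gap you want to accept.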
>>> In most cases my users LOGOUT on the same day as the CLICK GREEN BUTTON
>>> event, so even though I'm typically outputting a timestamp before the
>>> watermark, the CalendarDay window is not yet closed, and so most user
>>> session durations do not arrive late for that CalendarDay's aggregation.
>>>
>>> Only when a LOGOUT occurs on a day later than the CLICK GREEN event do I
>>> have to contend with potentially late data contributing back to a prior
>>> CalendarDay.
>>>
>>> In any case, I have .withAllowedLateness to let me make a call here
>>> about what I'm willing to trade off (keeping windows open vs. dropping data
>>> for users with overly long sessions), etc.
>>>
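The open-window / late / dropped distinction in the last few paragraphs can be modelled roughly as follows. This is a simplified JDK-only sketch that assumes UTC CalendarDay windows. It mirrors the rule that a window's data expires once the window's last timestamp plus the allowed lateness is behind the watermark. In a pipeline the lateness itself is set with Window#withAllowedLateness. The names here (CalendarDayLatenessSketch, Status, classify) are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical model of lateness for CalendarDay (UTC) windows; not Beam code.
class CalendarDayLatenessSketch {

    enum Status { WINDOW_OPEN, LATE, DROPPED }

    // Last instant (inclusive, millisecond precision) of the UTC day containing ts.
    static Instant windowMaxTimestamp(Instant ts) {
        return ts.atZone(ZoneOffset.UTC).toLocalDate().plusDays(1)
                .atStartOfDay(ZoneOffset.UTC).toInstant().minusMillis(1);
    }

    static Status classify(Instant elementTimestamp, Instant watermark, Duration allowedLateness) {
        Instant maxTs = windowMaxTimestamp(elementTimestamp);
        if (!watermark.isAfter(maxTs)) {
            return Status.WINDOW_OPEN;  // watermark has not passed the end of the day
        }
        if (maxTs.plus(allowedLateness).isBefore(watermark)) {
            return Status.DROPPED;      // past end of window plus allowed lateness
        }
        return Status.LATE;             // window closed, but still within allowed lateness
    }
}
```

With zero allowed lateness, nothing is ever classified LATE. Anything arriving after the CalendarDay closes is dropped, which is why the lateness setting is the knob for long sessions.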
>>> This seems to be a simple scenario (it is effectively my real-world
>>> scenario), and .outputWithTimestamp + DoFn#getAllowedTimestampSkew seem
>>> to cover it in a straightforward, effective way.
>>>
>>> However, of course, I don't like building production code on deprecated
>>> capabilities, so advice on alternatives (or perhaps a reconsideration of
>>> this deprecation :) ) would be appreciated.
>>>
>>>
