Hi Kenn,

Thanks for the response.
Semantically, it is not causing any problem; I just want to make sure that certain optimizations are valid. We want to drop an expired key and its associated states when the highest boundary of all its windows falls behind (watermark - allowedLateness). Initially, I was concerned that the states of an expired key could become on-time again when a downstream Window transform re-assigns windows, which would prevent non-aggregating operators from discarding any states. That could become costly when the application sees an evolving set of keys.

Best,
Shen

On Wed, Jan 31, 2018 at 11:19 PM, Kenneth Knowles <[email protected]> wrote:

> On Mon, Jan 22, 2018 at 11:42 AM, Shen Li <[email protected]> wrote:
>
>> Hi Kenn,
>>
>> Thanks for the explanation.
>>
>> > So now elements are droppable if they belong to an expired window.
>>
>> Say I have two consecutive Window transforms with a FixedWindows WindowFn (just an example; this most likely won't appear in a real pipeline). The first WindowFn says the element belongs to an expired window, but according to the second WindowFn, the element's window is not yet expired. In this case, can the first Window transform drop the element?
>
> Yes, it is permitted to drop expired data at any point. The reason I think this is OK is that the runner also completely controls the watermark, so there is arbitrary runner-owned behavior in terms of dropping either way. It hasn't come up, since windows are hardly useful until you have an aggregation, where they provide the notion of completeness. Do you have an example in mind where this gets weird?
>
> Kenn
>
>> Best,
>> Shen
>>
>> On Mon, Jan 22, 2018 at 2:07 PM, Kenneth Knowles <[email protected]> wrote:
>>
>>> Hi Shen,
>>>
>>> This is a documentation issue. The Beam model switched from dropping individual elements to expiring windows. So now elements are droppable if they belong to an expired window. This works a little better with the purpose of windowing and allowed lateness: to say when an aggregation is "complete". Any element that manages to make it to an aggregation before the accumulator expires is now allowed to be included, and only after the whole window expires do we drop any further incoming elements for that window.
>>>
>>> Kenn
>>>
>>> On Mon, Jan 22, 2018 at 10:52 AM, Shen Li <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> The Window#withAllowedLateness(Duration) doc says "Any elements that are later than this as decided by the system-maintained watermark will be dropped". Can the runner safely discard a tuple that violates the allowed lateness in the Window operator? Or does it have to drop it in the downstream GBK operator, just in case there is another Window transform in between that overrides the allowed lateness (or other configurations)?
>>>>
>>>> Thanks,
>>>> Shen
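To make the expiry rule discussed in this thread concrete, here is a minimal Python sketch. It is not Beam's implementation. It assumes integer millisecond timestamps, that a window's maximum timestamp is `end - 1`, and that a window expires once the watermark passes `max_timestamp + allowed_lateness`. `SimpleWindow`, `assign_fixed_window`, `is_expired`, and `KeyedStateStore` are hypothetical names. The sketch covers both the two-consecutive-FixedWindows case (the first window is expired while the second is not) and the per-key cleanup Shen describes, where a key is dropped once its latest window has expired.

```python
from dataclasses import dataclass, field
from typing import Dict, List


@dataclass(frozen=True)
class SimpleWindow:
    """Hypothetical half-open window [start, end) in integer milliseconds."""
    start: int
    end: int

    def max_timestamp(self) -> int:
        # Latest timestamp that still falls inside the window (assumed end - 1).
        return self.end - 1


def assign_fixed_window(timestamp: int, size: int) -> SimpleWindow:
    """Assign a timestamp to a fixed (tumbling) window with zero offset."""
    start = timestamp - timestamp % size
    return SimpleWindow(start, start + size)


def is_expired(window: SimpleWindow, allowed_lateness: int, watermark: int) -> bool:
    """Assumed rule: a window expires once the watermark passes
    max_timestamp + allowed_lateness (its garbage-collection time)."""
    return window.max_timestamp() + allowed_lateness < watermark


@dataclass
class KeyedStateStore:
    """Hypothetical per-key state held by a non-aggregating operator."""
    allowed_lateness: int
    windows: Dict[str, List[SimpleWindow]] = field(default_factory=dict)

    def add(self, key: str, window: SimpleWindow) -> None:
        self.windows.setdefault(key, []).append(window)

    def collect_garbage(self, watermark: int) -> List[str]:
        """Drop every key whose highest window boundary has fallen behind
        (watermark - allowed_lateness); return the dropped keys."""
        expired = [
            key
            for key, ws in self.windows.items()
            if is_expired(max(ws, key=SimpleWindow.max_timestamp),
                          self.allowed_lateness, watermark)
        ]
        for key in expired:
            del self.windows[key]
        return expired


# Two consecutive FixedWindows transforms disagree about expiry.
watermark = 12
first = assign_fixed_window(5, size=10)    # [0, 10), max timestamp 9
second = assign_fixed_window(5, size=60)   # [0, 60), max timestamp 59
print(is_expired(first, 0, watermark))     # True
print(is_expired(second, 0, watermark))    # False

# Per-key cleanup with 5 ms of allowed lateness.
store = KeyedStateStore(allowed_lateness=5)
store.add("a", SimpleWindow(0, 10))
store.add("b", SimpleWindow(0, 10))
store.add("b", SimpleWindow(10, 20))
print(store.collect_garbage(watermark=15))  # ['a']
```

Because expiry is monotone in a window's maximum timestamp, checking only a key's latest window is equivalent to checking that all of its windows have expired. Under Kenn's answer, a runner would be allowed to drop the element at the first transform even though the second WindowFn would still consider it on time.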
