We do have merging state. However, merging timers is a bit more awkward. I now tend to think we're better off providing an onMerge function and letting the user handle this.
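To make the onMerge idea a bit more concrete, here is a minimal plain-Java sketch. It assumes a hypothetical hook that receives the per-window states being merged, including each window's pending timer, and returns the merged state. `WindowState`, `OnMerge` and `SUM_AND_LATEST_TIMER` are illustrative names only, not Beam APIs. It needs Java 16+ for records.

```java
import java.util.List;

/**
 * Hypothetical sketch: the runner hands the user every window-local state
 * being merged, and the user decides how to combine both the state and the
 * pending timer. None of these types exist in Beam; they are illustrative.
 */
public class OnMergeSketch {

  /** Per-window user state: an element count plus the pending timer (event-time millis). */
  record WindowState(long count, long timerMillis) {}

  /** The hypothetical user-provided merge hook. */
  interface OnMerge {
    WindowState merge(List<WindowState> toMerge);
  }

  /** Example policy: sum the counts and keep the latest pending timer. */
  static final OnMerge SUM_AND_LATEST_TIMER = toMerge -> {
    long count = 0;
    long timer = Long.MIN_VALUE;
    for (WindowState s : toMerge) {
      count += s.count();
      timer = Math.max(timer, s.timerMillis());
    }
    return new WindowState(count, timer);
  };

  public static void main(String[] args) {
    // Two windows (e.g. sessions) of the same key that are being merged.
    WindowState merged = SUM_AND_LATEST_TIMER.merge(
        List.of(new WindowState(3, 1_000), new WindowState(2, 5_000)));
    System.out.println(merged); // WindowState[count=5, timerMillis=5000]
  }
}
```

Keeping the latest timer is only one possible policy; a user could just as well keep the earliest one or reset it from the merged window's end, and that choice is exactly what an onMerge hook would leave to the user.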
On Fri, Jun 28, 2019, 11:06 AM Jan Lukavský wrote:
> Hi,
>
> during my implementation of @RequiresTimeSortedInput I found out that
> current runners do not support stateful DoFns on merging windows [1].
> The question is: why is that? One obvious reason seems to be that the
> current definition of StateSpec doesn't define a state merge function,
> which is necessary for merging windows to work. Actually, in Euphoria
> [2] (originated separately, now merged into Beam) we had only two basic
> operators (PTransforms): FlatMap (stateless ParDo) and an operation we
> called ReduceStateByKey [3] (not merged into Beam, as there were some
> difficulties, one of which might be the missing support for merging
> windows). All the other operations could be derived from these two. The
> ReduceStateByKey (RSBK) operator was a keyed operator (shuffle) with the
> following user-defined functions:
>
> - state factory (roughly equivalent to declaring a StateSpec)
>
> - state merge function (State1 + State2 = State3)
>
> - state update function (State + Value = new State)
>
> - and state extract function (State -> Output), actually called
>   after each element was added to the state
>
> Now if you look at this, this is essentially both Beam's Combine
> operator and stateful ParDo. Moreover, GroupByKey is just RSBK with
> BagState and an append merge function. So, a question that comes to mind:
> what would happen if we added a state merge function to StateSpec?
> I think it can have the following benefits:
>
> - Both Combine and GroupByKey can become derived operators (this is not a
>   breaking change, as runners are always free to provide their own
>   override of any PTransform)
>
> - in batch, stateful ParDo can now be implemented more efficiently
>   using a Combine operation (as long as it doesn't use
>   @RequiresTimeSortedInput, which is my favourite :))
>
> - even in streaming, a combining approach to stateful ParDo would be
>   possible (provided the trigger is AfterWatermark with no early
>   firings and there are no user timers)
>
> - there is still a problem with merging windows on stateful DoFns,
>   which is early firings in general (that needs retractions, which is
>   what we first hit here [4], and solved by disabling early emission
>   from merging windows)
>
> I'd really like to hear any comments on this.
>
> Jan
>
> [1]
> https://github.com/apache/beam/blob/1992cde69343b6e8bb5eea537182af3d036d155d/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java#L71
>
> [2] https://github.com/apache/beam/tree/master/sdks/java/extensions/euphoria
>
> [3]
> https://github.com/seznam/euphoria/blob/master/euphoria-core/src/main/java/cz/seznam/euphoria/core/client/operator/ReduceStateByKey.java
>
> [4] https://github.com/seznam/euphoria/issues/43
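A minimal plain-Java sketch of the RSBK shape described in the quoted message: the four user functions, plus GroupByKey (list state, append merge) and a sum Combine expressed in terms of them. It assumes simplified in-memory types; `Rsbk`, `accumulate` and `mergeAndExtract` are hypothetical names, not Euphoria's or Beam's API. For brevity it calls the extract function once after merging two windows, rather than after every element as Euphoria's RSBK did. It needs Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical, simplified model of a ReduceStateByKey-style operator. */
public class RsbkSketch {

  /** The four user-defined functions of the operator. */
  record Rsbk<V, S, O>(
      Supplier<S> stateFactory,     // roughly: declaring a StateSpec
      BinaryOperator<S> mergeFn,    // State1 + State2 = State3
      BiFunction<S, V, S> updateFn, // State + Value = new State
      Function<S, O> extractFn) {   // State -> Output

    /** Folds the values of one key in one window into a fresh state. */
    S accumulate(List<V> values) {
      S state = stateFactory.get();
      for (V v : values) {
        state = updateFn.apply(state, v);
      }
      return state;
    }

    /** Merges the states of two windows (e.g. two sessions) and extracts the output. */
    O mergeAndExtract(S left, S right) {
      return extractFn.apply(mergeFn.apply(left, right));
    }
  }

  /** GroupByKey as RSBK: list (bag-like) state with an append merge function. */
  static <V> Rsbk<V, List<V>, List<V>> groupByKey() {
    return new Rsbk<V, List<V>, List<V>>(
        ArrayList::new,
        (a, b) -> { List<V> out = new ArrayList<>(a); out.addAll(b); return out; },
        (s, v) -> { List<V> out = new ArrayList<>(s); out.add(v); return out; },
        s -> s);
  }

  /** A sum Combine as RSBK: an accumulator state merged by addition. */
  static Rsbk<Long, Long, Long> sum() {
    return new Rsbk<Long, Long, Long>(() -> 0L, Long::sum, Long::sum, s -> s);
  }

  public static void main(String[] args) {
    // Two windows of the same key that later merge into one.
    var gbk = RsbkSketch.<String>groupByKey();
    System.out.println(gbk.mergeAndExtract(
        gbk.accumulate(List.of("a", "b")), gbk.accumulate(List.of("c")))); // [a, b, c]

    var adder = sum();
    System.out.println(adder.mergeAndExtract(
        adder.accumulate(List.of(1L, 2L)), adder.accumulate(List.of(4L)))); // 7
  }
}
```

The merge function is what lets the two per-window states be reconciled when windows merge; without it (as with today's StateSpec), there is nothing generic a runner can call at that point.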
