Hi Chad! As you've discovered, fully custom merging window fns are not yet supported portably, though this is on our todo list.
https://issues.apache.org/jira/browse/BEAM-5601

This involves calling back into the SDK to perform the actual merging logic (and also, for full generality, being able to extract the endpoints of unknown window types).

Note that it's tricky to get the exact behavior you need, as data may come in out of order. Consider, for example, three events of increasing timestamp e1, e2, e3 where e2 is the "end" event. It could happen that e1 and e3 get merged before e2 is seen, and there's no "unmerge" capability (the values may already be combined via an upstream combiner). How do you handle this?

In the meantime, you could try getting this behavior with StatefulDoFns.

On Sun, Jun 2, 2019 at 6:33 AM Chad Dombrova <[email protected]> wrote:
>
> Hi all,
> I'm currently investigating Beam for our company's needs, and I have a
> question about how to solve a specific windowing problem in python.
>
> I have a stream of elements and I want to gather them until a special
> end-of-stream element arrives.
>
> To solve this, I've written a modified version of window.Sessions that takes
> an optional callable which is passed an element when it is being assigned to
> a window. If the callable returns True, the window is immediately closed.
> Here's the code:
>
> https://gist.github.com/chadrik/b0dfff8953fed99f99bdd69c7cc870ba
>
> It works as expected in the Direct runner, but fails in Dataflow, which I'm
> pretty sure is due to a serialization problem.
>
> So, I have a few questions:
>
> 1) Is there a way to do this in python using stock components? (i.e. without
> my custom class)
>
> 2) If not, is there any interest in accepting a PR to modify the stock
> window.Sessions to do something like what I've written? It seems very
> useful, and I've seen other people attempting to solve this same problem
> [1][2]
>
> 3) If not, how do I go about serializing my custom WindowFn class? From
> looking at the source code I'm certain I could figure out how to extend the
> serialization for a stock object like window.Sessions (modify
> standard_window_fns.proto, and update to/from_runner_api_parameter), but it's
> very unclear how I would do this for a custom WindowFn, since serialization
> of these classes seems to be part of the official beam gRPC portability
> protocol.
>
> Thanks in advance for your help!
> -chad
>
>
> 1.
> https://stackoverflow.com/questions/49011360/using-a-custom-windowfn-on-google-dataflow-which-closes-after-an-element-value
> 2.
> https://stackoverflow.com/questions/43035302/close-window-after-based-on-element-value/43052779#43052779
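
To make the out-of-order scenario from the reply concrete (e1 and e3 arriving before the "end" event e2), here is a small toy simulation in plain Python. It does not use any Beam API: the `Window` class, the `GAP` value, and the `add_event`/`simulate` helpers are all hypothetical names invented for illustration, and the "combiner" is assumed to be a simple sum. It is only a sketch of why a late "end" event cannot split a window whose values have already been combined.

```python
from dataclasses import dataclass

GAP = 5  # hypothetical session gap, in the same units as the timestamps


@dataclass
class Window:
    start: int
    end: int               # latest event timestamp in the window + GAP
    combined: int          # values already folded together (here: a sum)
    saw_end: bool = False  # True once an "end" event landed in this window


def add_event(windows, timestamp, value, is_end=False):
    """Give one event its own proto-window, then merge overlapping windows."""
    candidates = sorted(
        windows + [Window(timestamp, timestamp + GAP, value, is_end)],
        key=lambda w: w.start,
    )
    merged = [candidates[0]]
    for w in candidates[1:]:
        last = merged[-1]
        if w.start < last.end:  # overlap: merge and combine eagerly
            merged[-1] = Window(
                last.start,
                max(last.end, w.end),
                last.combined + w.combined,
                last.saw_end or w.saw_end,
            )
        else:
            merged.append(w)
    return merged


def simulate(arrivals):
    """Feed (timestamp, value, is_end) tuples in arrival order."""
    windows = []
    for timestamp, value, is_end in arrivals:
        windows = add_event(windows, timestamp, value, is_end)
    return windows


# e1 and e3 arrive first and are merged; their values are now a single sum.
before_end = simulate([(1, 10, False), (3, 30, False)])

# e2, the "end" event, arrives late. The intended grouping would be
# {e1, e2} and {e3}, but only the combined total is left to work with.
after_end = simulate([(1, 10, False), (3, 30, False), (2, 20, True)])

print(before_end)
print(after_end)
```

In this toy model the late e2 simply lands in the already-merged window, so the result is one window holding all three values. Keeping the raw elements per key (for example in the state of a StatefulDoFn) instead of a combined value is one way to retain enough information to cut the group at the "end" event.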
