[ https://issues.apache.org/jira/browse/BEAM-1287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049935#comment-16049935 ]
Kenneth Knowles commented on BEAM-1287:
---------------------------------------

Did you grab this intentionally, [~tgroh]? I ask because it came right on the heels of me opening a PR on it, which I figured on following up with the remaining bits. Happy to offload if you are interested.

> Give new DoFn the ability to output to a particular window
> ----------------------------------------------------------
>
>                 Key: BEAM-1287
>                 URL: https://issues.apache.org/jira/browse/BEAM-1287
>             Project: Beam
>          Issue Type: New Feature
>          Components: beam-model, sdk-java-core, sdk-py
>            Reporter: Kenneth Knowles
>            Assignee: Thomas Groh
>
> The new {{DoFn}} design allows us to have specialized output receivers, such
> as a key-preserving output (the default is non-key-preserving) or
> non-window-preserving (the default is window-preserving) output. This JIRA is
> for the latter, with an emphasis on making the two as analogous as we can.
> {code}
> new DoFn<A, B>() {
>   @ProcessElement
>   public void processElement(ProcessContext c, OutputToWindow receiver) {
>     receiver.outputWithTimestamp(value, timestamp, window);
>   }
> }
> {code}
> After this change, window assignment need not be a primitive.
> Why is this OK? The primary motivation for keeping windows strongly separated
> is because they yield parallelism if we don't impose any requirement that
> multiple windows for a single key be co-located or linearized. We should be
> able to process a single key with millions of non-merging windows in parallel
> without having to reify the windows (though this isn't _that_ bad). That is a
> major change/improvement over the vague assumption that keys are the atom of
> parallelism.
> This change will not remove this property, as it pertains to input and state.
> The analogy with keys:
>  - Stateful DoFn requires the ability to access key-and-window state. For
> some runners, perhaps this does not require colocation. For runners that want
> to do this efficiently/locally, it means some key-and-window colocation
> operation followed by only key-and-window preserving transforms. So
> outputting to a new window breaks the invariant, just as a non-key-preserving
> transform would. Until we had the new {{DoFn}} we couldn't know if
> non-window-preserving output was used.
>  - Non-key-preserving output also breaks any idea that combined aggregates
> are actually one per key, etc. So windows can work the same way.
>  - Timestamps are interesting. By analogy with keys, timestamps would be just
> part of the value and able to change freely. This doesn't work so well
> because of lateness. To avoid digging deeper into changing anything, this
> proposal just suggests that a timestamp is provided, and whether it is
> allowed to be late is governed by the same rules as {{outputWithTimestamp}}.
>  - Not clear if this has uses for merging windows.
> This change is entirely backwards compatible, but given that it removes a
> primitive and is rather little effort, it might bear earlier consideration.
> No work will begin until it is brought to the dev list.
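
To make the "window assignment need not be a primitive" point concrete, here is a minimal, self-contained Java sketch. It does not use the Beam SDK: {{Window}}, {{Element}}, {{ElementFn}}, {{fixedWindows}} and {{apply}} are hypothetical stand-ins invented for illustration, and {{OutputToWindow}} only mimics the receiver named in the proposal above. Under those assumptions, fixed windowing becomes an ordinary per-element function that outputs each value, with its timestamp unchanged, to a window of its choosing.

```java
import java.util.ArrayList;
import java.util.List;

// Self-contained illustration only; none of these types are Beam classes.
final class OutputToWindowSketch {

    /** Hypothetical window: the half-open interval [start, end) in milliseconds. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    /** A value together with its timestamp and the window it belongs to. */
    static final class Element {
        final String value;
        final long timestamp;
        final Window window;

        Element(String value, long timestamp, Window window) {
            this.value = value;
            this.timestamp = timestamp;
            this.window = window;
        }
    }

    /** Hypothetical receiver mimicking the OutputToWindow parameter in the proposal. */
    interface OutputToWindow {
        void outputWithTimestamp(String value, long timestamp, Window window);
    }

    /** Hypothetical per-element function, standing in for a @ProcessElement method. */
    interface ElementFn {
        void processElement(Element input, OutputToWindow receiver);
    }

    /** Fixed-size window assignment written as an ordinary per-element function. */
    static ElementFn fixedWindows(final long sizeMillis) {
        return (input, receiver) -> {
            // floorDiv keeps the window boundaries correct for negative timestamps too.
            long start = Math.floorDiv(input.timestamp, sizeMillis) * sizeMillis;
            receiver.outputWithTimestamp(
                input.value, input.timestamp, new Window(start, start + sizeMillis));
        };
    }

    /** Toy "runner": applies fn to every input and collects whatever it outputs. */
    static List<Element> apply(List<Element> inputs, ElementFn fn) {
        List<Element> outputs = new ArrayList<>();
        OutputToWindow receiver =
            (value, timestamp, window) -> outputs.add(new Element(value, timestamp, window));
        for (Element e : inputs) {
            fn.processElement(e, receiver);
        }
        return outputs;
    }

    public static void main(String[] args) {
        Window global = new Window(Long.MIN_VALUE, Long.MAX_VALUE);
        List<Element> inputs = new ArrayList<>();
        inputs.add(new Element("a", 5L, global));
        inputs.add(new Element("b", 125L, global));
        for (Element e : apply(inputs, fixedWindows(60L))) {
            System.out.println(e.value + " -> [" + e.window.start + ", " + e.window.end + ")");
        }
    }
}
```

The sketch deliberately leaves out lateness checks, state, and merging windows, which are the open points listed in the description; it only shows the shape of the output call.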