[ https://issues.apache.org/jira/browse/BEAM-241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15266983#comment-15266983 ]
Kenneth Knowles commented on BEAM-241:
--------------------------------------

Your first point: I think that is a good idea. We should make all of this part of {{runners/core}} so that runners that want to use it can do so. This is closely related to the Fn API, so it will probably face some changes as we sort that out. It seems like a fruitful area.

Your second point: We definitely want to move away from {{DoFns}} that do windowing operations. With the exception of window merging, we'd like processing of a window to be fully isolated from other windows. That gives more possible parallelism, a smaller scope for stateful DoFn state once we make it user-facing, support for a shuffle-by-key-and-window implementation strategy, simpler {{ParDo}} semantics, and support for micro-batch runners; probably more pragmatic benefits we haven't thought of will come from this clean theoretical move. This is part of why I proposed making {{Window.into}} a primitive.

So you should definitely phase out using {{GroupAlsoByWindowViaWindowSetDoFn}} as a parameter to {{ParDo}}. The logic in {{GroupAlsoByWindowViaWindowSetDoFn}} is fine and you can still reuse it; you just need to get the {{StateInternals}} and {{outputWindowedValue}} via the runner backend rather than the {{ProcessContext}}.

I've started this for the new {{InProcessPipelineRunner}} in [#268|https://github.com/apache/incubator-beam/pull/268/files]: {{GroupByKey}} expands into the runner-specific primitives {{GroupByKeyOnly}} and {{GroupAlsoByWindow}}, and the runner then provides a translator/interpreter for those transforms. Within that translator/interpreter it is fine to use the logic in {{GroupAlsoByWindowViaWindowSetDoFn}}, as I do [right here|https://github.com/apache/incubator-beam/pull/268/files#diff-b81c7b54aea3aaed9e80891f97f3d911R97].
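To make the suggestion concrete, here is a minimal Java sketch, under stated assumptions, of a runner-side evaluator for a {{GroupAlsoByWindow}}-style step that also performs the late-data dropping the issue asks for. Every type in it ({{Window}}, {{WindowedValue}}, {{KeyedWorkItem}}, {{RunnerServices}}) is hypothetical and only mimics the shape of the idea; none of it is Beam's API. It assumes an element is too late when its window's garbage-collection time (the window's max timestamp plus the allowed lateness) is strictly before the input watermark. It deliberately skips triggers, persistent state, and window merging, and simply emits each window's group once. The watermark and the output callback come from the runner object rather than from a {{ProcessContext}}.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of a runner-side evaluator for a GroupAlsoByWindow step.
 * None of these types are Beam APIs; they only mimic the shape of the idea.
 */
public class GroupAlsoByWindowSketch {

  /** Hypothetical fixed window [start, end); its max timestamp is end - 1. */
  record Window(long start, long end) {
    long maxTimestamp() {
      return end - 1;
    }
  }

  /** Hypothetical element tagged with its event timestamp and window. */
  record WindowedValue<T>(T value, long timestamp, Window window) {}

  /** Hypothetical output of GroupByKeyOnly: one key with all of its windowed values. */
  record KeyedWorkItem<K, V>(K key, List<WindowedValue<V>> values) {}

  /** Hypothetical runner-supplied services, replacing what ProcessContext used to provide. */
  interface RunnerServices<K, V> {
    long currentInputWatermark();

    void outputWindowedValue(K key, Window window, List<V> values);
  }

  private final long allowedLatenessMillis;

  GroupAlsoByWindowSketch(long allowedLatenessMillis) {
    this.allowedLatenessMillis = allowedLatenessMillis;
  }

  /** Too late: the window's garbage-collection time is strictly before the input watermark. */
  boolean isTooLate(Window window, long inputWatermark) {
    long gcTime = window.maxTimestamp() + allowedLatenessMillis;
    return gcTime < inputWatermark;
  }

  /** Drops too-late values, groups the rest per window, emits via the runner; returns drop count. */
  <K, V> int process(KeyedWorkItem<K, V> item, RunnerServices<K, V> runner) {
    long watermark = runner.currentInputWatermark();
    Map<Window, List<V>> byWindow = new LinkedHashMap<>();
    int dropped = 0;
    for (WindowedValue<V> wv : item.values()) {
      if (isTooLate(wv.window(), watermark)) {
        dropped++;
        continue;
      }
      byWindow.computeIfAbsent(wv.window(), w -> new ArrayList<>()).add(wv.value());
    }
    // Each window is handled independently; no triggers or persistent state are modeled here.
    for (Map.Entry<Window, List<V>> entry : byWindow.entrySet()) {
      runner.outputWindowedValue(item.key(), entry.getKey(), entry.getValue());
    }
    return dropped;
  }

  public static void main(String[] args) {
    Window early = new Window(0, 10); // max timestamp 9
    Window current = new Window(10, 20); // max timestamp 19
    List<WindowedValue<Integer>> values = List.of(
        new WindowedValue<>(1, 3L, early),
        new WindowedValue<>(2, 12L, current),
        new WindowedValue<>(3, 15L, current));
    KeyedWorkItem<String, Integer> item = new KeyedWorkItem<>("k", values);
    List<String> emitted = new ArrayList<>();
    RunnerServices<String, Integer> runner = new RunnerServices<>() {
      @Override
      public long currentInputWatermark() {
        return 15L;
      }

      @Override
      public void outputWindowedValue(String key, Window window, List<Integer> grouped) {
        emitted.add(key + "@" + window.start() + ":" + grouped);
      }
    };
    // Allowed lateness 5: the early window's GC time is 9 + 5 = 14 < 15, so its element is dropped.
    int dropped = new GroupAlsoByWindowSketch(5L).process(item, runner);
    System.out.println("dropped=" + dropped + " emitted=" + emitted);
  }
}
```

Running {{main}} prints {{dropped=1 emitted=[k@10:[2, 3]]}}. Keeping the watermark and output behind a runner-provided interface is what lets the same grouping and dropping logic be reused by different runners without wrapping it in a {{ParDo}}.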
> Not easy for runners to get late-data dropping
> ----------------------------------------------
>
>                 Key: BEAM-241
>                 URL: https://issues.apache.org/jira/browse/BEAM-241
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Mark Shields
>            Assignee: Frances Perry
>
> I realized quite by accident that the Flink runner delegates to
> GroupAlsoByWindowViaWindowSetDoFn for GBK, which in turn delegates to
> ReduceFnRunner. The latter now assumes no messages will arrive beyond the
> 'garbage collection' time of their target window(s).
> The Dataflow runner interposes a LateDataDroppingDoFnRunner into the path so
> as to drop those too-late messages. That's done (I think) using
> DoFnRunners.createDefault.
> I don't think the Flink runner does that.
> We need a nice runner-friendly way of dealing with the too-late data.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)