[https://issues.apache.org/jira/browse/BEAM-241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15266983#comment-15266983]

Kenneth Knowles commented on BEAM-241:
--------------------------------------

Your first point: I think that is a good idea. We should make all of this part
of {{runners/core}} so that runners that want to use it can do so. This is
closely related to the Fn API, so it will probably see some changes as we sort
that out, but it seems like a fruitful area.

Your second point: We definitely want to move away from {{DoFns}} that perform
windowing operations. With the exception of window merging, we'd like the
processing of each window to be fully isolated from other windows. That gives
us more possible parallelism, a smaller scope for stateful {{DoFn}} state once
we make it user-facing, support for a shuffle-by-key-and-window implementation
strategy, simpler {{ParDo}} semantics, and support for micro-batch runners;
more pragmatic benefits we haven't thought of will probably follow from this
clean theoretical move. This is part of why I proposed making {{Window.into}} a
primitive.
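
To make the shuffle-by-key-and-window strategy concrete, here is a minimal plain-Java sketch. It is not Beam API: {{KeyAndWindowShuffle}}, {{Element}} and {{shuffleKey}} are hypothetical names, and windows are assumed to be fixed intervals. Because the shuffle is keyed on the (key, window) pair, no group ever mixes elements from two windows, so every group can be processed on its own.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (not Beam API): shuffle on a composite (key, window)
// key so that every group belongs to exactly one window and groups can be
// processed independently, in any order or in parallel.
class KeyAndWindowShuffle {

  // One input element: a key, an event timestamp in millis, and a value.
  static final class Element {
    final String key;
    final long timestamp;
    final int value;

    Element(String key, long timestamp, int value) {
      this.key = key;
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Assigns the element to a fixed window [start, start + size) and builds
  // the composite shuffle key "key@start".
  static String shuffleKey(Element e, long windowSizeMillis) {
    long start = e.timestamp - Math.floorMod(e.timestamp, windowSizeMillis);
    return e.key + "@" + start;
  }

  // Sums values per (key, window); each entry depends only on its own window.
  static Map<String, Integer> sumPerKeyAndWindow(List<Element> input, long windowSizeMillis) {
    Map<String, Integer> sums = new TreeMap<>();
    for (Element e : input) {
      sums.merge(shuffleKey(e, windowSizeMillis), e.value, Integer::sum);
    }
    return sums;
  }

  public static void main(String[] args) {
    List<Element> input = List.of(
        new Element("a", 1_000, 1),
        new Element("a", 59_000, 2),
        new Element("a", 61_000, 5),
        new Element("b", 2_000, 7));
    // One-minute windows: key "a" ends up in two separate groups.
    System.out.println(sumPerKeyAndWindow(input, 60_000)); // {a@0=3, a@60000=5, b@0=7}
  }
}
```

The two windows of key "a" become separate groups, which a runner could hand to different workers without any coordination between them.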

So you definitely should phase out use of {{GroupAlsoByWindowViaWindowSetDoFn}} 
as a parameter to {{ParDo}}. The logic in {{GroupAlsoByWindowViaWindowSetDoFn}} 
is fine and you can still re-use it, you just need to get the 
{{StateInternals}} and {{outputWindowedValue}} via the runner backend rather 
than the {{ProcessContext}}.
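
A minimal sketch of that separation, assuming two hypothetical interfaces ({{KeyedWindowState}} and {{WindowedOutput}}) as stand-ins for what the runner backend supplies; they are not Beam's {{StateInternals}} or {{outputWindowedValue}}. The grouping logic only ever sees those interfaces, never a {{ProcessContext}}, so the same logic can be driven by any backend.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: the grouping logic depends only on two small
// interfaces, so a runner can supply state and output from its own backend
// instead of through a DoFn's ProcessContext. These interfaces are
// illustrative stand-ins, NOT Beam's StateInternals or outputWindowedValue.
class BackendSuppliedGrouping {

  // Per-(key, window) buffer, provided by the runner backend.
  interface KeyedWindowState {
    List<Integer> buffer(String key, long windowStart);
  }

  // Emits the result for one (key, window), provided by the runner backend.
  interface WindowedOutput {
    void output(String key, long windowStart, List<Integer> values);
  }

  // Runner-independent logic: buffer elements, then emit and clear the buffer
  // when the window fires. It never touches a ProcessContext.
  static final class GroupLogic {
    void onElement(String key, long windowStart, int value, KeyedWindowState state) {
      state.buffer(key, windowStart).add(value);
    }

    void onFire(String key, long windowStart, KeyedWindowState state, WindowedOutput out) {
      List<Integer> buffered = state.buffer(key, windowStart);
      out.output(key, windowStart, new ArrayList<>(buffered));
      buffered.clear();
    }
  }

  // Wires the logic to a trivial in-memory "backend" and returns what was emitted.
  static List<String> demo() {
    Map<String, List<Integer>> store = new HashMap<>();
    KeyedWindowState state =
        (key, windowStart) -> store.computeIfAbsent(key + "@" + windowStart, k -> new ArrayList<>());
    List<String> emitted = new ArrayList<>();
    WindowedOutput out =
        (key, windowStart, values) -> emitted.add(key + "@" + windowStart + "=" + values);

    GroupLogic logic = new GroupLogic();
    logic.onElement("a", 0, 1, state);
    logic.onElement("a", 0, 2, state);
    logic.onElement("a", 60_000, 9, state); // different window, not fired here
    logic.onFire("a", 0, state, out);
    return emitted;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // [a@0=[1, 2]]
  }
}
```

Swapping the in-memory maps for a runner's real state store and output collector would not require touching {{GroupLogic}}.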

I've started this for the new {{InProcessPipelineRunner}} in
[#268|https://github.com/apache/incubator-beam/pull/268/files]: {{GroupByKey}}
expands into the runner-specific primitives {{GroupByKeyOnly}} and
{{GroupAlsoByWindow}}, and the runner then provides a translator/interpreter for
those transforms. Within that translator/interpreter it is fine to use the
logic in {{GroupAlsoByWindowViaWindowSetDoFn}}, as I do
[right here|https://github.com/apache/incubator-beam/pull/268/files#diff-b81c7b54aea3aaed9e80891f97f3d911R97].
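
Roughly, that two-stage expansion looks like this in plain Java. This is a hypothetical sketch, not the code in #268: the method names only mirror {{GroupByKeyOnly}} and {{GroupAlsoByWindow}}, and windows are reduced to their start times.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java sketch of the two-stage expansion of GroupByKey.
// Names mirror the transforms above but this is not the runner's code.
// Windows are identified by their start time in millis.
class TwoStageGroupByKey {

  // A value tagged with the window it was assigned to.
  static final class WindowedValue {
    final int value;
    final long windowStart;

    WindowedValue(int value, long windowStart) {
      this.value = value;
      this.windowStart = windowStart;
    }
  }

  // Stage 1 ("GroupByKeyOnly"): group by key alone; each value keeps its window.
  static Map<String, List<WindowedValue>> groupByKeyOnly(
      List<Map.Entry<String, WindowedValue>> input) {
    Map<String, List<WindowedValue>> byKey = new TreeMap<>();
    for (Map.Entry<String, WindowedValue> e : input) {
      byKey.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
    }
    return byKey;
  }

  // Stage 2 ("GroupAlsoByWindow"): within each key, split the values by window.
  // In a real runner this stage is also where triggering and window merging
  // happen; they are omitted here.
  static Map<String, Map<Long, List<Integer>>> groupAlsoByWindow(
      Map<String, List<WindowedValue>> byKey) {
    Map<String, Map<Long, List<Integer>>> result = new TreeMap<>();
    for (Map.Entry<String, List<WindowedValue>> e : byKey.entrySet()) {
      Map<Long, List<Integer>> byWindow = new TreeMap<>();
      for (WindowedValue wv : e.getValue()) {
        byWindow.computeIfAbsent(wv.windowStart, w -> new ArrayList<>()).add(wv.value);
      }
      result.put(e.getKey(), byWindow);
    }
    return result;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, WindowedValue>> input = List.of(
        Map.entry("a", new WindowedValue(1, 0)),
        Map.entry("a", new WindowedValue(2, 60_000)),
        Map.entry("b", new WindowedValue(3, 0)),
        Map.entry("a", new WindowedValue(4, 0)));
    // Prints {a={0=[1, 4], 60000=[2]}, b={0=[3]}}
    System.out.println(groupAlsoByWindow(groupByKeyOnly(input)));
  }
}
```

Only the first stage needs a shuffle; the second runs entirely within one key, which is where a runner's translator can plug in the existing windowing logic.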

> Not easy for runners to get late-data dropping
> ----------------------------------------------
>
>                 Key: BEAM-241
>                 URL: https://issues.apache.org/jira/browse/BEAM-241
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core
>            Reporter: Mark Shields
>            Assignee: Frances Perry
>
> I realized, quite by accident, that the Flink runner delegates to
> GroupAlsoByWindowViaWindowSetDoFn for GBK, which in turn delegates to
> ReduceFnRunner. The latter now assumes that no messages will arrive after the
> 'garbage collection' time of their target window(s).
> The Dataflow runner interposes a LateDataDroppingDoFnRunner into the path to
> drop those too-late messages. That's done (I think) using
> DoFnRunners.createDefault.
> I don't think the Flink runner does that.
> We need a nice runner-friendly way of dealing with the too-late data.
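
One runner-friendly shape for this is a plain filter applied before the grouping logic, mirroring what the description says LateDataDroppingDoFnRunner does for Dataflow. The sketch below is plain Java with hypothetical names ({{LateDataFilter}}, {{dropTooLate}}); the expiry rule (window max timestamp plus allowed lateness, compared against the input watermark) is an assumption about the convention, not code taken from Beam.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of dropping too-late data BEFORE it reaches the
// grouping logic, so that logic may assume no element arrives after its
// window's garbage-collection time. Plain Java, not Beam's
// LateDataDroppingDoFnRunner.
class LateDataFilter {

  // A value already assigned to an interval window [start, end), in millis.
  static final class Element {
    final int value;
    final long windowEnd; // exclusive end of the element's window

    Element(int value, long windowEnd) {
      this.value = value;
      this.windowEnd = windowEnd;
    }
  }

  // Assumption: a window's max timestamp is end - 1 and its garbage-collection
  // time is maxTimestamp + allowedLateness.
  static long gcTime(long windowEnd, long allowedLatenessMillis) {
    return (windowEnd - 1) + allowedLatenessMillis;
  }

  // Keeps elements whose window has not expired with respect to the runner's
  // current input watermark; everything else is dropped.
  static List<Element> dropTooLate(
      List<Element> input, long inputWatermark, long allowedLatenessMillis) {
    List<Element> kept = new ArrayList<>();
    for (Element e : input) {
      boolean expired = gcTime(e.windowEnd, allowedLatenessMillis) < inputWatermark;
      if (!expired) {
        kept.add(e);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<Element> input = List.of(
        new Element(1, 60_000),   // window [0, 60000): gc time 69999
        new Element(2, 120_000)); // window [60000, 120000): gc time 129999
    // With 10s allowed lateness and the watermark at 75000, only value 2 survives.
    List<Element> kept = dropTooLate(input, 75_000, 10_000);
    System.out.println("kept " + kept.size() + " of " + input.size()); // kept 1 of 2
  }
}
```

Because the filter needs only the window, the allowed lateness and the input watermark, any runner that can report its input watermark could apply it in front of the shared grouping logic.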



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
