Kenn, the optimization is not complex; it has just never been done.

The FnApiDoFnRunner was rewritten with portability as the first design goal,
moving away from the assumptions that were baked into the existing DoFn
"runner" implementations and the constructs used by the non-portable
implementation. Java has many DoFn "runner" implementations, layered on top
of each other to handle several special cases, and "system" DoFns use them
too.
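
For readers new to the thread, here is a minimal Java sketch of the
optimization being discussed, i.e. Kenn's "if (observesWindow) { explode }
else { don't }". It does not use Beam's API: `WindowedElement`, `SimpleFn`,
and `run` are hypothetical stand-ins for `WindowedValue`, a DoFn plus the
observesWindow bit from its signature, and the runner, and windows are plain
strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

public class WindowExplosionSketch {

  // Hypothetical stand-in for Beam's WindowedValue: one value tagged with one or more windows.
  record WindowedElement<T>(T value, List<String> windows) {}

  // Hypothetical stand-in for a DoFn: observesWindow plays the role of the bit Beam derives
  // from the @ProcessElement signature; body receives null as the window when not exploded.
  record SimpleFn<T, R>(boolean observesWindow, BiFunction<T, String, R> body) {}

  static <T, R> List<WindowedElement<R>> run(SimpleFn<T, R> fn, WindowedElement<T> input) {
    List<WindowedElement<R>> outputs = new ArrayList<>();
    if (fn.observesWindow()) {
      // Explode: invoke once per window; each output carries only that window.
      for (String window : input.windows()) {
        outputs.add(new WindowedElement<>(fn.body().apply(input.value(), window), List.of(window)));
      }
    } else {
      // Don't explode: invoke once; the output inherits every window of the input.
      outputs.add(new WindowedElement<>(fn.body().apply(input.value(), null), input.windows()));
    }
    return outputs;
  }

  public static void main(String[] args) {
    WindowedElement<String> in = new WindowedElement<>("a", List.of("w1", "w2", "w3"));
    SimpleFn<String, String> windowAware = new SimpleFn<>(true, (e, w) -> e + "@" + w);
    SimpleFn<String, String> windowBlind = new SimpleFn<>(false, (e, w) -> e.toUpperCase());

    System.out.println(run(windowAware, in).size()); // 3
    System.out.println(run(windowBlind, in).size()); // 1
    System.out.println(run(windowBlind, in).get(0).windows()); // [w1, w2, w3]
  }
}
```

When the DoFn never inspects its window, the element stays unexploded and the
single output keeps the input's full window set, which is roughly what Reuven
suggests an OutputReceiver could do.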

On Mon, May 4, 2020 at 10:38 AM Robert Burke <rob...@frantil.com> wrote:

> Ack, OK. Thank you for clarifying!
> Confirming that Kenn is right: the optimization is pretty much that
> simple. [1] is where it's done in the Go SDK.
>
> [1]
> https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/pardo.go#L136
>
> On Mon, May 4, 2020, 10:18 AM Reuven Lax <re...@google.com> wrote:
>
>> I wonder how often we even implement this optimization today. If the
>> processElement has an OutputReceiver parameter then we mark it as
>> observesWindow, and that's a pretty common parameter.
>>
>> Arguably this is a bug in our implementation of OutputReceiver though -
>> it should be able to copy all the windows into the output element.
>>
>> Reuven
>>
>> On Mon, May 4, 2020 at 9:37 AM Kenneth Knowles <k...@apache.org> wrote:
>>
>>> Is the optimization complex in the Fn API context? In non-Fn API it is
>>> basically "if (observesWindow) { explode } else { don't }" [1]. The DoFn
>>> signature tells you everything you need. This might be a good first commit
>>> for someone looking to contribute to the Java SDK harness?
>>>
>>> Kenn
>>>
>>> [1]
>>> https://github.com/apache/beam/blob/591de3473144de54beef0932131025e2a4d8504b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L133
>>>
>>> On Mon, May 4, 2020 at 9:33 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> In Python we only explode windows if the Window is being inspected.
>>>> (There is no separate "DoFnRunner" for FnApi vs. Legacy execution.)
>>>>
>>>> On Mon, May 4, 2020 at 9:21 AM Luke Cwik <lc...@google.com> wrote:
>>>> >
>>>> > Reuven, you are correct that the optimization has yet to be
>>>> > implemented.
>>>> > Robert, the FnApiDoFnRunner is the name of a Java class that executes
>>>> > Java DoFns in the Java SDK harness. The poor name choice is my fault.
>>>> >
>>>> > On Fri, May 1, 2020 at 9:14 PM Reuven Lax <re...@google.com> wrote:
>>>> >>
>>>> >> FnApiDoFnRunner does run Java DoFns.
>>>> >>
>>>> >> On Fri, May 1, 2020 at 9:10 PM Robert Burke <rob...@frantil.com> wrote:
>>>> >>>
>>>> >>> In the Go SDK this optimization is handled on the SDK side, in the
>>>> >>> pardo execution node, not on the runner side of the FnAPI.
>>>> >>>
>>>> >>> But I think I'm about to learn that FnApiDoFnRunner is something
>>>> >>> that runs on the Java SDK side rather than on the runner side, despite
>>>> >>> the name.
>>>> >>>
>>>> >>> On Fri, May 1, 2020, 9:02 PM Reuven Lax <re...@google.com> wrote:
>>>> >>>>
>>>> >>>> Ah - so we don't implement the optimization of not expanding the
>>>> >>>> windows if not necessary?
>>>> >>>>
>>>> >>>> On Fri, May 1, 2020 at 8:56 PM Luke Cwik <lc...@google.com> wrote:
>>>> >>>>>
>>>> >>>>> In all the processElementYYY methods, currentWindow is assigned as
>>>> >>>>> we loop over the set of windows, as can be seen here:
>>>> >>>>>
>>>> >>>>> https://github.com/apache/beam/blob/9bb2990c0f6c08dd33d9c6fa1fd91842c644a8e3/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L738
>>>> >>>>>
>>>> >>>>> On Fri, May 1, 2020 at 8:51 PM Reuven Lax <re...@google.com> wrote:
>>>> >>>>>>
>>>> >>>>>> In Beam a WindowedValue can contain multiple windows, because an
>>>> >>>>>> element can be in multiple windows at once (for example, sliding
>>>> >>>>>> windows). Usually we keep these elements unexpanded, but if the
>>>> >>>>>> user's DoFn observes the window, then we have to "explode" the
>>>> >>>>>> element out and run the process function once per window, e.g. if
>>>> >>>>>> the process function looks like this:
>>>> >>>>>>
>>>> >>>>>> @ProcessElement
>>>> >>>>>> public void process(@Element T e, IntervalWindow w)
>>>> >>>>>>
>>>> >>>>>> In SimpleDoFnRunner we do this inside processElement. However, I
>>>> >>>>>> can't find the equivalent code in FnApiDoFnRunner. How does window
>>>> >>>>>> explosion work in the portable runner?
>>>> >>>>>>
>>>> >>>>>> Reuven
>>>>
>>>
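
Reuven's sliding-window example is the usual way an element ends up in
several windows at once. The sketch below assigns a timestamp to sliding
windows, assuming zero offset, a positive period, and non-negative
millisecond timestamps. `SlidingWindowSketch` and `assignWindows` are
hypothetical names for illustration, not Beam's `SlidingWindows`
implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

  // Half-open interval [start, end) in milliseconds.
  record Window(long start, long end) {}

  // Assumes ts >= 0 and periodMs > 0 (Java's % would give a negative remainder for negative ts).
  // Returns every window of length sizeMs, starting at a multiple of periodMs, that contains ts.
  static List<Window> assignWindows(long ts, long sizeMs, long periodMs) {
    List<Window> windows = new ArrayList<>();
    // Latest window start that is <= ts.
    long lastStart = ts - (ts % periodMs);
    // Walk backwards while the window [start, start + sizeMs) still covers ts.
    for (long start = lastStart; start > ts - sizeMs; start -= periodMs) {
      windows.add(new Window(start, start + sizeMs));
    }
    return windows;
  }

  public static void main(String[] args) {
    // A 10-minute window sliding every 5 minutes; the element is at minute 7.
    List<Window> ws = assignWindows(420_000L, 600_000L, 300_000L);
    System.out.println(ws.size()); // 2
    System.out.println(ws);
  }
}
```

With a 10-minute window sliding every 5 minutes, an element at minute 7 lands
in two windows, so a DoFn that observes its window would be invoked twice for
that one element, while a DoFn that does not could be invoked once.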
