[ 
https://issues.apache.org/jira/browse/BEAM-696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15566498#comment-15566498
 ] 

Amit Sela edited comment on BEAM-696 at 10/11/16 8:42 PM:
----------------------------------------------------------

Does Dataflow "buffer until trigger..." when no sideInputs are assigned?

Combiners are a very important optimization (for Spark certainly, and I'd
guess for other runners too), and Sessions (or any other merging windows) can
be used without sideInputs, so I think a runner should defer *only* for
merging windows and *only* when they are used with sideInputs.

I think my question is: where do we draw the line?

I could argue that to use sideInputs with merging windows, a pipeline author
should use an explicit {{GroupByKey}} followed by {{Combine.GroupedValues}},
or risk a non-deterministic result.
There are analytical cases where you actually want to do that, such as
identifying a sequence of events in a time frame. There it's clear that you
can't use combiners, and you are willing to pay the price of shuffling and
grouping the events (plus maintaining non-compactable state).
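
To make the trade-off concrete, here is a minimal sketch in plain Python (not Beam code) that simulates session merging for a single key. Everything in it is a hypothetical stand-in: the gap of 10, the `SIDE_INPUT` table, the rule that the side input is looked up through the main window's last timestamp, and the helper names `absorb`, `eager_combine` and `deferred_combine`. It only illustrates why a pre-combine that reads the side input per element can depend on arrival order, while buffering and combining after the merge does not.

```python
from typing import Dict, List, Tuple

Window = Tuple[int, int]    # half-open interval [start, end)
Element = Tuple[int, int]   # (event timestamp, value)

GAP = 10                    # hypothetical session gap
SIDE_INPUT = {0: 1, 1: 10}  # hypothetical factor per 20-unit fixed window (covers ends up to 40)


def side_input(window: Window) -> int:
    """Look up the side input through the main window's last timestamp."""
    return SIDE_INPUT[(window[1] - 1) // 20]


def absorb(window: Window, existing: List[Window]) -> Tuple[Window, List[Window]]:
    """Merge `window` with every overlapping session; return the result and what it absorbed."""
    absorbed: List[Window] = []
    changed = True
    while changed:
        changed = False
        for other in existing:
            if other not in absorbed and window[0] < other[1] and other[0] < window[1]:
                window = (min(window[0], other[0]), max(window[1], other[1]))
                absorbed.append(other)
                changed = True
    return window, absorbed


def eager_combine(elements: List[Element]) -> Dict[Window, int]:
    """Pre-combine: read the side input per element, using the window as it is right now."""
    sessions: Dict[Window, int] = {}
    for ts, value in elements:
        window, absorbed = absorb((ts, ts + GAP), list(sessions))
        acc = sum(sessions.pop(w) for w in absorbed)
        sessions[window] = acc + value * side_input(window)
    return sessions


def deferred_combine(elements: List[Element]) -> Dict[Window, int]:
    """Buffer raw values, then read the side input once per final merged window."""
    buffers: Dict[Window, List[int]] = {}
    for ts, value in elements:
        window, absorbed = absorb((ts, ts + GAP), list(buffers))
        values = [v for w in absorbed for v in buffers.pop(w)]
        buffers[window] = values + [value]
    return {w: side_input(w) * sum(vs) for w, vs in buffers.items()}


in_order = [(1, 1), (8, 1), (15, 1)]
print(eager_combine(in_order))           # {(1, 25): 12}
print(eager_combine(in_order[::-1]))     # {(1, 25): 30}
print(deferred_combine(in_order))        # {(1, 25): 30}
print(deferred_combine(in_order[::-1]))  # {(1, 25): 30}
```

In this toy setup the eager variant gives 12 or 30 depending on arrival order, and the deferred variant gives 30 either way, at the cost of buffering every value until output.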

I don't know whether you have, or can access, such statistics, but I wonder
what percentage of pipelines with sessions also use sideInputs (and would be
affected by the non-deterministic behaviour of combiners)?


was (Author: amitsela):
Does Dataflow "buffer until trigger..." when no sideInputs are assigned?

Combiners are a very important optimization (for Spark certainly, and I'd
guess for other runners too), and Sessions (or any other merging windows) can
be used without sideInputs, so I think a runner should defer *only* for
merging windows and *only* when they are used with sideInputs.

I think my question is: where do we draw the line?

I could argue that to use sideInputs with merging windows, a pipeline author
should use an explicit {{GroupByKey}} followed by {{Combine.GroupedValues}},
or risk a non-deterministic result.
There are analytical cases where you actually want to do that, such as
identifying a sequence of events in a time frame. There it's clear that you
can't use combiners, and you are willing to pay the price of shuffling and
grouping the events (plus maintaining non-compactable state).

I don't know whether you have, or can access, such statistics, but I wonder
what percentage of pipelines with sessions also use sideInputs?

> Side-Inputs non-deterministic with merging main-input windows
> -------------------------------------------------------------
>
>                 Key: BEAM-696
>                 URL: https://issues.apache.org/jira/browse/BEAM-696
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>            Reporter: Ben Chambers
>            Assignee: Pei He
>
> Side-Inputs are non-deterministic for several reasons:
> 1. Because they depend on triggering of the side-input (this is acceptable 
> because triggers are by their nature non-deterministic).
> 2. They depend on the current state of the main-input window in order to 
> look up the side-input. This means that with merging
> 3. Any runner optimizations that affect when the side-input is looked up may 
> cause problems with either or both of these.
> This issue focuses on #2 -- the non-determinism of side-inputs that execute 
> within a Merging WindowFn.
> A possible solution would be to defer running anything that looks up the 
> side-input until we need to extract an output, and using the main-window at 
> that point. Specifically, if the main-window is a MergingWindowFn, don't 
> execute any kind of pre-combine, instead buffer all the inputs and combine 
> later.
> This could still run into some non-determinism if there are triggers 
> controlling when we extract output.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
