On 4/27/21 9:26 PM, Robert Bradshaw wrote:

On Tue, Apr 27, 2021 at 12:05 PM Jan Lukavský <[email protected]> wrote:
On 4/27/21 8:51 PM, Robert Bradshaw wrote:
On Tue, Apr 27, 2021 at 11:25 AM Jan Lukavský <[email protected]> wrote:
Are you asking for a way to ignore early triggers on side input mapping, and 
only map to on-time triggered values for the window?
No, that could for sure be done before applying the View transform. I'd like to 
know if it would be possible to create a mode of matching that would be 
deterministic. One way to make it deterministic seems to be to push main 
input elements back until the side input watermark 'catches up' 
with the main input. Whenever the side input watermark fell behind the 
main input watermark, elements would start to be pushed back again. Not sure if 
I'm explaining it using the right words. The side input watermark can be 
controlled using a timer in an upstream transform, so this defines which elements 
in the main input would be matched onto which pane of the side input.
Perhaps I'm not following the request correctly, but this is exactly
how side inputs work by default. It is only when one explicitly
requests a non-deterministic trigger upstream of the side input (e.g.
one that may fire multiple times or ahead of the watermark) that one
sees a side input with multiple variations or data in the side input
before the watermark of the side input is caught up to the main input.
Yes, exactly. But take the example of side input in global windows (on
both the main input and side input). Then there have to be multiple
firings per window, because otherwise the side input would be available
at the end of time, which is not practical. The trigger doesn't have to
be non-deterministic, the data might come from a stateful ParDo, using a
timer with output timestamp, which would make the downstream watermark
progress quite well defined. The matching would still be
nondeterministic in this case.
If everything is in the global window, things get non-deterministic
across PCollections. For example, say the main input has element m20
and the side input has elements s10 and s30 (with the obvious
timestamps). Suppose we have a total ordering of events as follows.

     s10 arrives
     side input watermark advances to 25
     s30 arrives
     side input watermark advances to 100
     m20 arrives

In this case, while processing m20, one would see both s10 and s30.
Alternatively we could have had

     s10 arrives
     side input watermark advances to 25
     m20 arrives
     s30 arrives
     side input watermark advances to 100

in which case m20 would only see s10.
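
The two orderings above can be replayed with a small simulation. This is only a sketch in plain Python, not Beam code: the event tuples and the helper `visible_side_input` are made up for illustration, and the side input is modelled as a simple list in the global window.

```python
def visible_side_input(events, main_id):
    """Replay a total order of events and return the side-input elements
    that have already arrived when main-input element `main_id` is processed."""
    side_input = []  # contents of a global-window side input, in arrival order
    for kind, value in events:
        if kind == "side":
            side_input.append(value)
        elif kind == "main" and value == main_id:
            return list(side_input)
        # "watermark" events do not change what has already arrived
    raise ValueError(f"{main_id} never arrived")


ordering_1 = [("side", "s10"), ("watermark", 25), ("side", "s30"),
              ("watermark", 100), ("main", "m20")]
ordering_2 = [("side", "s10"), ("watermark", 25), ("main", "m20"),
              ("side", "s30"), ("watermark", 100)]

print(visible_side_input(ordering_1, "m20"))  # ['s10', 's30']
print(visible_side_input(ordering_2, "m20"))  # ['s10']
```

The same inputs with the same timestamps give different results, depending only on the arrival order.
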
Yes, this is absolutely true. I didn't want to make things too complicated, so
I ignored the fact that an absolutely correct solution would require the
PCollectionView to be timestamp-indexed, so that if it received both s10 and
s30 it would correctly return only s10 (as s30 didn't exist at m20, right?). I
simplified this for the purposes of this discussion; thanks for clarifying.
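
A timestamp-indexed view of the kind mentioned here might look roughly like the following. It is a plain-Python sketch under assumptions: `TimestampIndexedView` and its methods are hypothetical names, not part of Beam's PCollectionView, and "as of" is taken to mean "timestamp less than or equal to the main element's timestamp".

```python
import bisect


class TimestampIndexedView:
    """Hypothetical side-input view that keeps values sorted by event timestamp."""

    def __init__(self):
        self._timestamps = []
        self._values = []

    def add(self, timestamp, value):
        # Insert while keeping both lists ordered by timestamp.
        i = bisect.bisect_right(self._timestamps, timestamp)
        self._timestamps.insert(i, timestamp)
        self._values.insert(i, value)

    def as_of(self, timestamp):
        """Return the values whose timestamp is <= `timestamp`."""
        i = bisect.bisect_right(self._timestamps, timestamp)
        return self._values[:i]


view = TimestampIndexedView()
view.add(10, "s10")
view.add(30, "s30")
print(view.as_of(20))  # ['s10'], even though s30 has already arrived
```
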

Windowing is exactly the mechanism that gives us a cross-PCollection
barrier with which to line things up.
Shouldn't the watermark do this?


On 4/27/21 8:03 PM, Reuven Lax wrote:



On Tue, Apr 27, 2021 at 10:53 AM Jan Lukavský <[email protected]> wrote:
If using early triggers, then the side input loads the latest trigger for that 
window.
Doesn't the word 'latest' imply processing time matching? The watermark of the main 
input might be arbitrarily delayed from the watermark of the side input. If we consider 
GlobalWindows on both sides, then the "latest" trigger in the side input looks exactly 
like processing time matching. Yes, between different windows, the matching is in event 
time. But within the same window (ignoring the window mapping for now), the matching 
looks like processing time, right?
Not really. Think of each trigger of a window as a refinement - so the latest trigger for 
that window is our best approximation of the "correct" value for that window.  
For this reason, window panes are indexed by an integer (pane_index), not by a timestamp. 
The idea here is that the main input element maps to the side input for the best, 
most-recent knowledge of the window.

Are you asking for a way to ignore early triggers on side input mapping, and 
only map to on-time triggered values for the window?

If we look at the SimplePushbackSideInputDoFnRunner, used by runners exactly for 
the side input matching, there is no check of the side input watermark to 
determine if an element should be 'pushed back' or processed. An element is 
processed if, and only if, all side inputs for the particular window are ready.
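
The readiness rule described here can be modelled in a few lines. This is a simplified plain-Python sketch, not the actual SimplePushbackSideInputDoFnRunner: the class `PushbackRunner` and its method names are invented for illustration, and "ready" is assumed to mean "at least one pane has arrived for that window".

```python
class PushbackRunner:
    """Simplified model: an element is processed once every side input has
    at least one pane for the element's window; watermarks are not consulted."""

    def __init__(self, side_input_names):
        self.side_input_names = list(side_input_names)
        self.ready = set()      # (side_input_name, window) pairs that have a pane
        self.pushed_back = []   # (element, window) pairs waiting for side inputs
        self.processed = []

    def _is_ready(self, window):
        return all((name, window) in self.ready for name in self.side_input_names)

    def on_main_element(self, element, window):
        if self._is_ready(window):
            self.processed.append(element)
        else:
            self.pushed_back.append((element, window))

    def on_side_input_pane(self, name, window):
        self.ready.add((name, window))
        still_waiting = []
        for element, w in self.pushed_back:
            if self._is_ready(w):
                self.processed.append(element)
            else:
                still_waiting.append((element, w))
        self.pushed_back = still_waiting


runner = PushbackRunner(["view"])
runner.on_main_element("m1", "w0")       # pushed back: no pane yet
runner.on_side_input_pane("view", "w0")  # first (possibly early) pane arrives
runner.on_main_element("m2", "w0")       # processed immediately
print(runner.processed)  # ['m1', 'm2']
```
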

On 4/27/21 7:24 PM, Reuven Lax wrote:

The windows shouldn't need to match - at least if the FlinkRunner 
implementation is correct. By default, the side-input's WindowFn should be used 
to map the main input's timestamp into a window, and that window is used to 
determine which version of the side input to load. A custom WindowFn can be 
used to do even more - e.g. if you want the main input element to map to the 
_previous_ window in the side input (you would do this by overloading 
getDefaultWindowMappingFn).
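
As a rough illustration of the mapping described above, assuming fixed windows on the side input and non-negative integer timestamps: the functions below are plain Python with hypothetical names, not Beam's WindowFn or WindowMappingFn API.

```python
def fixed_window(timestamp, size):
    """Return the [start, end) fixed window of width `size` containing `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def default_mapping(main_timestamp, side_window_size):
    """Map a main-input timestamp to the side-input window that contains it."""
    return fixed_window(main_timestamp, side_window_size)


def previous_window_mapping(main_timestamp, side_window_size):
    """Custom mapping: use the side-input window just before the containing one."""
    start, _ = fixed_window(main_timestamp, side_window_size)
    return (start - side_window_size, start)


print(default_mapping(125, 60))          # (120, 180)
print(previous_window_mapping(125, 60))  # (60, 120)
```
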

If using early triggers, then the side input loads the latest trigger for that 
window. This is still an event-time mapping - for example two main-input events 
in two different windows will still map to the side input for the matching 
window. However, if the side input PCollection is triggered, then the latest 
trigger for each window's side input will be loaded.
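
A small model of "latest trigger per window" may help here. It is a plain-Python sketch with invented names (`TriggeredSideInput`, `on_pane`, `lookup`), and it assumes that panes carry an increasing integer index within each window.

```python
class TriggeredSideInput:
    """Keeps only the most recent pane (highest pane index) for each window."""

    def __init__(self):
        self._latest = {}  # window -> (pane_index, value)

    def on_pane(self, window, pane_index, value):
        current = self._latest.get(window)
        if current is None or pane_index > current[0]:
            self._latest[window] = (pane_index, value)

    def lookup(self, window):
        """Return the value of the latest pane seen so far for `window`."""
        return self._latest[window][1]


side = TriggeredSideInput()
side.on_pane("w1", 0, "early estimate for w1")
side.on_pane("w2", 0, "early estimate for w2")
side.on_pane("w1", 1, "refined value for w1")

# Main-input events in different windows still read different windows' values.
print(side.lookup("w1"))  # refined value for w1
print(side.lookup("w2"))  # early estimate for w2
```

Which pane counts as "latest" depends on when the lookup happens, which is the point raised elsewhere in this thread.
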

It's possible that the FlinkRunner implementation is incomplete, in which case 
it should be fixed.

On Tue, Apr 27, 2021 at 9:36 AM Jan Lukavský <[email protected]> wrote:
It seems to me, that this is true only with matching windows on both sides and 
default trigger of the side input. Then it will (due to condition a)) work as 
if the matching happened in event time. But when using any early triggers then 
it will work in processing time. At least, that is my understanding from 
studying the code in FlinkRunner.

On 4/27/21 4:05 PM, Robert Burke wrote:

I thought the matching happened with elements in the matching window, in Event 
time, not in Processing time.

Granted, I'm not that familiar with this area myself, but one key part of Beam 
is that nearly everything is Event time by default, not Processing time.

On Tue, Apr 27, 2021, 12:43 AM Jan Lukavský <[email protected]> wrote:
Hi,

I have a question about matching side inputs to main input. First a
recap, to make sure I understand the current state correctly:

    a) elements from main input are pushed back (stored in state) until a
first side input pane arrives (that might be on time, or early)

    b) after that, elements from the main input are matched to the current
side input view - the view is updated as new data arrives, but is
matched to the main input elements in processing time

If this is the current state, my question is, would it be possible to
add a new mode of matching of side inputs? Something like

    ParDo.of(new MyDoFn()).withSideInput("name", myView,
TimeDomain.EVENT_TIME)

The semantics would be that the elements from the main PCollection would
be stored into state as pairs with the value of the current main input
watermark, and on update of the side-input watermark only main input elements
with an associated input watermark less than that of the side input would
be matched with the side input and sent downstream. Although this
approach is necessarily more expensive and introduces more
latency than processing time matching, there are situations when
processing time matching is inappropriate for correctness reasons.
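
To make the proposed semantics concrete, here is one possible reading of them as a plain-Python sketch. Nothing here is an existing Beam API: `EventTimeMatcher` and its methods are hypothetical, and the choice to match released elements against the side-input value current at release time is an assumption.

```python
class EventTimeMatcher:
    """Buffers main-input elements, stamped with the main-input watermark at
    arrival, and releases them once the side-input watermark passes that stamp."""

    def __init__(self):
        self.main_watermark = float("-inf")
        self.side_watermark = float("-inf")
        self.side_value = None
        self.buffer = []   # (main watermark at arrival, element)
        self.output = []   # (element, side-input value it was matched with)

    def on_main_watermark(self, watermark):
        self.main_watermark = watermark

    def on_main_element(self, element):
        self.buffer.append((self.main_watermark, element))

    def on_side_value(self, value):
        self.side_value = value

    def on_side_watermark(self, watermark):
        self.side_watermark = watermark
        remaining = []
        for stamp, element in self.buffer:
            if stamp < self.side_watermark:
                self.output.append((element, self.side_value))
            else:
                remaining.append((stamp, element))
        self.buffer = remaining


m = EventTimeMatcher()
m.on_side_value("v1")
m.on_main_watermark(10)
m.on_main_element("a")    # stamped with main watermark 10
m.on_side_watermark(5)    # 10 < 5 is false, so "a" stays buffered
m.on_main_watermark(20)
m.on_main_element("b")    # stamped with main watermark 20
m.on_side_value("v2")
m.on_side_watermark(15)   # 10 < 15, so "a" is released
print(m.output)  # [('a', 'v2')]
print(m.buffer)  # [(20, 'b')]
```

The buffer is where the extra cost and latency come from: every main-input element waits in state until the side-input watermark overtakes it.
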

WDYT?

    Jan
