> If using early triggers, then the side input loads the latest trigger
> for that window.
Doesn't the word 'latest' imply processing time matching? The
watermark of the main input might be arbitrarily delayed from the
watermark of the side input. If we consider GlobalWindows on both sides,
then the "latest" trigger in the side input looks exactly like processing
time matching. Yes, between different windows, the matching is in event
time. But within the same window (ignoring the window mapping for now),
the matching looks like processing time, right?
If we look at the SimplePushbackSideInputDoFnRunner, which runners use
exactly for the side input matching, there is no check of the side input
watermark to determine whether an element should be 'pushed back' or
processed. An element is processed if, and only if, all side inputs for
the particular window are ready.
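To make the point concrete, here is a minimal plain-Java model of the push-back behaviour described above. It is only a sketch and does not use Beam: the class PushbackSketch and its methods are hypothetical names, it assumes a single side input, and windows are identified by a plain long. Note that no watermark is consulted anywhere; which pane an element sees depends only on arrival order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of side-input push-back. Hypothetical names, not Beam's API. */
class PushbackSketch {
    // Latest pane value seen so far for each side-input window.
    private final Map<Long, String> latestPane = new HashMap<>();
    // Main-input elements waiting for the first pane of their window.
    private final Map<Long, List<String>> pushedBack = new HashMap<>();
    // Results emitted downstream, as "element@sideValue".
    final List<String> output = new ArrayList<>();

    /** A main-input element arrives for the given window. */
    void onMainElement(long window, String element) {
        String side = latestPane.get(window);
        if (side == null) {
            // Side input not ready for this window: push the element back.
            pushedBack.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
        } else {
            // Ready: match against whatever pane is current right now.
            output.add(element + "@" + side);
        }
    }

    /** A side-input pane (early or on time) arrives for the given window. */
    void onSideInputPane(long window, String value) {
        latestPane.put(window, value);
        List<String> waiting = pushedBack.remove(window);
        if (waiting != null) {
            for (String element : waiting) {
                output.add(element + "@" + value);
            }
        }
    }
}
```

Feeding it element "a", then pane "early", then element "b", then pane "onTime", then element "c" (all in window 0) yields a@early, b@early, c@onTime, regardless of the elements' event timestamps within the window.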
On 4/27/21 7:24 PM, Reuven Lax wrote:
The windows shouldn't need to match - at least if the FlinkRunner
implementation is correct. By default, the side-input's WindowFn
should be used to map the main input's timestamp into a window, and
that window is used to determine which version of the side input to
load. A custom WindowFn can be used to do even more - e.g. if you want
the main input element to map to the _previous_ window in the side
input (you would do this by overriding getDefaultWindowMappingFn).
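As an illustration of the mapping just described, here is a small plain-Java sketch. It is a simplified model and not Beam code: it assumes fixed-size windows on the side input, works on raw millisecond timestamps, and the class and method names (WindowMappingSketch, defaultMapping, previousWindowMapping) are hypothetical.

```java
/** Toy model of main-input to side-input window mapping. Not Beam's API. */
class WindowMappingSketch {
    /** Start of the fixed window of the given size that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        // floorDiv keeps the result correct for negative timestamps too.
        return Math.floorDiv(timestampMillis, sizeMillis) * sizeMillis;
    }

    /** Default-style mapping: the side-input window containing the main-input timestamp. */
    static long defaultMapping(long mainTimestampMillis, long sideWindowSizeMillis) {
        return windowStart(mainTimestampMillis, sideWindowSizeMillis);
    }

    /** Custom mapping: the side-input window immediately before the matching one. */
    static long previousWindowMapping(long mainTimestampMillis, long sideWindowSizeMillis) {
        return windowStart(mainTimestampMillis, sideWindowSizeMillis) - sideWindowSizeMillis;
    }
}
```

With one-minute side-input windows, a main-input timestamp of 125 000 ms maps to the window starting at 120 000 ms by default, and to the window starting at 60 000 ms under the "previous window" mapping.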
If using early triggers, then the side input loads the latest trigger
for that window. This is still an event-time mapping - for example two
main-input events in two different windows will still map to the side
input for the matching window. However if the side input PCollection
is triggered, then the latest trigger for each window's side input
will be loaded.
It's possible that the FlinkRunner implementation is incomplete, in
which case it should be fixed.
On Tue, Apr 27, 2021 at 9:36 AM Jan Lukavský wrote:
It seems to me, that this is true only with matching windows on
both sides and default trigger of the side input. Then it will
(due to condition a)) work as if the matching happened in event
time. But when using any early triggers then it will work in
processing time. At least, that is my understanding from studying
the code in FlinkRunner.
On 4/27/21 4:05 PM, Robert Burke wrote:
I thought the matching happened with elements in the matching
window, in Event time, not in Processing time.
Granted, I'm not that familiar with this area myself, but one key
part of Beam is that nearly everything is Event time by default, not
Processing time.
On Tue, Apr 27, 2021, 12:43 AM Jan Lukavský wrote:
Hi,
I have a question about matching side inputs to main input. First a
recap, to make sure I understand the current state correctly:
 a) elements from main input are pushed back (stored in state) until a
    first side input pane arrives (that might be on time, or early)
 b) after that, elements from the main input are matched to the current
    side input view - the view is updated as new data arrives, but is
    matched to the main input elements in processing time
If this is the current state, my question is, would it be possible to
add a new mode of matching of side inputs? Something like
  ParDo.of(new MyDoFn()).withSideInput("name", myView, TimeDomain.EVENT_TIME)
The semantics would be that the elements from the main PCollection would
be stored into state as pairs with the value of the current main input
watermark, and on update of the side-input watermark only main input
elements with an associated input watermark less than that of the side
input would be matched with the side input and sent downstream. Although
this approach is necessarily more expensive and introduces additional
latency compared to processing time matching, there are situations when
processing time matching is inappropriate for correctness reasons.
WDYT?
Jan
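The event-time matching mode proposed in the original message can be modelled roughly as follows. This is a plain-Java sketch of the described semantics only, not an existing Beam feature: the class EventTimeMatchSketch and all of its methods are hypothetical, and it assumes a single side input whose current view is a plain String.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Toy model of the proposed event-time side-input matching. Hypothetical API. */
class EventTimeMatchSketch {
    /** A main-input element paired with the main-input watermark at arrival. */
    private static final class Buffered {
        final String element;
        final long mainWatermark;

        Buffered(String element, long mainWatermark) {
            this.element = element;
            this.mainWatermark = mainWatermark;
        }
    }

    private final List<Buffered> buffer = new ArrayList<>();
    private long mainWatermark = Long.MIN_VALUE;
    private String sideValue = null;
    // Results emitted downstream, as "element@sideValue".
    final List<String> output = new ArrayList<>();

    void advanceMainWatermark(long watermark) {
        mainWatermark = Math.max(mainWatermark, watermark);
    }

    /** Store the element together with the current main-input watermark. */
    void onMainElement(String element) {
        buffer.add(new Buffered(element, mainWatermark));
    }

    /** The side-input view is updated as new data arrives. */
    void onSideInputUpdate(String value) {
        sideValue = value;
    }

    /** Release only elements whose stored watermark is below the side-input watermark. */
    void advanceSideWatermark(long sideWatermark) {
        Iterator<Buffered> it = buffer.iterator();
        while (it.hasNext()) {
            Buffered b = it.next();
            if (b.mainWatermark < sideWatermark) {
                output.add(b.element + "@" + sideValue);
                it.remove();
            }
        }
    }
}
```

In this model an element buffered at main watermark 10 is released once the side-input watermark passes 10, while an element buffered at main watermark 20 stays in state until the side-input watermark passes 20. That buffering is the extra cost and latency mentioned in the proposal.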