The windows shouldn't need to match - at least if the FlinkRunner implementation is correct. By default, the side input's WindowFn should be used to map the main input's timestamp into a window, and that window is used to determine which version of the side input to load. A custom WindowFn can be used to do even more - e.g. if you want the main input element to map to the _previous_ window in the side input (you would do this by overriding getDefaultWindowMappingFn).
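To make the mapping concrete, here is a minimal sketch in plain Java. It is a simplified model and not Beam's WindowFn or WindowMappingFn API: it assumes fixed, non-overlapping side-input windows identified by their start timestamp in milliseconds, and the class and method names are hypothetical.

```java
// Simplified model of event-time side-input window mapping.
// Plain Java only; this is NOT Beam's WindowFn / WindowMappingFn API.
// Assumption: fixed, non-overlapping windows identified by their start timestamp.
final class SideInputWindowMappingSketch {

    /** Start of the fixed window of the given size that contains the timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        // floorDiv keeps the result correct for negative timestamps too.
        return Math.floorDiv(timestampMillis, windowSizeMillis) * windowSizeMillis;
    }

    /** Default-style mapping: the side-input window that contains the main-input timestamp. */
    static long defaultSideInputWindow(long mainTimestampMillis, long sideWindowSizeMillis) {
        return windowStart(mainTimestampMillis, sideWindowSizeMillis);
    }

    /** Custom mapping: the side-input window just before the one containing the timestamp. */
    static long previousSideInputWindow(long mainTimestampMillis, long sideWindowSizeMillis) {
        return windowStart(mainTimestampMillis, sideWindowSizeMillis) - sideWindowSizeMillis;
    }

    public static void main(String[] args) {
        long sideWindowSize = 60_000L; // hypothetical: one-minute side-input windows
        long mainTimestamp = 125_000L; // hypothetical main-input element timestamp
        System.out.println(defaultSideInputWindow(mainTimestamp, sideWindowSize));
        System.out.println(previousSideInputWindow(mainTimestamp, sideWindowSize));
    }
}
```

Note that the main input's own window size never appears here: only the element's timestamp and the side input's windowing decide which side-input window is read, which is why the windows on the two sides need not match.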
If using early triggers, then the side input loads the latest trigger firing for that window. This is still an event-time mapping - for example, two main-input events in two different windows will still map to the side input for the matching window. However, if the side input PCollection is triggered, then the latest trigger firing for each window's side input will be loaded.

It's possible that the FlinkRunner implementation is incomplete, in which case it should be fixed.

On Tue, Apr 27, 2021 at 9:36 AM Jan Lukavský <[email protected]> wrote:

> It seems to me that this is true only with matching windows on both sides
> and default trigger of the side input. Then it will (due to condition a))
> work as if the matching happened in event time. But when using any early
> triggers then it will work in processing time. At least, that is my
> understanding from studying the code in FlinkRunner.
>
> On 4/27/21 4:05 PM, Robert Burke wrote:
>
> I thought the matching happened with elements in the matching window, in
> Event time, not in Processing time.
>
> Granted, I'm not that familiar with this area myself, but one key part of
> Beam is nearly everything is Event time by default, not Processing time.
>
> On Tue, Apr 27, 2021, 12:43 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi,
>>
>> I have a question about matching side inputs to main input. First a
>> recap, to make sure I understand the current state correctly:
>>
>> a) elements from main input are pushed back (stored in state) until a
>> first side input pane arrives (that might be on time, or early)
>>
>> b) after that, elements from the main input are matched to the current
>> side input view - the view is updated as new data arrives, but is
>> matched to the main input elements in processing time
>>
>> If this is the current state, my question is, would it be possible to
>> add a new mode of matching of side inputs? Something like
>>
>> ParDo.of(new MyDoFn()).withSideInput("name", myView,
>> TimeDomain.EVENT_TIME)
>>
>> the semantics would be that the elements from the main PCollection would
>> be stored into state as pairs with the value of the current main input
>> watermark, and on update of the side-input watermark only main input
>> elements with an associated input watermark less than that of the side
>> input would be matched with the side input and sent downstream. Although
>> this approach is necessarily more expensive and introduces additional
>> latency compared to processing time matching, there are situations when
>> processing time matching is inappropriate for correctness reasons.
>>
>> WDYT?
>>
>> Jan
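For reference, the buffering semantics proposed in the quoted message could be modeled roughly as below. This is only a sketch in plain Java under stated assumptions, not Beam or FlinkRunner code: the class, its methods, and the string-based element and side-input values are all hypothetical, and it assumes each released element is paired with the side-input value that is current when the side-input watermark advances.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Rough model of the proposed event-time matching of side inputs.
// Hypothetical names throughout; this is NOT Beam or FlinkRunner code.
final class EventTimeSideInputBufferSketch {

    /** A main-input element paired with the main-input watermark seen on arrival. */
    private static final class Buffered {
        final String element;
        final long mainWatermark;

        Buffered(String element, long mainWatermark) {
            this.element = element;
            this.mainWatermark = mainWatermark;
        }
    }

    private final List<Buffered> buffer = new ArrayList<>();
    private long mainWatermark = Long.MIN_VALUE;

    /** Watermarks only move forward, so keep the maximum seen so far. */
    void advanceMainWatermark(long watermark) {
        mainWatermark = Math.max(mainWatermark, watermark);
    }

    /** Store the element together with the current main-input watermark. */
    void addMainElement(String element) {
        buffer.add(new Buffered(element, mainWatermark));
    }

    /**
     * On a side-input watermark update, release every buffered element whose
     * stored watermark is strictly less than the side-input watermark.
     * Assumption: released elements are paired with the given side-input value.
     */
    List<String> onSideInputWatermark(long sideWatermark, String sideValue) {
        List<String> released = new ArrayList<>();
        Iterator<Buffered> it = buffer.iterator();
        while (it.hasNext()) {
            Buffered b = it.next();
            if (b.mainWatermark < sideWatermark) {
                released.add(b.element + "+" + sideValue);
                it.remove();
            }
        }
        return released;
    }

    /** Number of main-input elements still waiting for the side input. */
    int pending() {
        return buffer.size();
    }
}
```

The sketch also shows where the extra cost and latency mentioned in the proposal come from: every main-input element sits in state until the side-input watermark passes the watermark stored with it.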
