Backing up to the original question: you mentioned the thing I wanted to
highlight. An event-time based side input join for something like a
changing dimension table is a use case that has already been partly
addressed:
https://beam.apache.org/documentation/patterns/side-inputs/#slowly-updating-side-input-using-windowing

The ideas:

 - one side of the join (like a BQ table or files) is periodically
refreshed and that entire refresh is put into a single window of the side
input.
 - each main input (stream) element is windowed according to the latest
refresh and always joins against that refresh.
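
As a rough sketch of those two ideas in plain Python (not the Beam API): I am
assuming a fixed refresh period, and the names REFRESH_PERIOD, window_start
and join_with_refresh are made up for illustration.

```python
REFRESH_PERIOD = 60  # seconds between side-input refreshes (assumed value)


def window_start(timestamp, period=REFRESH_PERIOD):
    """Return the start of the fixed window containing an event timestamp."""
    return timestamp - (timestamp % period)


def join_with_refresh(main_elements, refreshes, period=REFRESH_PERIOD):
    """Join (timestamp, key) elements against the refresh for their window.

    `refreshes` maps a window start to the entire snapshot (a dict) that was
    loaded for that window. Elements whose window has no snapshot yet are
    returned as pending, which loosely mimics push-back.
    """
    joined, pending = [], []
    for ts, key in main_elements:
        snapshot = refreshes.get(window_start(ts, period))
        if snapshot is None:
            pending.append((ts, key))
        else:
            joined.append((ts, key, snapshot.get(key)))
    return joined, pending


# Two refreshes have landed; the element at t=130 has to wait for a third.
refreshes = {0: {"a": "v1"}, 60: {"a": "v2"}}
main = [(5, "a"), (61, "a"), (130, "a")]
joined, pending = join_with_refresh(main, refreshes)
```

The point is only that the match is decided by the element's timestamp and
the refresh window, not by when the element happens to arrive.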

A full implementation really requires state & timers and SDF, so it is
somewhat advanced; Mikhail built helper transforms for it.

I'm not sure whether this can be used to implement your use case. It does
not let the side input change the way a BigTable or Redis cache does, where
each main input element gets the "latest" value at per-element granularity.
For that you would use MapState, as Reuven mentioned, though of course that
makes the user write more of the boilerplate, and the actual semantics of
the pipeline are undefined (because processing-time arrival order is not
part of the data, results that depend on it are only partially defined).
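
To make "only partially defined" concrete, here is a small plain-Python
simulation (not Beam code) of a tagged union fed through a map kept in
state. The function name process_in_arrival_order and the event tuples are
hypothetical; s10 and s30 are borrowed from Robert's example further down.

```python
def process_in_arrival_order(events):
    """Process a tagged union of side updates and main elements.

    Each event is ("side", key, value) or ("main", key). The dict stands in
    for per-key MapState: a main element sees whatever value was written
    last before it arrived.
    """
    state = {}
    out = []
    for event in events:
        if event[0] == "side":
            _, key, value = event
            state[key] = value
        else:
            _, key = event
            out.append((key, state.get(key)))
    return out


# Same data, two possible arrival orders.
order_a = [("side", "k", "s10"), ("side", "k", "s30"), ("main", "k")]
order_b = [("side", "k", "s10"), ("main", "k"), ("side", "k", "s30")]
result_a = process_in_arrival_order(order_a)
result_b = process_in_arrival_order(order_b)
```

The two runs contain identical elements but produce different joins, which
is exactly the part of the result that the data alone does not determine.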

I agree that the approximate join of a processing-time refresh on the side
input has limited uses, and that we need to implement, document, and scale
test other join approaches like these.
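
One shape such an event-time join could take, following Jan's "timestamp
indexed" remark quoted below, is sketched here in plain Python. This is an
assumption about a possible design, not an existing Beam feature; the class
TimestampIndexedView and its methods are invented for illustration.

```python
import bisect


class TimestampIndexedView:
    """Side input that keeps every update, indexed by event timestamp."""

    def __init__(self):
        self._timestamps = []
        self._values = []
        self.watermark = float("-inf")

    def add(self, timestamp, value):
        # Keep both lists sorted by timestamp, whatever the arrival order.
        i = bisect.bisect_right(self._timestamps, timestamp)
        self._timestamps.insert(i, timestamp)
        self._values.insert(i, value)

    def advance_watermark(self, watermark):
        self.watermark = max(self.watermark, watermark)

    def lookup(self, timestamp):
        """Return the latest update at or before `timestamp`.

        Raises LookupError while the side input watermark is behind the
        element, which is where a runner would push the element back.
        """
        if self.watermark < timestamp:
            raise LookupError("side input watermark is behind the element")
        i = bisect.bisect_right(self._timestamps, timestamp)
        return self._values[i - 1] if i else None


# Robert's first ordering: both s10 and s30 arrive before m20.
late = TimestampIndexedView()
late.add(10, "s10")
late.advance_watermark(25)
late.add(30, "s30")
late.advance_watermark(100)

# His second ordering: m20 arrives before s30.
early = TimestampIndexedView()
early.add(10, "s10")
early.advance_watermark(25)
```

With this kind of view, an element at timestamp 20 is matched to s10 under
both orderings, at the cost of retaining old updates and delaying elements
until the side input watermark passes them.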

Kenn

On Tue, Apr 27, 2021 at 1:40 PM Jan Lukavský <[email protected]> wrote:

> Actually yes, tagging PCollections and using stateful DoFn can solve
> that in all cases - the memory requirements for batch should be solvable
> using @RequiresTimeSortedInput, as that is what causes the issues on
> batch on runners that support it.
>
> Another yes, this is somewhat related to a full stream-to-stream join
> with exact time characteristics, where each side should be matched to
> the closest preceding update on the other side (again, some
> oversimplification).
>
> "Event time side inputs" could be a handy tool for solving these
> problems in cases where one side is "reasonably small", so that it can
> simply fit into memory, yet still be arbitrarily fast updatable (as
> opposed to the "slowly changing caches" patterns that we sometimes see
> using side inputs - the reasons for this "slowness" seem to be in the
> inherent processing time matching in global windows). It should also
> give more efficient runtime characteristics.
>
> On 4/27/21 10:27 PM, Robert Bradshaw wrote:
> > So, I was just thinking when Jan mentioned "timestamp indexed" that
> > side input and state happen to be implemented on top of roughly the
> > same thing (in streaming Dataflow, and over the FnAPI) but the
> > connection might be deeper than that. Side inputs could be viewed as a
> > special type of (typically bag or map) state that is fully populated
> > by another PCollection and has a set eviction property. Maybe there's
> > some deeper unification here that could generalize side inputs.
> >
> > On the other hand, it seems you can get any desired side input
> > behavior by taking as your main input a tagged union of the various
> > PCollections and using state+timers. (This may, however, require
> > infeasibly large memory requirements for batch.) The value is in a
> > more general, modular pattern that both pipeline authors and systems
> > can reason about.
> >
> > On Tue, Apr 27, 2021 at 1:01 PM Reuven Lax <[email protected]> wrote:
> >> The watermark discussion may be a red herring here.
> >>
> >> I think it's interesting to think about the case where the side input
> is produced by a state/timers DoFn. In this scenario, there may be more
> interesting ways to match the side input than simply using windowing. It
> might be worth thinking about this topic more deeply.
> >>
> >> On Tue, Apr 27, 2021 at 12:51 PM Jan Lukavský <[email protected]> wrote:
> >>> On 4/27/21 9:26 PM, Robert Bradshaw wrote:
> >>>
> >>>> On Tue, Apr 27, 2021 at 12:05 PM Jan Lukavský <[email protected]>
> wrote:
> >>>>> On 4/27/21 8:51 PM, Robert Bradshaw wrote:
> >>>>>> On Tue, Apr 27, 2021 at 11:25 AM Jan Lukavský <[email protected]>
> wrote:
> >>>>>>>> Are you asking for a way to ignore early triggers on side input
> mapping, and only map to on-time triggered values for the window?
> >>>>>>> No, that could for sure be done before applying the View
> transform. I'd like to know if it would be possible to create a mode of the
> matching which would be deterministic. One possibility to make it
> deterministic seems to be, that main input elements would be pushed back
> until side input watermark 'catches up' with main input. Whenever the side
> input watermark would be delayed after the main input watermark, elements
> would start to be pushed back again. Not sure if I'm explaining it using
> the right words. The side input watermark can be controlled using timer in
> an upstream transform, so this defines which elements in main input would
> be matched onto which pane of the side input.
> >>>>>> Perhaps I'm not following the request correctly, but this is exactly
> >>>>>> how side inputs work by default. It is only when one explicitly
> >>>>>> requests a non-deterministic trigger upstream of the side input
> (e.g.
> >>>>>> one that may fire multiple times or ahead of the watermark) that one
> >>>>>> sees a side input with multiple variations or data in the side input
> >>>>>> before the watermark of the side input is caught up to the main
> input.
> >>>>> Yes, exactly. But take the example of side input in global windows
> (on
> >>>>> both the main input and side input). Then there has to be multiple
> >>>>> firings per window, because otherwise the side input would be
> available
> >>>>> at the end of time, which is not practical. The trigger doesn't have
> to
> >>>>> be non-deterministic, the data might come from a stateful ParDo,
> using a
> >>>>> timer with output timestamp, which would make the downstream
> watermark
> >>>>> progress quite well defined. The matching would still be
> >>>>> nondeterministic in this case.
> >>>> If everything is in the global window, things get non-deterministic
> >>>> across PCollections. For example, say the main input has element m20
> >>>> and the side input has elements s10 and s30 (with the obvious
> >>>> timestamps). Suppose we have a total ordering of events as follows.
> >>>>
> >>>>       s10 arrives
> >>>>       side input watermark advances to 25
> >>>>       s30 arrives
> >>>>       side input watermark advances to 100
> >>>>       m20 arrives
> >>>>
> >>>> In this case, while processing m20, one would see both s10 and s30.
> >>>> Alternatively we could have had
> >>>>
> >>>>       s10 arrives
> >>>>       side input watermark advances to 25
> >>>>       m20 arrives
> >>>>       s30 arrives
> >>>>       side input watermark advances to 100
> >>>>
> >>>> in which case m20 would only see s10.
> >>> Yes, this is absolutely true, I didn't want to make things too
> >>> complicated, so I ignored this fact, that absolutely correct solution
> >>> would require the PCollectionView to be timestamp indexed so that if it
> >>> would receive both s10 and s30 it would correctly return s10 (as s30
> >>> didn't exist at m20, right?). I simplified this at this moment for this
> >>> discussion, thanks for clarifying.
> >>>> Windowing is exactly the mechanism that gives us a cross-PCollection
> >>>> barrier with which to line things up.
> >>> Should not watermark do this?
> >>>>>>> On 4/27/21 8:03 PM, Reuven Lax wrote:
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Apr 27, 2021 at 10:53 AM Jan Lukavský <[email protected]>
> wrote:
> >>>>>>>>> If using early triggers, then the side input loads the latest
> trigger for that window.
> >>>>>>>> Does not the word 'latest' imply processing time matching?
> The watermark of main input might be arbitrarily delayed from the watermark
> of side input. If we consider GlobalWindows on both sides, then "latest"
> trigger in side input looks exactly as processing time matching. Yes,
> between different windows, the matching is in event time. But within the
> same window (ignoring the window mapping for now), the matching looks like
> processing time, right?
> >>>>>>> Not really. Think of each trigger of a window as a refinement - so
> the latest trigger for that window is our best approximation of the
> "correct" value for that window.  For this reason, window panes are indexed
> by an integer (pane_index), not by a timestamp. The idea here is that the
> main input element maps to the side input for the best, most-recent
> knowledge of the window.
> >>>>>>>
> >>>>>>> Are you asking for a way to ignore early triggers on side input
> mapping, and only map to on-time triggered values for the window?
> >>>>>>>
> >>>>>>>> If we look at the SimplePushbackSideInputDoFnRunner used by
> runners exactly for the side input matching, there is no testing of side
> input watermark to determine if an element should be 'pushed back' or
> processed. Element is processed if, and only if, all side inputs for
> particular window are ready.
> >>>>>>>>
> >>>>>>>> On 4/27/21 7:24 PM, Reuven Lax wrote:
> >>>>>>>>
> >>>>>>>> The windows shouldn't need to match - at least if the FlinkRunner
> implementation is correct. By default, the side-input's WindowFn should be
> used to map the main input's timestamp into a window, and that window is
> used to determine which version of the side input to load. A custom
> WindowFn can be used to do even more - e.g. if you want the main input
> element to map to the _previous_ window in the side input (you would do
> this by overloading getDefaultWindowMappingFn).
> >>>>>>>>
> >>>>>>>> If using early triggers, then the side input loads the latest
> trigger for that window. This is still an event-time mapping - for example
> two main-input events in two different windows will still map to the side
> input for the matching window. However if the side input PCollection is
> triggered, then the latest trigger for each window's side input will be
> loaded.
> >>>>>>>>
> >>>>>>>> It's possible that the FlinkRunner implementation is incomplete,
> in which case it should be fixed.
> >>>>>>>>
> >>>>>>>> On Tue, Apr 27, 2021 at 9:36 AM Jan Lukavský <[email protected]>
> wrote:
> >>>>>>>>> It seems to me, that this is true only with matching windows on
> both sides and default trigger of the side input. Then it will (due to
> condition a)) work as if the matching happened in event time. But when
> using any early triggers then it will work in processing time. At least,
> that is my understanding from studying the code in FlinkRunner.
> >>>>>>>>>
> >>>>>>>>> On 4/27/21 4:05 PM, Robert Burke wrote:
> >>>>>>>>>
> >>>>>>>>> I thought the matching happened with elements in the matching
> window, in Event time, not in Processing time.
> >>>>>>>>>
> >>>>>>>>> Granted, I'm not that familiar with this area myself, but one
> key part of Beam is nearly everything is Event time by default, not
> Processing time.
> >>>>>>>>>
> >>>>>>>>> On Tue, Apr 27, 2021, 12:43 AM Jan Lukavský <[email protected]>
> wrote:
> >>>>>>>>>> Hi,
> >>>>>>>>>>
> >>>>>>>>>> I have a question about matching side inputs to main input.
> First a
> >>>>>>>>>> recap, to make sure I understand the current state correctly:
> >>>>>>>>>>
> >>>>>>>>>>      a) elements from main input are pushed back (stored in
> state) until a
> >>>>>>>>>> first side input pane arrives (that might be on time, or early)
> >>>>>>>>>>
> >>>>>>>>>>      b) after that, elements from the main input are matched to
> the current
> >>>>>>>>>> side input view - the view is updated as new data arrives, but
> is
> >>>>>>>>>> matched to the main input elements in processing time
> >>>>>>>>>>
> >>>>>>>>>> If this is the current state, my question is, would it be
> possible to
> >>>>>>>>>> add a new mode of matching of side inputs? Something like
> >>>>>>>>>>
> >>>>>>>>>>      ParDo.of(new MyDoFn()).withSideInput("name", myView,
> >>>>>>>>>> TimeDomain.EVENT_TIME)
> >>>>>>>>>>
> >>>>>>>>>> the semantics would be that the elements from the main
> PCollection would
> >>>>>>>>>> be stored into state as pairs with the value of the current
> main input
> >>>>>>>>>> watermark and on update of side-input watermark only main input
> elements
> >>>>>>>>>> with associated input watermark less than that of the side
> input would
> >>>>>>>>>> be matched with the side input and sent downstream. Although
> this
> >>>>>>>>>> approach is necessarily more expensive and introducing
> additional
> >>>>>>>>>> latency than processing time matching, there are situations when
> >>>>>>>>>> processing time matching is inappropriate for correctness
> reasons.
> >>>>>>>>>>
> >>>>>>>>>> WDYT?
> >>>>>>>>>>
> >>>>>>>>>>      Jan
> >>>>>>>>>>
>
