BTW, another issue arises when a single triggered PCollectionView is read by two different ParDos: each one might observe a different trigger firing. This is noticeable if the outputs of those two ParDos are then joined together.
Reuven

On Thu, Apr 11, 2019 at 10:39 AM Kenneth Knowles <[email protected]> wrote:

> The consistency problem occurs even in a single output PCollection that is
> read as a side input, because two output elements can be re-bundled and
> materialized in separate updates to the side input.
>
> Kenn
>
> On Thu, Apr 11, 2019 at 10:36 AM Ruoyun Huang <[email protected]> wrote:
>
>> With little to no experience with triggers, I am trying to understand the
>> problem statement in this discussion.
>>
>> If a user is aware of the potential non-deterministic behavior, isn't it
>> almost trivial to refactor his/her user code, by putting PCollectionViews S
>> and T into one single PCollectionView S', to get around the issue? I
>> cannot think of a reason (wrong?) why a user *has* to put data into two
>> separate PCollectionViews in a single ParDo(A).
>>
>> On Thu, Apr 11, 2019 at 10:16 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> Even though what Kenn points out is a major reason for me bringing up
>>> this topic, I didn't want to limit this discussion to how side inputs could
>>> work, but more generally to what users want from their side inputs when
>>> dealing with multiple firings.
>>>
>>> On Thu, Apr 11, 2019 at 10:09 AM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> Luke and I talked in person a bit. I want to give context for what is at
>>>> stake here, in terms of side inputs in portability. A decent starting place
>>>> is https://s.apache.org/beam-side-inputs-1-pager
>>>>
>>>> In that general design, the runner offers the SDK just one (or a few)
>>>> materialization strategies, and the SDK builds idiomatic structures on top
>>>> of it. Concretely, the Fn API today offers a multimap structure, and the
>>>> idea was that the SDK could cleverly prepare a PCollection<KV<...>> for the
>>>> runner to materialize. As a naive example, a simple iterable structure
>>>> could just map all elements to one dummy key in the multimap. But if you
>>>> wanted a list plus its length, then you might map all elements to an
>>>> elements key and the length to a special length meta-key.
>>>>
>>>> So there is a problem: if the SDK is outputting a new
>>>> KV<"elements-key", ...> and KV<"length-key", ...> for the runner to
>>>> materialize, then consumers of the side input need to see both updates to
>>>> the materialization or neither. In general, these outputs might span many
>>>> keys.
>>>>
>>>> It seems like there are a few ways to resolve this tension:
>>>>
>>>> - Establish a consistency model so these updates will be observed
>>>> together. Seems hard, and whatever we come up with will limit runners, limit
>>>> efficiency, and potentially leak into users having to reason about
>>>> concurrency.
>>>>
>>>> - Instead of building the variety of side input views on one primitive
>>>> multimap materialization, force runners to provide many primitive
>>>> materializations with consistency under the hood. Not hard to get started,
>>>> but it adds an unfortunate new dimension for runners to vary in functionality
>>>> and performance, versus letting them optimize just one or a few
>>>> materializations.
>>>>
>>>> - Have no consistency, and simply not support side input methods that
>>>> would require consistent metadata. I'm curious what features this would
>>>> hurt.
>>>>
>>>> - Have no consistency, but require the SDK to build some sort of large
>>>> value, since single-element consistency is always built into the model.
>>>> Today many runners do concatenate all elements into one value, though that
>>>> does not perform well. Making this effective probably requires new model
>>>> features.
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Apr 11, 2019 at 9:44 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> One thing to keep in mind: triggers that fire multiple times per
>>>>> window already tend to be non-deterministic. These are element-count or
>>>>> processing-time triggers, both of which are fairly non-deterministic in
>>>>> when they fire.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Apr 11, 2019 at 9:27 AM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Today, we define that a side input becomes available to be consumed
>>>>>> once at least one firing occurs, or when the runner detects that no such
>>>>>> output could be produced (e.g. the watermark is beyond the end of the window
>>>>>> when using the default trigger). For triggers that fire at most once,
>>>>>> consumers are guaranteed to have a consistent view of the contents of the
>>>>>> side input. But what happens when the trigger fires multiple times?
>>>>>>
>>>>>> Let's say we have a pipeline containing:
>>>>>>
>>>>>> ParDo(A) --> PCollectionView S
>>>>>>          \-> PCollectionView T
>>>>>> ...
>>>>>>  |
>>>>>> ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
>>>>>>  |
>>>>>> ...
>>>>>>
>>>>>> 1) Let's say ParDo(A) outputs (during a single bundle) X and Y to
>>>>>> PCollectionView S. Should ParDo(C) be guaranteed to see X only if it
>>>>>> can also see Y (and vice versa)?
>>>>>>
>>>>>> 2) Let's say ParDo(A) outputs (during a single bundle) X to
>>>>>> PCollectionView S and Y to PCollectionView T. Should ParDo(C) be guaranteed
>>>>>> to see X only if it can also see Y?
>>>>>>
>>>>>
>>
>> --
>> ================
>> Ruoyun Huang
>>
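Kenn's "list plus its length" example above can be illustrated with a small toy model. This is plain Python, not Beam code: the `MultimapView` class and the key names `"elements-key"`/`"length-key"` are hypothetical stand-ins for the Fn API multimap materialization. The sketch shows how, without a cross-key consistency model, a consumer can observe the update to one key but not the other:

```python
# Toy model of the multimap side-input materialization discussed above.
# NOT Beam code: MultimapView and the key names are illustrative only.

class MultimapView:
    """A multimap materialization: each key maps to a list of values."""

    def __init__(self):
        self.data = {}

    def publish(self, key, values):
        # Each key is updated atomically, but distinct keys are updated
        # independently -- there is no cross-key consistency guarantee.
        self.data[key] = list(values)

    def read(self, key):
        return self.data.get(key, [])


view = MultimapView()

# The SDK encodes "a list plus its length" onto two keys of the multimap.
elements = ["x", "y"]
view.publish("elements-key", elements)
view.publish("length-key", [len(elements)])

# After the first firing is fully materialized, the view is consistent.
assert view.read("length-key") == [len(view.read("elements-key"))]

# A later firing adds an element. Suppose the runner materializes the
# update to "elements-key" before the update to "length-key" ...
elements.append("z")
view.publish("elements-key", elements)

# ... and a consumer reads the side input between the two updates.
seen_elements = view.read("elements-key")  # ["x", "y", "z"]
seen_length = view.read("length-key")      # still [2]

# The consumer observes an inconsistent view: 3 elements, length 2.
print(seen_elements, seen_length)
assert seen_length != [len(seen_elements)]
```

The same shape of problem appears in Lukasz's question 2: substitute PCollectionView S and PCollectionView T for the two keys, and ParDo(C) for the reader caught between the two independent updates.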
