BTW, another issue arises when a single triggered PCollectionView is read by
two different ParDos: each one might observe a different firing of the
trigger. This is noticeable if the outputs of those two ParDos are then
joined together.
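
To make that concrete, here is a toy sketch in plain Python (not Beam code).
It assumes a view can be modeled as a list of snapshots, one per trigger
firing; the names firings, read_view and pardo are hypothetical and exist
only for illustration.

```python
# Toy model, not Beam code: one snapshot of the view per trigger firing.
firings = [
    {"a": 1},            # contents after the first firing
    {"a": 1, "b": 2},    # contents after the second firing
]

def read_view(firing_index):
    """Return the view contents as of a given firing (hypothetical helper)."""
    return firings[firing_index]

def pardo(element, view):
    """Tag a main-input element with the side-input size it observed."""
    return (element, len(view))

# Two ParDos process the same element but observe different firings.
out1 = pardo("e", read_view(0))
out2 = pardo("e", read_view(1))

# Joining the two outputs on the element exposes the disagreement.
joined = {out1[0]: (out1[1], out2[1])}
print(joined)
```

The joined record carries two different sizes for what is logically the same
side input, which is the inconsistency described above.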

Reuven

On Thu, Apr 11, 2019 at 10:39 AM Kenneth Knowles <[email protected]> wrote:

> The consistency problem occurs even in a single output PCollection that is
> read as a side input, because two output elements can be re-bundled and
> materialized in separate updates to the side input.
>
> Kenn
>
> On Thu, Apr 11, 2019 at 10:36 AM Ruoyun Huang <[email protected]> wrote:
>
>> With little to no experience with triggers, I am trying to understand the
>> problem statement in this discussion.
>>
>> If a user is aware of the potential non-deterministic behavior, isn't it
>> almost trivial to refactor his/her user code, by putting PCollectionViews S
>> and T into one single PCollectionView S', to get around the issue? I
>> cannot think of a reason (am I wrong?) why a user *has* to put data into
>> two separate PCollectionViews in a single ParDo(A).
>>
>> On Thu, Apr 11, 2019 at 10:16 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> Even though what Kenn points out is a major reason for me bringing up
>>> this topic, I didn't want to limit this discussion to how side inputs
>>> could work. More generally, I want to ask what users want from their
>>> side inputs when dealing with multiple firings.
>>>
>>> On Thu, Apr 11, 2019 at 10:09 AM Kenneth Knowles <[email protected]>
>>> wrote:
>>>
>>>> Luke & I talked in person a bit. I want to give context for what is at
>>>> stake here, in terms of side inputs in portability. A decent starting place
>>>> is https://s.apache.org/beam-side-inputs-1-pager
>>>>
>>>> In that general design, the runner offers the SDK just one (or a few)
>>>> materialization strategies, and the SDK builds idiomatic structures on top
>>>> of it. Concretely, the Fn API today offers a multimap structure, and the
>>>> idea was that the SDK could cleverly prepare a PCollection<KV<...>> for the
>>>> runner to materialize. As a naive example, a simple iterable structure
>>>> could just map all elements to one dummy key in the multimap. But if you
>>>> wanted a list plus its length, then you might map all elements to an
>>>> element key and the length to a special length meta-key.
>>>>
>>>> So there is a problem: if the SDK is outputting a new
>>>> KV<"elements-key", ...> and KV<"length-key", ...> for the runner to
>>>> materialize then consumers of the side input need to see both updates to
>>>> the materialization or neither. In general, these outputs might span many
>>>> keys.
>>>>
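
A minimal sketch of the problem described above, in plain Python rather than
the Fn API. It assumes the multimap can be modeled as a dict of lists and
that a reader can observe one key's update without the other; encode_list,
materialize and is_consistent are hypothetical names, and only the two key
names come from the text above.

```python
from collections import defaultdict

ELEMENTS_KEY = "elements-key"
LENGTH_KEY = "length-key"

def encode_list(values):
    """Encode a list plus its length as KV pairs for a multimap (sketch)."""
    kvs = [(ELEMENTS_KEY, v) for v in values]
    kvs.append((LENGTH_KEY, len(values)))
    return kvs

def materialize(kvs):
    """Group KV pairs by key, standing in for the runner's multimap."""
    multimap = defaultdict(list)
    for key, value in kvs:
        multimap[key].append(value)
    return multimap

def is_consistent(multimap):
    """True if the stored length matches the number of stored elements."""
    lengths = multimap.get(LENGTH_KEY, [])
    elements = multimap.get(ELEMENTS_KEY, [])
    return len(lengths) == 1 and lengths[0] == len(elements)

old = materialize(encode_list(["x"]))        # first firing
new = materialize(encode_list(["x", "y"]))   # second firing

# A reader that sees the new elements but still the old length.
torn = {ELEMENTS_KEY: new[ELEMENTS_KEY], LENGTH_KEY: old[LENGTH_KEY]}

print(is_consistent(old), is_consistent(new), is_consistent(torn))
```

Each firing is consistent on its own; only the mixed read is not, which is
why the consumer needs to see both updates or neither.
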
>>>> It seems like there are a few ways to resolve this tension:
>>>>
>>>>  - Establish a consistency model so these updates will be observed
>>>> together. Seems hard and whatever we come up with will limit runners, limit
>>>> efficiency, and potentially leak into users having to reason about
>>>> concurrency
>>>>
>>>>  - Instead of building the variety of side input views on one primitive
>>>> multimap materialization, force runners to provide many primitive
>>>> materializations with consistency under the hood. Not hard to get started,
>>>> but adds an unfortunate new dimension for runners to vary in functionality
>>>> and performance, versus letting them optimize just one or a few
>>>> materializations
>>>>
>>>>  - Have no consistency and just not support side input methods that
>>>> would require consistent metadata. I'm curious what features this
>>>> will hurt.
>>>>
>>>>  - Have no consistency but require the SDK to build some sort of large
>>>> value, since single-element consistency is always built into the model.
>>>> Today many runners do concatenate all elements into one value, though that
>>>> does not perform well. Making this effective probably requires new model
>>>> features.
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Apr 11, 2019 at 9:44 AM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> One thing to keep in mind: triggers that fire multiple times per
>>>>> window already tend to be non-deterministic. These are element-count or
>>>>> processing-time triggers, both of which are fairly non-deterministic in
>>>>> firing.
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Thu, Apr 11, 2019 at 9:27 AM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Today, we define that a side input becomes available to be consumed
>>>>>> once at least one firing occurs or when the runner detects that no such
>>>>>> output could be produced (e.g. watermark is beyond the end of the window
>>>>>> when using the default trigger). For triggers that fire at most once,
>>>>>> consumers are guaranteed to have a consistent view of the contents of the
>>>>>> side input. But what happens when the trigger fires multiple times?
>>>>>>
>>>>>> Let's say we have a pipeline containing:
>>>>>> ParDo(A) --> PCollectionView S
>>>>>>          \-> PCollectionView T
>>>>>>
>>>>>>   ...
>>>>>>    |
>>>>>> ParDo(C) <-(side input)- PCollectionView S and PCollectionView T
>>>>>>    |
>>>>>>   ...
>>>>>>
>>>>>> 1) Let's say ParDo(A) outputs (during a single bundle) X and Y to
>>>>>> PCollectionView S. Should ParDo(C) be guaranteed to see X only if it
>>>>>> can also see Y (and vice versa)?
>>>>>>
>>>>>> 2) Let's say ParDo(A) outputs (during a single bundle) X to
>>>>>> PCollectionView S and Y to PCollectionView T. Should ParDo(C) be
>>>>>> guaranteed to see X only if it can also see Y?
>>>>>>
>>>>>
>>
>> --
>> ================
>> Ruoyun  Huang
>>
>>
