Wrt the per-DoFn/ParDo level, there's the similar question of whether the
DoFn has a URN for requiring something, or an annotation saying the DoFn
provides something (e.g. provides k-anonymization with k defined).

The general theme of this thread seems to be ensuring a runner can reject
a pipeline if it's not able to provide the right guarantees, so the latter
"provides" case isn't handled.

E.g. such "provides" declarations could be used to analyze a pipeline at
construction time to ensure the outputs are all properly anonymized to a
certain degree.
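
To make that concrete, here's a rough sketch in Java of what such a
"provides" annotation and a construction-time check might look like. The
annotation name, the k-anonymity guarantee, and the check itself are all
hypothetical, just to illustrate the idea:

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.beam.sdk.transforms.DoFn;

/** Hypothetical: the DoFn claims its output is k-anonymized. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ProvidesKAnonymization {
  int k();
}

@ProvidesKAnonymization(k = 5)
class AnonymizingFn extends DoFn<String, String> {
  @ProcessElement
  public void process(@Element String record, OutputReceiver<String> out) {
    out.output(anonymize(record)); // placeholder for the actual anonymization
  }

  private String anonymize(String record) {
    return record;
  }
}

class AnonymizationChecks {
  /**
   * Construction-time check: reject DoFns that don't declare at least the
   * required k.
   */
  static void requireKAnonymity(DoFn<?, ?> fn, int requiredK) {
    ProvidesKAnonymization provides =
        fn.getClass().getAnnotation(ProvidesKAnonymization.class);
    if (provides == null || provides.k() < requiredK) {
      throw new IllegalArgumentException(
          fn.getClass().getSimpleName()
              + " does not guarantee " + requiredK + "-anonymity");
    }
  }
}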

On Thu, Feb 13, 2020, 2:26 PM Kenneth Knowles <k...@apache.org> wrote:

>
>
> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> +1 for adding pipeline required features. I think being able to reject a
>> pipeline with an unknown requirement is pretty much needed, mostly because
>> that enables runners to completely decouple from SDKs, while still being
>> able to recognize when a pipeline constructed with an incompatible version
>> of the SDK is run.
>>
>> I'll add some observations I made when implementing the latest "requires
>> time sorted input" addition with regards to this discussion:
>>
>>  a) the features of a pipeline are not a simple function of the set of
>> PTransforms present in the pipeline, but also depend on the (type of)
>> inputs. For instance a PTransform might have a simple expansion to
>> primitive PTransforms in the streaming case, but no such expansion in the
>> batch case. That is to say, a runner that doesn't actually know of a
>> specific extension to some PTransform _might_ actually execute it
>> correctly under some conditions, but _must_ fail in other cases.
>>
>>  b) it would be good if this feature worked independently of portability
>> (for the Java SDK). We still have (at least two) non-portable runners that
>> are IMO widely used in production and are likely to last for some time.
>>
> I think even if these runners keep executing without using portability,
> they should migrate to use the portable pipeline definition. Then they can
> share the same model w/ runners that execute using portability. The Fn API
> is not required to be used as long as the runner implements the semantics
> of the pipeline.
>
> Kenn
>
>
>>  c) we can take advantage of these pipeline features to get rid of the
>> categories of @ValidatesRunner tests, because we could have simply
>> @ValidatesRunner and each test would be matched against runner
>> capabilities (i.e. a runner would be tested with a given test if and only
>> if it would not reject it)
>>
>> Jan
>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>
>> +1 to deferring for now. Since they should not be modified after
>> adoption, it makes sense not to get ahead of ourselves.
>>
>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <rob...@frantil.com>
>>> wrote:
>>> >
>>> > One thing that doesn't appear to have been suggested yet is that we
>>> > could "batch" URNs together under a "super URN", so that adding one
>>> > super URN is like adding each of the features in the batch it
>>> > represents. This avoids needing to send dozens of URNs individually.
>>> >
>>> >
>>> > The super URNs would need to be static after definition to avoid
>>> > mismatched definitions down the road.
>>> >
>>> > We collect together the URNs for what is reasonably considered "vX"
>>> > support, and can then increment that later.
>>> >
>>> > This would simplify new SDKs, as they could have a goal of initial v1
>>> > support as we define what that level of feature support includes, and
>>> > it doesn't prevent new capabilities from being added incrementally.
>>>
>>> Yes, this is a very good idea. I've also been thinking of certain sets
>>> of common operations/well known DoFns that often occur on opposite
>>> sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
>>> are commonly supported that could be grouped under these meta-urns.
>>>
>>> Note that these need not be monotonic; for example, a current v1 might
>>> require LengthPrefixCoderV1, but if a more efficient LengthPrefixCoderV2
>>> comes along, eventually v2 could require that and *not* require the old,
>>> now rarely used LengthPrefixCoderV1.
>>>
>>> Probably makes sense to defer adding such super-urns until we notice a
>>> set that is commonly used together in practice.
>>>
>>> Of course there's still value in SDKs being able to support features
>>> piecemeal as well, which is the big reason we're avoiding a simple
>>> monotonically-increasing version number.
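>>>
>>> As a rough illustration of the idea (the URN strings and grouping here
>>> are hypothetical, not proposals), expanding super URNs could be as
>>> simple as:
>>>
>>> import java.util.HashSet;
>>> import java.util.Map;
>>> import java.util.Set;
>>>
>>> class SuperUrns {
>>>   // A super URN stands for a fixed, never-modified set of member URNs.
>>>   static final Map<String, Set<String>> SUPER_URNS = Map.of(
>>>       "beam:super_urn:sdk_base:v1", Set.of(
>>>           "beam:coder:length_prefix:v1",
>>>           "beam:transform:pair_with_one:v1",
>>>           "beam:transform:combine_per_key:v1"));
>>>
>>>   /** Expands any super URNs into the individual URNs they represent. */
>>>   static Set<String> expand(Set<String> declared) {
>>>     Set<String> expanded = new HashSet<>();
>>>     for (String urn : declared) {
>>>       expanded.addAll(SUPER_URNS.getOrDefault(urn, Set.of(urn)));
>>>     }
>>>     return expanded;
>>>   }
>>> }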
>>>
>>> > Similarly, certain feature sets could stand alone, e.g. around SQL.
>>> > It's beneficial for optimization reasons if an SDK has native
>>> > projection and UDF support, for example, which a runner could take
>>> > advantage of by avoiding extra cross-language hops. These could then
>>> > also be grouped under a SQL super URN.
>>> >
>>> > This is from the SDK capability side of course, rather than the SDK
>>> pipeline requirements side.
>>> >
>>> > -------
>>> > Related to that last point, it might be good to nail down early the
>>> > perspective used when discussing these things, as there's a duality
>>> > between "what an SDK can do" and "what the runner will do to a pipeline
>>> > that the SDK can understand" (e.g. combiner lifting and state-backed
>>> > iterables), as well as between "what the pipeline requires from the
>>> > runner" and "what the runner is able to do" (e.g. requires sorted
>>> > input).
>>> >
>>> >
>>> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>>> >>
>>> >>
>>> >>
>>> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <k...@apache.org>
>>> wrote:
>>> >>>
>>> >>>
>>> >>>
>>> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >>>>
>>> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com>
>>> wrote:
>>> >>>> >
>>> >>>> > We can always detect on the runner/SDK side whether there is an
>>> >>>> > unknown field[1] within a payload and fail to process it, but this
>>> >>>> > is painful in two situations:
>>> >>>> > 1) It doesn't provide for a good error message, since you can't
>>> >>>> > say what the purpose of the field is. With a capability URN, the
>>> >>>> > runner/SDK could say which URN it doesn't understand.
>>> >>>> > 2) It doesn't allow for the addition of fields which don't impact
>>> >>>> > the semantics of execution. For example, if the display data
>>> >>>> > feature were being developed, a runner could ignore it and still
>>> >>>> > execute the pipeline correctly.
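>>> >>>> >
>>> >>>> > For illustration, a minimal sketch of that unknown-field check
>>> >>>> > using the protobuf Java API (the surrounding wiring is elided):
>>> >>>> >
>>> >>>> > import com.google.protobuf.Message;
>>> >>>> >
>>> >>>> > class PayloadChecks {
>>> >>>> >   /** Fails if the payload carries fields this runner doesn't know. */
>>> >>>> >   static void rejectUnknownFields(Message payload) {
>>> >>>> >     if (!payload.getUnknownFields().asMap().isEmpty()) {
>>> >>>> >       throw new IllegalArgumentException(
>>> >>>> >           "Payload contains unrecognized fields: "
>>> >>>> >               + payload.getUnknownFields());
>>> >>>> >     }
>>> >>>> >   }
>>> >>>> > }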
>>> >>>>
>>> >>>> Yeah, I don't think proto reflection is a flexible enough tool to do
>>> >>>> this well either.
>>> >>>>
>>> >>>> > If we think this is common enough, we can add a capabilities list
>>> >>>> > to the PTransform so each PTransform can do this and has a natural
>>> >>>> > way of being extended with forwards-compatible additions. The
>>> >>>> > alternative to having capabilities on PTransform (and other
>>> >>>> > constructs) is that we would have a new URN when the specification
>>> >>>> > of the transform changes. For forwards-compatible changes, each
>>> >>>> > SDK/runner would map older versions of the URN onto the latest and
>>> >>>> > internally treat it as the latest version, but always downgrade it
>>> >>>> > to the version the other party expects when communicating with it.
>>> >>>> > Backwards-incompatible changes would always require a new URN,
>>> >>>> > which capabilities at the PTransform level would not help with.
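>>> >>>> >
>>> >>>> > Roughly, that mapping could look like the sketch below (the URN
>>> >>>> > version strings are made up for illustration):
>>> >>>> >
>>> >>>> > import java.util.Map;
>>> >>>> > import java.util.Set;
>>> >>>> >
>>> >>>> > class UrnVersions {
>>> >>>> >   // Older, forwards-compatible URN versions mapped to the latest.
>>> >>>> >   static final Map<String, String> UPGRADES = Map.of(
>>> >>>> >       "beam:transform:something:v1", "beam:transform:something:v2");
>>> >>>> >
>>> >>>> >   /** Treat older URNs as the latest version internally. */
>>> >>>> >   static String upgrade(String urn) {
>>> >>>> >     return UPGRADES.getOrDefault(urn, urn);
>>> >>>> >   }
>>> >>>> >
>>> >>>> >   /** Downgrade to a version the other party declared it understands. */
>>> >>>> >   static String downgradeFor(String urn, Set<String> theirUrns) {
>>> >>>> >     if (theirUrns.contains(urn)) {
>>> >>>> >       return urn;
>>> >>>> >     }
>>> >>>> >     for (Map.Entry<String, String> e : UPGRADES.entrySet()) {
>>> >>>> >       if (e.getValue().equals(urn) && theirUrns.contains(e.getKey())) {
>>> >>>> >         return e.getKey();
>>> >>>> >       }
>>> >>>> >     }
>>> >>>> >     return urn; // no older equivalent known
>>> >>>> >   }
>>> >>>> > }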
>>> >>>>
>>> >>>> As you point out, stateful+splittable may not be a particularly
>>> >>>> useful combination, but as another example, we have
>>> >>>> (backwards-incompatible-when-introduced) markers on DoFn as to
>>> >>>> whether it requires finalization, stable inputs, and now time
>>> >>>> sorting. I don't think we should have a new URN for each combination.
>>> >>>
>>> >>>
>>> >>> Agree with this. I don't think stateful, splittable, and "plain"
>>> ParDo are comparable to these. Each is an entirely different computational
>>> paradigm: per-element independent processing, per-key-and-window linear
>>> processing, and per-element-and-restriction splittable processing. Most
>>> relevant IMO is the nature of the parallelism. If you added state to
>>> splittable processing, it would still be splittable processing. Just as
>>> Combine and ParDo can share the SideInput specification, it is easy to
>>> share relevant sub-structures like state declarations. But it is a fair
>>> point that the ability to split can be ignored and run as a plain-old
>>> ParDo. It brings up the question of whether a runner that doesn't know
>>> SDF should have to reject it or should be allowed to run it poorly.
>>> >>
>>> >>
>>> >> Being splittable means that the SDK could choose to return a
>>> >> continuation saying "please process the rest of my element in X amount
>>> >> of time", which would require the runner to inspect certain fields on
>>> >> responses. One example would be "I don't have many more messages to
>>> >> read from this message stream at the moment"; another could be "I
>>> >> detected that this filesystem is throttling me or is down, and I would
>>> >> like to resume processing later".
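>>> >>
>>> >> In the Java SDK that corresponds to a splittable DoFn whose
>>> >> @ProcessElement returns a ProcessContinuation. A rough sketch (the
>>> >> "queue" being polled and the pollOnce helper are invented for
>>> >> illustration):
>>> >>
>>> >> import org.apache.beam.sdk.io.range.OffsetRange;
>>> >> import org.apache.beam.sdk.transforms.DoFn;
>>> >> import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
>>> >> import org.joda.time.Duration;
>>> >>
>>> >> class PollQueueFn extends DoFn<String, String> {
>>> >>
>>> >>   @GetInitialRestriction
>>> >>   public OffsetRange initialRestriction(@Element String queue) {
>>> >>     return new OffsetRange(0, Long.MAX_VALUE);
>>> >>   }
>>> >>
>>> >>   @ProcessElement
>>> >>   public ProcessContinuation process(
>>> >>       @Element String queue,
>>> >>       RestrictionTracker<OffsetRange, Long> tracker,
>>> >>       OutputReceiver<String> out) {
>>> >>     for (long pos = tracker.currentRestriction().getFrom(); ; pos++) {
>>> >>       String message = pollOnce(queue, pos); // placeholder external read
>>> >>       if (message == null) {
>>> >>         // Nothing to read right now: hand control back to the runner
>>> >>         // and ask to be resumed on the remaining restriction later.
>>> >>         return ProcessContinuation.resume()
>>> >>             .withResumeDelay(Duration.standardSeconds(30));
>>> >>       }
>>> >>       if (!tracker.tryClaim(pos)) {
>>> >>         return ProcessContinuation.stop();
>>> >>       }
>>> >>       out.output(message);
>>> >>     }
>>> >>   }
>>> >>
>>> >>   private String pollOnce(String queue, long position) {
>>> >>     return null; // placeholder
>>> >>   }
>>> >> }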
>>> >>
>>> >>>
>>> >>> It isn't a huge deal. Three different top-level URNs versus three
>>> >>> different sub-URNs will achieve the same result in the end if we get
>>> >>> this "capability" thing in place.
>>> >>>
>>> >>> Kenn
>>> >>>
>>> >>>>
>>> >>>>
>>> >>>> >> > I do think that splittable ParDo and stateful ParDo should
>>> have separate PTransform URNs since they are different paradigms than
>>> "vanilla" ParDo.
>>> >>>> >>
>>> >>>> >> Here I disagree. What about one that is both splittable and
>>> stateful? Would one have a fourth URN for that? If/when another flavor of
>>> DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in
>>> particular can be executed as a normal ParDo as long as the output is
>>> bounded.)
>>> >>>> >
>>> >>>> > I agree that you could have stateful and splittable DoFns where
>>> >>>> > the element is the key and you share state and timers across
>>> >>>> > restrictions. No runner is capable of executing this efficiently.
>>> >>>> >
>>> >>>> >> >> > On the SDK requirements side: the constructing SDK owns the
>>> Environment proto completely, so it is in a position to ensure the involved
>>> docker images support the necessary features.
>>> >>>> >> >>
>>> >>>> >> >> Yes.
>>> >>>> >
>>> >>>> >
>>> >>>> > I believe capabilities also have a place on a Pipeline; they
>>> >>>> > inform runners about new types of fields to be aware of, either
>>> >>>> > within Components or on the Pipeline object itself, but for this
>>> >>>> > discussion it makes sense that an environment would store most
>>> >>>> > "capabilities" related to execution.
>>> >>>> >
>>> >>>> >> [snip]
>>> >>>> >
>>> >>>> > As for the proto clean-ups, the scope is to cover almost all
>>> >>>> > things needed for execution now and to follow up with optional
>>> >>>> > transforms, payloads, and coders later, which would exclude job
>>> >>>> > management APIs and artifact staging. A formal enumeration would
>>> >>>> > be useful here. Also, we should provide formal guidance about
>>> >>>> > adding new fields, new types of transforms, new types of proto
>>> >>>> > messages, ... (best to describe this on a case-by-case basis as
>>> >>>> > people try to modify the protos, and evolve this guidance over
>>> >>>> > time).
>>> >>>>
>>> >>>> What we need is the ability for (1) runners to reject future
>>> >>>> pipelines they cannot faithfully execute and (2) runners to be able
>>> >>>> to take advantage of advanced features/protocols when interacting
>>> >>>> with those SDKs that understand them while avoiding them for older
>>> >>>> (or newer) SDKs that don't. Let's call (1) (hard) requirements and
>>> >>>> (2) (optional) capabilities.
>>> >>>>
>>> >>>> Where possible, I think this is best expressed inherently in the set
>>> >>>> of transform (and possibly other component) URNs. For example, when
>>> >>>> an SDK uses a combine_per_key composite, that's a signal that it
>>> >>>> understands the various related combine_* transforms. Similarly, a
>>> >>>> pipeline with a test_stream URN would be rejected by runners not
>>> >>>> recognizing/supporting this primitive. However, this is not always
>>> >>>> possible, e.g. for (1) we have the aforementioned boolean flags on
>>> >>>> ParDo and for (2) we have features like large iterable and progress
>>> >>>> support.
>>> >>>>
>>> >>>> For (1) we have to enumerate now everywhere a runner must look, as
>>> >>>> far into the future as we want to remain backwards compatible. This
>>> >>>> is why I suggested putting something on the pipeline itself, but we
>>> >>>> could (likely in addition) add it to Transform and/or ParDoPayload if
>>> >>>> we think that'd be useful now. (Note that a future pipeline-level
>>> >>>> requirement could be "inspect the (previously non-existent)
>>> >>>> requirements field attached to objects of type X.")
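>>> >>>>
>>> >>>> Concretely, assuming the proposed repeated-string requirements field
>>> >>>> on the pipeline proto, the runner-side check could be as simple as
>>> >>>> (a sketch, not the actual API):
>>> >>>>
>>> >>>> import java.util.HashSet;
>>> >>>> import java.util.List;
>>> >>>> import java.util.Set;
>>> >>>>
>>> >>>> class RequirementChecks {
>>> >>>>   /**
>>> >>>>    * Rejects a pipeline that declares any requirement URN this
>>> >>>>    * runner does not understand.
>>> >>>>    */
>>> >>>>   static void checkRequirements(
>>> >>>>       List<String> pipelineRequirements, Set<String> supported) {
>>> >>>>     Set<String> unknown = new HashSet<>(pipelineRequirements);
>>> >>>>     unknown.removeAll(supported);
>>> >>>>     if (!unknown.isEmpty()) {
>>> >>>>       throw new UnsupportedOperationException(
>>> >>>>           "Pipeline has requirements this runner does not understand: "
>>> >>>>               + unknown);
>>> >>>>     }
>>> >>>>   }
>>> >>>> }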
>>> >>>>
>>> >>>> For (2) I think adding a capabilities field to the environment for
>>> >>>> now makes the most sense, and as it's optional to inspect them,
>>> >>>> adding it elsewhere if needed is backwards compatible. (The
>>> >>>> motivation to do it now is that there are some capabilities that
>>> >>>> we'd like to enumerate now rather than make part of the minimal set
>>> >>>> of things an SDK must support.)
>>> >>>>
>>> >>
>>> >> Agree on the separation of requirements from capabilities, where
>>> >> requirements are a set of MUST-understands while capabilities are a
>>> >> set of MAY-understands.
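>>> >>
>>> >> To illustrate the MAY-understand side, a runner might only enable an
>>> >> optional protocol when the SDK environment declares it (the URN string
>>> >> and the toggle below are illustrative, not an actual API):
>>> >>
>>> >> import java.util.Set;
>>> >>
>>> >> class OptionalProtocols {
>>> >>   /** Use state-backed iterables only if the SDK declared support. */
>>> >>   static boolean useStateBackedIterables(Set<String> environmentCapabilities) {
>>> >>     return environmentCapabilities.contains(
>>> >>         "beam:protocol:state_backed_iterables:v1");
>>> >>   }
>>> >> }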
>>> >>
>>> >>>>
>>> >>>> > All in all, I think "capabilities" is about informing a runner
>>> >>>> > about what it should know and what it is allowed to do. If we go
>>> >>>> > with a list of "capabilities", we could always add a "parameterized
>>> >>>> > capabilities" URN which would tell runners they need to also look
>>> >>>> > at some other field.
>>> >>>>
>>> >>>> Good point. That lets us keep it as a list for now. (The risk is
>>> >>>> that it makes possible the bug of populating parameters without
>>> >>>> adding the required notification to the list.)
>>> >>>>
>>> >>>> > I also believe capabilities should NOT be "inherited". For
>>> >>>> > example, if we define capabilities on a ParDoPayload, on a
>>> >>>> > PTransform, and on an Environment, then ParDoPayload capabilities
>>> >>>> > shouldn't be copied to the PTransform, and PTransform-specific
>>> >>>> > capabilities shouldn't be copied to the Environment. My reasoning
>>> >>>> > is that some "capabilities" are scoped to a single ParDoPayload or
>>> >>>> > a single PTransform and don't apply generally everywhere. The best
>>> >>>> > example I could think of is that Environment A may support
>>> >>>> > progress reporting while Environment B doesn't, so it wouldn't
>>> >>>> > make sense to say the "Pipeline" supports progress reporting.
>>> >>>> >
>>> >>>> > Are capabilities strictly different from "resources" (a transform
>>> >>>> > needs Python package X) or "execution hints" (e.g. deploy on
>>> >>>> > machines that have GPUs; some generic, but mostly runner-specific
>>> >>>> > hints)? At first glance I would say yes.
>>> >>>>
>>> >>>> Agreed.
>>>
>>
