On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
> Hi,
>
> +1 for adding pipeline required features. I think being able to reject a pipeline with an unknown requirement is pretty much needed, mostly because that enables runners to completely decouple from SDKs, while being able to recognize when a pipeline constructed with an incompatible version of the SDK is run.
>
> I'll add some observations I made when implementing the latest "requires time sorted input" addition with regard to this discussion:
>
> a) the features of a pipeline are not a simple function of the set of PTransforms present in the pipeline, but also depend on the (type of) inputs. For instance, a PTransform might have a simple expansion to primitive PTransforms in the streaming case, but no such expansion in the batch case. That is to say, a runner that doesn't actually know of a specific extension to some PTransform _might_ actually execute it correctly under some conditions, but _must_ fail in other cases.
>
> b) it would be good if this feature worked independently of portability (for the Java SDK). We still have (at least two) non-portable runners that are IMO widely used in production and are likely to last for some time.

I think even if these runners keep executing without portability, they should migrate to the portable pipeline definition. Then they can share the same model with runners that execute using portability. The Fn API is not required as long as the runner implements the semantics of the pipeline.

Kenn

> c) we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have simply @ValidatesRunner and each test would be matched against runner capabilities (i.e., a runner would be tested with a given test if and only if it would not reject it)
>
> Jan
>
> On 2/13/20 8:42 PM, Robert Burke wrote:
>
> > +1 to deferring for now. Since they should not be modified after adoption, it makes sense not to get ahead of ourselves.
>
> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <rober...@google.com> wrote:
>
>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <rob...@frantil.com> wrote:
>> >
>> > One thing that doesn't appear to have been suggested yet is that we could "batch" urns together under a "super urn" so that adding one super urn is like adding each of the represented batch of features. This avoids needing to send dozens of urns individually.
>> >
>> > The super urns would need to be static after definition to avoid mismatched definitions down the road.
>> >
>> > We collect together the urns that reasonably constitute "vX" support, and can then increment that later.
>> >
>> > This would simplify new SDKs, as they can target initial v1 support as we define what that level of feature support includes, and it doesn't prevent new capabilities from being added incrementally.
>>
>> Yes, this is a very good idea. I've also been thinking of certain sets of common operations/well-known DoFns that often occur on opposite sides of GBKs (e.g. pair-with-one, sum-ints, drop-keys, ...) and are commonly supported, which could be grouped under these meta-urns.
>>
>> Note that these need not be monotonic; for example, a current v1 might require LengthPrefixCoderV1, but if a more efficient LengthPrefixCoderV2 comes along, eventually v2 could require that and *not* require the old, now rarely used LengthPrefixCoderV1.
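To make the super-urn idea concrete, here is a rough sketch of how a runner might expand a batched urn into its constituent feature urns before checking coverage. The urn strings, class, and method names are hypothetical, not actual Beam definitions.

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class SuperUrns {
      // Hypothetical grouping: one "v1" super urn stands for a fixed,
      // never-modified batch of feature urns.
      private static final Map<String, Set<String>> SUPER_URNS =
          Map.of(
              "beam:feature_set:v1",
              Set.of(
                  "beam:coder:length_prefix:v1",
                  "beam:transform:pair_with_one:v1",
                  "beam:transform:combine_per_key:v1"));

      /** Expands any super urns in a declared set into the feature urns they stand for. */
      public static Set<String> expand(Set<String> declared) {
        Set<String> expanded = new HashSet<>(declared);
        for (String urn : declared) {
          expanded.addAll(SUPER_URNS.getOrDefault(urn, Set.of()));
        }
        return expanded;
      }
    }

A later "beam:feature_set:v2" entry could then drop LengthPrefixCoderV1 in favor of its replacement, so, as noted above, the sets need not be monotonic.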
>>
>> Probably makes sense to defer adding such super-urns until we notice a set that is commonly used together in practice.
>>
>> Of course there's still value in SDKs being able to support features piecemeal as well, which is the big reason we're avoiding a simple monotonically-increasing version number.
>>
>> > Similarly, certain feature sets could stand alone, e.g. around SQL. It's beneficial for optimization reasons if an SDK has native projection and UDF support, for example, which a runner could take advantage of by avoiding extra cross-language hops. These could then also be grouped under a SQL super urn.
>> >
>> > This is from the SDK capability side of course, rather than the SDK pipeline requirements side.
>> >
>> > -------
>> > Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a duality between "what an SDK can do" and "what the runner will do to a pipeline that the SDK can understand" (e.g. combiner lifting, and state-backed iterables), as well as "what the pipeline requires from the runner" and "what the runner is able to do" (e.g. requires sorted input).
>> >
>> > On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
>> >>
>> >> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <k...@apache.org> wrote:
>> >>>
>> >>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <rober...@google.com> wrote:
>> >>>>
>> >>>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
>> >>>> >
>> >>>> > We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it, but this is painful in two situations:
>> >>>> > 1) It doesn't provide for a good error message since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
>> >>>> > 2) It doesn't allow for the addition of fields which don't impact the semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.
>> >>>>
>> >>>> Yeah, I don't think proto reflection is a flexible enough tool to do this well either.
>> >>>>
>> >>>> > If we think this is common enough, we can add a capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended with additions that are forwards compatible. The alternative to having capabilities on the PTransform (and other constructs) is that we would have a new URN whenever the specification of the transform changes. For forwards-compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version, but always downgrade it to the version the other party expects when communicating with it. Backwards-incompatible changes would always require a new URN, which capabilities at the PTransform level would not help with.
>> >>>>
>> >>>> As you point out, stateful+splittable may not be a particularly useful combination, but as another example, we have (backwards-incompatible-when-introduced) markers on DoFn as to whether it requires finalization, stable inputs, and now time sorting. I don't think we should have a new URN for each combination.
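To illustrate the per-marker approach Robert Bradshaw describes (one requirement urn per DoFn feature rather than a new transform URN for every combination), a runner-side check might look roughly like the following. The requirement urns and the helper class are made up for the example, not actual Beam definitions.

    import java.util.List;
    import java.util.Set;

    public class ParDoRequirementsCheck {
      // Hypothetical requirement urns, one per DoFn marker.
      static final String REQUIRES_STABLE_INPUT = "beam:requirement:pardo:stable_input:v1";
      static final String REQUIRES_TIME_SORTED_INPUT = "beam:requirement:pardo:time_sorted_input:v1";
      static final String REQUIRES_BUNDLE_FINALIZATION = "beam:requirement:pardo:finalization:v1";

      // Fails with an error naming the offending urn if the ParDo declares a
      // requirement this runner does not understand.
      static void validate(List<String> declaredRequirements, Set<String> runnerUnderstands) {
        for (String urn : declaredRequirements) {
          if (!runnerUnderstands.contains(urn)) {
            throw new UnsupportedOperationException(
                "Cannot safely execute pipeline: unknown ParDo requirement " + urn);
          }
        }
      }
    }

A runner that understands stable input but not time-sorted input would then reject only pipelines that actually declare the latter, and the error names the exact urn it does not support, addressing Luke's point (1) above.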
>> >>>
>> >>> Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and run as a plain-old ParDo. It brings up the question of whether a runner that doesn't know SDF should have to reject it or should be allowed to run it poorly.
>> >>
>> >> Being splittable means that the SDK could choose to return a continuation saying "please process the rest of my element in X amount of time", which would require the runner to inspect certain fields on responses. One example would be "I don't have many more messages to read from this message stream at the moment", and another could be "I detected that this filesystem is throttling me or is down and I would like to resume processing later".
>> >>>
>> >>> It isn't a huge deal. Three different top-level URNs versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>> >>>
>> >>> Kenn
>> >>>>
>> >>>> >> > I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs since they are different paradigms than "vanilla" ParDo.
>> >>>> >>
>> >>>> >> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>> >>>> >
>> >>>> > I agree that you could have stateful and splittable dofns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>> >>>> >
>> >>>> >> >> > On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>> >>>> >> >>
>> >>>> >> >> Yes.
>> >>>> >
>> >>>> > I believe capabilities do exist on a Pipeline, and it informs runners about new types of fields to be aware of either within Components or on the Pipeline object itself, but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>> >>>> >
>> >>>> >> [snip]
>> >>>> >
>> >>>> > As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow up with optional transforms, payloads, and coders later, which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case-by-case basis as to how people are trying to modify the protos and evolve this guidance over time).
>> >>>> >> >>>> What we need is the ability for (1) runners to reject future >> pipelines >> >>>> they cannot faithfully execute and (2) runners to be able to take >> >>>> advantage of advanced features/protocols when interacting with those >> >>>> SDKs that understand them while avoiding them for older (or newer) >> >>>> SDKs that don't. Let's call (1) (hard) requirements and (2) >> (optional) >> >>>> capabilities. >> >>>> >> >>>> Where possible, I think this is best expressed inherently in the set >> >>>> of transform (and possibly other component) URNs. For example, when >> an >> >>>> SDK uses a combine_per_key composite, that's a signal that it >> >>>> understands the various related combine_* transforms. Similarly, a >> >>>> pipeline with a test_stream URN would be rejected by pipelines not >> >>>> recognizing/supporting this primitive. However, this is not always >> >>>> possible, e.g. for (1) we have the aforementioned boolean flags on >> >>>> ParDo and for (2) we have features like large iterable and progress >> >>>> support. >> >>>> >> >>>> For (1) we have to enumerate now everywhere a runner must look a far >> >>>> into the future as we want to remain backwards compatible. This is >> why >> >>>> I suggested putting something on the pipeline itself, but we could >> >>>> (likely in addition) add it to Transform and/or ParDoPayload if we >> >>>> think that'd be useful now. (Note that a future pipeline-level >> >>>> requirement could be "inspect (previously non-existent) requirements >> >>>> field attached to objects of type X.") >> >>>> >> >>>> For (2) I think adding a capabilities field to the environment for >> now >> >>>> makes the most sense, and as it's optional to inspect them adding it >> >>>> elsewhere if needed is backwards compatible. (The motivation to do it >> >>>> now is that there are some capabilities that we'd like to enumerate >> >>>> now rather than make part of the minimal set of things an SDK must >> >>>> support.) >> >>>> >> >> >> >> Agree on the separation of requirements from capabilities where >> requirements is a set of MUST understand while capabilities are a set of >> MAY understand. >> >> >> >>>> >> >>>> > All in all, I think "capabilities" is about informing a runner >> about what they should know about and what they are allowed to do. If we go >> with a list of "capabilities", we could always add a "parameterized >> capabilities" urn which would tell runners they need to also look at some >> other field. >> >>>> >> >>>> Good point. That lets us keep it as a list for now. (The risk is that >> >>>> it makes possible the bug of populating parameters without adding the >> >>>> required notification to the list.) >> >>>> >> >>>> > I also believe capabilities should NOT be "inherited". For example >> if we define capabilities on a ParDoPayload, and on a PTransform and on >> Environment, then ParDoPayload capabilities shouldn't be copied to >> PTransform and PTransform specific capabilities shouldn't be copied to the >> Environment. My reasoning about this is that some "capabilities" can only >> be scoped to a single ParDoPayload or a single PTransform and wouldn't >> apply generally everywhere. The best example I could think of is that >> Environment A supports progress reporting while Environment B doesn't so it >> wouldn't have made sense to say the "Pipeline" supports progress reporting. >> >>>> > >> >>>> > Are capabilities strictly different from "resources" (transform >> needs python package X) or "execution hints" (e.g. 
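As a sketch of what such non-inherited, environment-scoped capabilities could look like on the runner side (the capability urn, class, and method names are illustrative only, not an actual Beam API):

    import java.util.Map;
    import java.util.Set;

    public class EnvironmentCapabilities {
      // Hypothetical capability urn for a progress-reporting protocol.
      static final String PROGRESS_REPORTING = "beam:protocol:progress_reporting:v1";

      // Capabilities keyed by environment id; nothing is copied up to the pipeline level.
      private final Map<String, Set<String>> capabilitiesByEnvironmentId;

      EnvironmentCapabilities(Map<String, Set<String>> capabilitiesByEnvironmentId) {
        this.capabilitiesByEnvironmentId = capabilitiesByEnvironmentId;
      }

      // True only if the specific environment executing a transform declares the capability.
      boolean supports(String environmentId, String capabilityUrn) {
        return capabilitiesByEnvironmentId
            .getOrDefault(environmentId, Set.of())
            .contains(capabilityUrn);
      }
    }

With Environment A declaring the urn and Environment B not declaring it, the runner would request progress only from bundles running in A, matching the example above.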
>> >>>> >
>> >>>> > Are capabilities strictly different from "resources" (transform needs python package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but mostly runner-specific hints)? At first glance I would say yes.
>> >>>>
>> >>>> Agreed.
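Pulling the two notions together, a rough sketch of the runner-side split between hard requirements (reject anything unknown) and optional capabilities (ignore anything unknown, opt in to what is recognized). It assumes a repeated-string requirements list on the pipeline and a capabilities list per environment, as proposed above; all names are illustrative.

    import java.util.Collection;
    import java.util.Set;

    public class PipelineAdmission {
      // Hard requirements: the runner MUST understand every one, otherwise it rejects the pipeline.
      static void checkRequirements(Collection<String> pipelineRequirements, Set<String> understood) {
        for (String urn : pipelineRequirements) {
          if (!understood.contains(urn)) {
            throw new IllegalArgumentException("Pipeline requires unsupported feature: " + urn);
          }
        }
      }

      // Optional capabilities: the runner MAY use protocols the SDK declares; unknown entries are ignored.
      static boolean mayUse(String protocolUrn, Collection<String> sdkCapabilities) {
        return sdkCapabilities.contains(protocolUrn);
      }
    }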