Which part of the proposal do you think is solving a problem we may not have?
On Thu, Feb 20, 2020 at 8:19 PM Kenneth Knowles <[email protected]> wrote:

> I would rather say that "runners-core" is a utility library with some helpful things, like other libraries. The runner still decides how to use the library. That was the idea, anyhow. A runner could have a bunch of "if" statements around how it uses some generic runners-core utility, etc. I think at this point the proposal is trying to solve a problem we may not have.
>
> Kenn
>
> On Thu, Feb 20, 2020 at 1:25 PM Jan Lukavský <[email protected]> wrote:
>
>> On 2/20/20 8:24 PM, Robert Bradshaw wrote:
>>
>> On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <[email protected]> wrote:
>>
>> Hi,
>>
>> +1 for adding pipeline required features. I think being able to reject a pipeline with an unknown requirement is pretty much needed, mostly because that enables runners to completely decouple from SDKs while still being able to recognize when a pipeline constructed with an incompatible version of the SDK is run.
>>
>> I'll add some observations I made when implementing the latest "requires time sorted input" addition with regards to this discussion:
>>
>> a) the features of a pipeline are not a simple function of the set of PTransforms present in the pipeline, but also depend on the (type of) inputs. For instance, a PTransform might have a simple expansion to primitive PTransforms in the streaming case, but not have such an expansion in the batch case. That is to say, a runner that doesn't actually know of a specific extension to some PTransform _might_ actually execute it correctly under some conditions, but _must_ fail in other cases.
>>
>> It sounds like what you're getting at here is a stateful ParDo that requires "mostly" time sorted input (to keep the amount of state held bounded), which is somewhat provided (with no bounds given) for unbounded PCollections but not at all (in general) for batch. Rather than phrase this as a conditional requirement, I would make a new requirement "requires mostly time sorted input" (precise definition TBD, it's hard to specify or guarantee upper bounds) which a runner could then implement via exact time sorted input in batch but more cheaply as a no-op in streaming.
>>
>> +1, that makes sense. My example was a little incomplete, in the sense that @RequiresTimeSortedInput does not have any requirements on the runner in the streaming case, with one exception - the runner must be compiled against the newest runners-core. That brings us to the fact that a runner's capabilities are actually not just a function of the runner's code, but also of the code imported from runners-core. There probably should be a way for the core to export its capabilities (e.g. provides: beam:requirement:pardo:time_sorted_input:streaming:v1), which should then be united with the capabilities of the runner itself. That way a runner which uses runners-core (and StatefulDoFnRunner, that is a complication, not sure how to deal with that) could be made able to satisfy 'beam:requirement:pardo:time_sorted_input:streaming:v1' simply by being recompiled with the newest core.
>>
>> b) it would be good if this feature worked independently of portability (for the Java SDK). We still have (at least two) non-portable runners that are IMO widely used in production and are likely to last for some time.
>>
>> Yes. As mentioned, we can still convert to portability to do such analysis even if we don't use it for execution.
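A minimal sketch of the idea being discussed above: a runner unites its own capability URNs with those exported by runners-core and rejects any pipeline declaring a requirement it cannot satisfy. Plain Java; the URN strings and the runnersCoreCapabilities() helper are hypothetical illustrations, not the actual Beam proto or API.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RequirementCheck {

  // Capabilities implemented directly in the runner's own code (hypothetical URNs).
  private static final Set<String> RUNNER_CAPABILITIES = Set.of(
      "beam:requirement:pardo:stateful:v1",
      "beam:requirement:pardo:splittable:v1");

  // Stand-in for capabilities a newer runners-core could export (hypothetical helper).
  private static Set<String> runnersCoreCapabilities() {
    return Set.of("beam:requirement:pardo:time_sorted_input:streaming:v1");
  }

  // Fails fast with a useful message instead of silently mis-executing an unknown requirement.
  public static void validate(List<String> pipelineRequirements) {
    Set<String> supported = new HashSet<>(RUNNER_CAPABILITIES);
    supported.addAll(runnersCoreCapabilities()); // union: runner code + imported library code
    for (String urn : pipelineRequirements) {
      if (!supported.contains(urn)) {
        throw new UnsupportedOperationException("Pipeline requires unsupported feature: " + urn);
      }
    }
  }
}

With this shape, recompiling the runner against a newer runners-core enlarges the supported set without any change to the runner's own code, which is the point Jan raises about library-provided capabilities.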
>> c) we can take advantage of these pipeline features to get rid of the categories of @ValidatesRunner tests, because we could have just simply @ValidatesRunner and each test would be matched against runner capabilities (i.e. a runner would be tested with a given test if and only if it would not reject it)
>>
>> +1
>>
>> Jan
>>
>> On 2/13/20 8:42 PM, Robert Burke wrote:
>>
>> +1 to deferring for now. Since they should not be modified after adoption, it makes sense not to get ahead of ourselves.
>>
>> On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <[email protected]> wrote:
>>
>> On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <[email protected]> wrote:
>>
>> One thing that doesn't appear to have been suggested yet is that we could "batch" URNs together under a "super URN", so that adding one super URN is like adding each of the features in the batch it represents. This avoids needing to send dozens of URNs individually.
>>
>> The super URNs would need to be static after definition to avoid mismatched definitions down the road.
>>
>> We collect together the URNs that are reasonably considered "vX" support, and can then increment that later.
>>
>> This would simplify new SDKs, as they can have a goal of initial v1 support as we define what level of feature support that entails, and it doesn't prevent new capabilities from being added incrementally.
>>
>> Yes, this is a very good idea. I've also been thinking of certain sets of common operations/well-known DoFns that often occur on opposite sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that are commonly supported and could be grouped under these meta-URNs.
>>
>> Note that these need not be monotonic; for example, a current v1 might require LengthPrefixCoderV1, but if a more efficient LengthPrefixCoderV2 comes along, eventually v2 could require that and *not* require the old, now rarely used LengthPrefixCoderV1.
>>
>> It probably makes sense to defer adding such super-URNs until we notice a set that is commonly used together in practice.
>>
>> Of course there's still value in SDKs being able to support features piecemeal as well, which is the big reason we're avoiding a simple monotonically-increasing version number.
>>
>> Similarly, certain feature sets could stand alone, e.g. around SQL. It's beneficial for optimization reasons if an SDK has native projection and UDF support, for example, which a runner could take advantage of by avoiding extra cross-language hops. These could then also be grouped under a SQL super URN.
>>
>> This is from the SDK capability side of course, rather than the SDK pipeline requirements side.
>>
>> -------
>> Related to that last point, it might be good to nail down early the perspective used when discussing these things, as there's a dual between "what an SDK can do" and "what the runner will do to a pipeline that the SDK can understand" (e.g. combiner lifting and state-backed iterables), as well as "what the pipeline requires from the runner" and "what the runner is able to do" (e.g. requires sorted input).
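A sketch of the "super URN" batching idea: a static, never-modified mapping from a super URN to the feature URNs it stands for, expanded before any support check. All URN strings and the class name here are hypothetical examples, not defined anywhere in Beam.

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class SuperUrns {

  // Hypothetical super URN definitions. Once published, an entry must never change,
  // otherwise two parties could silently disagree on what e.g. "v1" means.
  private static final Map<String, Set<String>> SUPER_URNS = Map.of(
      "beam:capability:sdk_base:v1", Set.of(
          "beam:transform:pardo:v1",
          "beam:transform:group_by_key:v1",
          "beam:coder:length_prefix:v1"),
      "beam:capability:sql_pushdown:v1", Set.of(
          "beam:transform:sql_project:v1",
          "beam:transform:sql_udf:v1"));

  // Expands any super URNs into their constituent feature URNs; plain URNs pass through unchanged.
  public static Set<String> expand(Set<String> declared) {
    Set<String> expanded = new HashSet<>();
    for (String urn : declared) {
      expanded.addAll(SUPER_URNS.getOrDefault(urn, Set.of(urn)));
    }
    return expanded;
  }
}

Only the expansion step needs to know about super URNs; everything downstream keeps comparing flat sets, so features can still be supported piecemeal as Robert notes.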
>> On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <[email protected]> wrote:
>>
>> On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <[email protected]> wrote:
>>
>> On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <[email protected]> wrote:
>>
>> On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <[email protected]> wrote:
>>
>> We can always detect on the runner/SDK side whether there is an unknown field[1] within a payload and fail to process it, but this is painful in two situations:
>> 1) It doesn't provide for a good error message, since you can't say what the purpose of the field is. With a capability URN, the runner/SDK could say which URN it doesn't understand.
>> 2) It doesn't allow for the addition of fields which don't impact the semantics of execution. For example, if the display data feature was being developed, a runner could ignore it and still execute the pipeline correctly.
>>
>> Yeah, I don't think proto reflection is a flexible enough tool to do this well either.
>>
>> If we think this to be common enough, we can add a capabilities list to the PTransform so each PTransform can do this and has a natural way of being extended with additions which are forwards compatible. The alternative to having capabilities on PTransform (and other constructs) is that we would have a new URN whenever the specification of the transform changes. For forwards-compatible changes, each SDK/runner would map older versions of the URN onto the latest and internally treat it as the latest version, but always downgrade it to the version the other party expects when communicating with it. Backwards-incompatible changes would always require a new URN, which capabilities at the PTransform level would not help with.
>>
>> As you point out, stateful+splittable may not be a particularly useful combination, but as another example, we have (backwards-incompatible-when-introduced) markers on DoFn as to whether it requires finalization, stable inputs, and now time sorting. I don't think we should have a new URN for each combination.
>>
>> Agree with this. I don't think stateful, splittable, and "plain" ParDo are comparable to these. Each is an entirely different computational paradigm: per-element independent processing, per-key-and-window linear processing, and per-element-and-restriction splittable processing. Most relevant IMO is the nature of the parallelism. If you added state to splittable processing, it would still be splittable processing. Just as Combine and ParDo can share the SideInput specification, it is easy to share relevant sub-structures like state declarations. But it is a fair point that the ability to split can be ignored and the transform run as a plain old ParDo. It brings up the question of whether a runner that doesn't know about SDFs should have to reject them or should be allowed to run them poorly.
>>
>> Being splittable means that the SDK could choose to return a continuation saying "please process the rest of my element in X amount of time", which would require the runner to inspect certain fields on responses. One example would be "I don't have many more messages to read from this message stream at the moment", and another example could be "I detected that this filesystem is throttling me or is down, and I would like to resume processing later".
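A rough sketch of the forwards-compatible versioning scheme Luke describes, where older URN versions are mapped onto the latest internally and downgraded again when talking to a party that only understands an older version. The URN strings and class are hypothetical placeholders, not part of the Beam model.

import java.util.Map;
import java.util.Set;

public class UrnVersioning {

  // Hypothetical mapping from older, forwards-compatible URN versions to the latest version.
  private static final Map<String, String> UPGRADES =
      Map.of("beam:transform:my_transform:v1", "beam:transform:my_transform:v2");

  // Internally treat any forwards-compatible older version as the latest one.
  public static String upgrade(String urn) {
    return UPGRADES.getOrDefault(urn, urn);
  }

  // Downgrade to a version the other party declares it understands, if any.
  public static String downgradeFor(String latestUrn, Set<String> peerCapabilities) {
    if (peerCapabilities.contains(latestUrn)) {
      return latestUrn;
    }
    // Walk back to an older equivalent the peer knows about.
    for (Map.Entry<String, String> e : UPGRADES.entrySet()) {
      if (e.getValue().equals(latestUrn) && peerCapabilities.contains(e.getKey())) {
        return e.getKey();
      }
    }
    throw new IllegalArgumentException("Peer understands no version of " + latestUrn);
  }
}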
>> It isn't a huge deal. Three different top-level URNs versus three different sub-URNs will achieve the same result in the end if we get this "capability" thing in place.
>>
>> Kenn
>>
>> I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs, since they are different paradigms from "vanilla" ParDo.
>>
>> Here I disagree. What about one that is both splittable and stateful? Would one have a fourth URN for that? If/when another flavor of DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in particular can be executed as a normal ParDo as long as the output is bounded.)
>>
>> I agree that you could have stateful and splittable DoFns where the element is the key and you share state and timers across restrictions. No runner is capable of executing this efficiently.
>>
>> On the SDK requirements side: the constructing SDK owns the Environment proto completely, so it is in a position to ensure the involved docker images support the necessary features.
>>
>> Yes.
>>
>> I believe capabilities do exist on a Pipeline, and they inform runners about new types of fields to be aware of, either within Components or on the Pipeline object itself, but for this discussion it makes sense that an environment would store most "capabilities" related to execution.
>>
>> [snip]
>>
>> As for the proto clean-ups, the scope is to cover almost all things needed for execution now and to follow up with optional transforms, payloads, and coders later, which would exclude job management APIs and artifact staging. A formal enumeration would be useful here. Also, we should provide formal guidance about adding new fields, adding new types of transforms, new types of proto messages, ... (best to describe this on a case-by-case basis as people try to modify the protos, and evolve this guidance over time).
>>
>> What we need is the ability for (1) runners to reject future pipelines they cannot faithfully execute and (2) runners to be able to take advantage of advanced features/protocols when interacting with those SDKs that understand them, while avoiding them for older (or newer) SDKs that don't. Let's call (1) (hard) requirements and (2) (optional) capabilities.
>>
>> Where possible, I think this is best expressed inherently in the set of transform (and possibly other component) URNs. For example, when an SDK uses a combine_per_key composite, that's a signal that it understands the various related combine_* transforms. Similarly, a pipeline with a test_stream URN would be rejected by runners not recognizing/supporting this primitive. However, this is not always possible; e.g. for (1) we have the aforementioned boolean flags on ParDo, and for (2) we have features like large iterable and progress support.
>>
>> For (1) we have to enumerate now everywhere a runner must look, as far into the future as we want to remain backwards compatible. This is why I suggested putting something on the pipeline itself, but we could (likely in addition) add it to Transform and/or ParDoPayload if we think that'd be useful now. (Note that a future pipeline-level requirement could be "inspect the (previously non-existent) requirements field attached to objects of type X.")
>>
>> For (2) I think adding a capabilities field to the environment makes the most sense for now, and as it is optional to inspect capabilities, adding the field elsewhere if needed is backwards compatible. (The motivation to do it now is that there are some capabilities that we'd like to enumerate now rather than make part of the minimal set of things an SDK must support.)
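To illustrate the requirements/capabilities split from the runner's side: hard requirements are checked with must-understand semantics (see the earlier sketch), whereas optional capabilities only gate whether the runner opts into an advanced protocol. The URN constants and class below are hypothetical, assuming a capability list carried on the environment as proposed.

import java.util.Set;

public class OptionalProtocols {

  // Hypothetical capability URNs an SDK environment might advertise.
  static final String PROGRESS_REPORTING = "beam:protocol:progress_reporting:v1";
  static final String STATE_BACKED_ITERABLES = "beam:protocol:state_backed_iterables:v1";

  // Unlike a pipeline requirement, an unrecognized capability is simply ignored:
  // the runner only enables protocols both sides understand.
  public static boolean useProgressReporting(Set<String> environmentCapabilities) {
    return environmentCapabilities.contains(PROGRESS_REPORTING);
  }

  public static boolean useStateBackedIterables(Set<String> environmentCapabilities) {
    return environmentCapabilities.contains(STATE_BACKED_ITERABLES);
  }
}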
>> Agree on the separation of requirements from capabilities, where requirements are a set of MUST-understands while capabilities are a set of MAY-understands.
>>
>> All in all, I think "capabilities" is about informing a runner about what it should know about and what it is allowed to do. If we go with a list of "capabilities", we could always add a "parameterized capabilities" URN which would tell runners they need to also look at some other field.
>>
>> Good point. That lets us keep it as a list for now. (The risk is that it makes possible the bug of populating parameters without adding the required notification to the list.)
>>
>> I also believe capabilities should NOT be "inherited". For example, if we define capabilities on a ParDoPayload, on a PTransform, and on an Environment, then ParDoPayload capabilities shouldn't be copied to the PTransform, and PTransform-specific capabilities shouldn't be copied to the Environment. My reasoning is that some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform and wouldn't apply generally everywhere. The best example I could think of is that Environment A supports progress reporting while Environment B doesn't, so it wouldn't make sense to say the "Pipeline" supports progress reporting.
>>
>> Are capabilities strictly different from "resources" (a transform needs Python package X) or "execution hints" (e.g. deploy on machines that have GPUs; some generic, but mostly runner-specific hints)? At first glance I would say yes.
>>
>> Agreed.
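A small sketch of the non-inheritance point: capabilities are looked up only at the scope where they were declared, so one environment supporting progress reporting says nothing about another environment or about the pipeline as a whole. The class and identifiers are hypothetical, assuming capability lists keyed per component as discussed above.

import java.util.Map;
import java.util.Set;

public class ScopedCapabilities {

  // Capabilities keyed by the environment that declared them (hypothetical ids and URNs).
  private final Map<String, Set<String>> byEnvironmentId;

  public ScopedCapabilities(Map<String, Set<String>> byEnvironmentId) {
    this.byEnvironmentId = byEnvironmentId;
  }

  // Consulted only for the given environment; never merged upwards into a pipeline-wide set.
  public boolean environmentSupports(String environmentId, String capabilityUrn) {
    return byEnvironmentId.getOrDefault(environmentId, Set.of()).contains(capabilityUrn);
  }
}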
