This is not exactly a problem. Having the runner explicitly declare its capabilities is of course a possibility. But do we want to modify each runner when all the functionality is actually provided by a common library? This is about usability. I can imagine (and more easily so if we can provide these requirements/capabilities) a runner that is completely decoupled from the development of core. Runner maintainers might then not follow the development of core closely. They might not know about a new pipeline requirement, and it would seem weird to force them to add a new capability without actually changing a line of code.

I'm not saying we have to implement this "automatic capabilities propagation" in the first iteration. It would just be nice not to close any doors. Adding capabilities that are actually implemented in the common library in an automated way would be more convenient, although it might add unnecessary complexity, so this has to be carefully designed and discussed to decide whether the feature is worth it. Off the top of my head, it might be possible to export an enum representing the common capabilities and let the runner declare a switch statement returning a boolean that marks the feature as supported or unsupported. That way, the compiler would at least notify runner maintainers that something was added to the library that they might want to export. But there will be better solutions, for sure.
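
To make that a bit more concrete, here is a minimal sketch of what I have in mind (all names are made up; nothing like this exists in runners-core today):

  // Hypothetical enum that runners-core would export, one constant per
  // capability implemented by the common library.
  public enum CommonCapability {
    TIME_SORTED_INPUT_STREAMING,
    STATE_BACKED_ITERABLES
  }

  // Inside a runner. With an exhaustive switch expression (Java 14+), the
  // compiler refuses to build the runner once runners-core adds a constant
  // that is not handled here, which is exactly the notification I'd like
  // runner maintainers to get.
  boolean supports(CommonCapability capability) {
    return switch (capability) {
      case TIME_SORTED_INPUT_STREAMING -> true;
      case STATE_BACKED_ITERABLES -> false;
    };
  }

(On the Java version Beam currently targets, a plain switch statement plus a static-analysis check for missing enum cases would give a similar, if weaker, signal.)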

 Jan

On 2/21/20 7:34 AM, Kenneth Knowles wrote:
Good question. My last sentence was not clear. We do not need to automatically propagate the capabilities offered by runners-core to a particular runner. The runner can (and should) own the claim of what its capabilities are.

Kenn

On Thu, Feb 20, 2020 at 10:05 PM Luke Cwik <lc...@google.com> wrote:

    Which part of the proposal do you think is solving a problem we
    may not have?

    On Thu, Feb 20, 2020 at 8:19 PM Kenneth Knowles <k...@apache.org> wrote:

        I would rather say that "runners-core" is a utility library
        with some helpful things. Like other libraries. The runner
        still decides how to use the library. That was the idea,
        anyhow. A runner could have a bunch of "if" statements around
        how it uses some generic runners-core utility, etc. I think at
        this point the proposal is trying to solve a problem we may
        not have.

        Kenn

        On Thu, Feb 20, 2020 at 1:25 PM Jan Lukavský <je...@seznam.cz> wrote:


            On 2/20/20 8:24 PM, Robert Bradshaw wrote:
            On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
            Hi,

            +1 for adding pipeline required features. I think being able to reject a
            pipeline with an unknown requirement is pretty much needed, mostly because
            that enables runners to completely decouple from SDKs while still being able
            to recognize when a pipeline constructed with an incompatible version of the
            SDK is run.

            I'll add some observations I made when implementing the latest "requires
            time sorted input" addition, with regard to this discussion:

              a) the features of a pipeline are not a simple function of the set of
            PTransforms present in the pipeline, but also depend on the (type of) inputs.
            For instance, a PTransform might have a simple expansion to primitive
            PTransforms in the streaming case, but no such expansion in the batch case.
            That is to say, a runner that doesn't actually know of a specific extension
            to some PTransform _might_ execute it correctly under some conditions, but
            _must_ fail in other cases.
            It sounds like what you're getting at here is a Stateful ParDo that
            requires "mostly" time sorted input (to keep the amount of state held
            bounded), which is somewhat provided (with no bounds given) for
            unbounded PCollections but not at all (in general) for batch. Rather
            than phrase this as a conditional requirement, I would make a new
            requirement "requires mostly time sorted input" (precise definition
            TBD, it's hard to specify or guarantee upper bounds) which a runner
            could then implement via exact time sorted input in batch but more
            cheaply as a no-op in streaming.
            +1, that makes sense. My example was a little incomplete,
            in the sense that @RequiresTimeSortedInput does not impose
            any requirements on the runner in the streaming case, with
            one exception: the runner must be compiled with the
            newest runners-core. That brings us to the fact that a
            runner's capabilities are actually not just a function of
            the runner's code, but also of the code imported from
            runners-core. There probably should be a way for the core
            to export its capabilities (e.g. provides:
            beam:requirement:pardo:time_sorted_input:streaming:v1),
            which would then be united with the capabilities of the
            runner itself. That way a runner which uses runners-core
            (and StatefulDoFnRunner, that is a complication, not sure
            how to deal with that) could be made able to satisfy
            'beam:requirement:pardo:time_sorted_input:streaming:v1'
            simply by recompiling against the newest core.
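
            Roughly, something like this is what I have in mind (the method names
            and the exact shape of the API are made up for illustration; the Guava
            collections are just for brevity):

            import com.google.common.collect.ImmutableSet;
            import com.google.common.collect.Sets;
            import java.util.Set;

            // Hypothetical: runners-core enumerates the capability URNs provided
            // by the version of the library the runner was compiled against.
            public static Set<String> runnersCoreProvidedCapabilities() {
              return ImmutableSet.of(
                  "beam:requirement:pardo:time_sorted_input:streaming:v1");
            }

            // The runner advertises the union of its own capabilities and whatever
            // it inherits from runners-core, so recompiling against a newer core
            // is enough to pick up new entries. runnerSpecificCapabilities() is a
            // made-up placeholder for the runner's own list.
            public Set<String> advertisedCapabilities() {
              return Sets.union(
                  runnerSpecificCapabilities(), runnersCoreProvidedCapabilities());
            }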
              b) it would be good if this feature worked independently of portability
            (for the Java SDK). We still have (at least two) non-portable runners that
            are IMO widely used in production and are likely to last for some time.
            Yes. As mentioned, we can still convert to portability to do such
            analysis even if we don't use it for execution.

              c) we could take advantage of these pipeline features to get rid of the
            categories of @ValidatesRunner tests, because we could have simply
            @ValidatesRunner and each test would be matched against runner capabilities
            (i.e. a runner would be tested with a given test if and only if it would not
            reject it)
            +1

            Jan

            On 2/13/20 8:42 PM, Robert Burke wrote:

            +1 to deferring for now. Since they should not be modified after 
adoption, it makes sense not to get ahead of ourselves.

            On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <rober...@google.com> wrote:
            On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <rob...@frantil.com> wrote:
            One thing that doesn't appear to have been suggested yet is that we
            could "batch" urns together under a "super urn", so that adding one super urn
            is like adding each of the features in the represented batch. This avoids
            having to send dozens of urns individually.


            The super urns would need to be static after definition to avoid 
mismatched definitions down the road.

            We would collect together the urns for what is reasonably considered
            "vX" support, and could then increment that later.

            This would simplify new SDKs, as they could target initial v1 support
            once we define what level of feature support that implies, and it doesn't
            prevent new capabilities from being added incrementally.
            Yes, this is a very good idea. I've also been thinking of certain sets
            of common operations/well-known DoFns that often occur on opposite
            sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...), are
            commonly supported, and could be grouped under these meta-urns.

            Note that these need not be monotonic: for example, a current v1 might
            require LengthPrefixCoderV1, but if a more efficient
            LengthPrefixCoderV2 comes along, eventually v2 could require that and
            *not* require the old, now rarely used LengthPrefixCoderV1.

            Probably makes sense to defer adding such super-urns until we 
notice a
            set that is commonly used together in practice.
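
            When we do, the expansion on the consuming side could look something
            like the following sketch (the super urn and its members are invented
            for illustration):

            import com.google.common.collect.ImmutableMap;
            import com.google.common.collect.ImmutableSet;
            import java.util.HashSet;
            import java.util.Map;
            import java.util.Set;

            // Hypothetical static table mapping a super urn to the urns it
            // bundles. Super urns stay frozen after definition, as noted above.
            private static final Map<String, Set<String>> SUPER_URNS =
                ImmutableMap.of(
                    "beam:super_urn:v1_features:v1",
                    ImmutableSet.of(
                        "beam:coder:length_prefix:v1",
                        "beam:transform:combine_per_key:v1"));

            // Expand declared urns before matching, so that declaring one super
            // urn is equivalent to declaring each of its members.
            static Set<String> expand(Set<String> declared) {
              Set<String> expanded = new HashSet<>(declared);
              for (String urn : declared) {
                expanded.addAll(SUPER_URNS.getOrDefault(urn, ImmutableSet.of()));
              }
              return expanded;
            }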

            Of course there's still value in SDKs being able to support features
            piecemeal as well, which is the big reason we're avoiding a simple
            monotonically-increasing version number.

            Similarly, certain feature sets could stand alone, e.g. around SQL. It's
            beneficial for optimization reasons if an SDK has native projection and UDF
            support, for example, which a runner could take advantage of by avoiding
            extra cross-language hops. These could then also be grouped under a SQL
            super urn.

            This is from the SDK capability side of course, rather than the SDK 
pipeline requirements side.

            -------
            Related to that last point, it might be good to nail down early the
            perspective used when discussing these things, as there's a duality between
            "what an SDK can do" and "what the runner will do to a pipeline that the SDK
            can understand" (e.g. combiner lifting and state-backed iterables), as well
            as between "what the pipeline requires from the runner" and "what the runner
            is able to do" (e.g. requires sorted input).


            On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:
            On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <k...@apache.org> wrote:
            On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <rober...@google.com> wrote:
            On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
            We can always detect on the runner/SDK side whether there is an
            unknown field[1] within a payload and fail to process it, but this is painful
            in two situations:
            1) It doesn't provide a good error message, since you can't say what the
            purpose of the field is. With a capability URN, the runner/SDK could say
            which URN it doesn't understand.
            2) It doesn't allow for the addition of fields which don't impact the
            semantics of execution. For example, if the display data feature was being
            developed, a runner could ignore it and still execute the pipeline correctly.
            Yeah, I don't think proto reflection is a flexible enough tool to do
            this well either.

            If we think this will be common enough, we can add a capabilities list
            to the PTransform so each PTransform can do this and has a natural way of
            being extended with additions which are forwards compatible. The alternative
            to having capabilities on PTransform (and other constructs) is that we would
            have a new URN whenever the specification of the transform changes. For
            forwards-compatible changes, each SDK/runner would map older versions of the
            URN onto the latest, internally treat it as the latest version, but always
            downgrade it to the version the other party expects when communicating with
            it. Backwards-incompatible changes would always require a new URN, which
            capabilities at the PTransform level would not help with.
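
            For illustration, the forwards-compatible mapping could be as simple as
            this (the v2 urn below is invented; no such urn exists today):

            import com.google.common.collect.ImmutableMap;
            import java.util.Map;

            // Hypothetical table: older urn -> latest urn, covering only
            // forwards-compatible changes.
            private static final Map<String, String> URN_UPGRADES =
                ImmutableMap.of("beam:transform:pardo:v1", "beam:transform:pardo:v2");

            // Normalize to the latest version internally; the inverse mapping would
            // be applied when talking to a party that only understands the old urn.
            static String normalize(String urn) {
              return URN_UPGRADES.getOrDefault(urn, urn);
            }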
            As you point out, stateful+splittable may not be a particularly 
useful
            combination, but as another example, we have
            (backwards-incompatible-when-introduced) markers on DoFn as to 
whether
            it requires finalization, stable inputs, and now time sorting. I 
don't
            think we should have a new URN for each combination.
            Agree with this. I don't think stateful, splittable, and "plain" 
ParDo are comparable to these. Each is an entirely different computational paradigm: 
per-element independent processing, per-key-and-window linear processing, and 
per-element-and-restriction splittable processing. Most relevant IMO is the nature of the 
parallelism. If you added state to splittable processing, it would still be splittable 
processing. Just as Combine and ParDo can share the SideInput specification, it is easy 
to share relevant sub-structures like state declarations. But it is a fair point that the 
ability to split can be ignored and run as a plain-old ParDo. It brings up the question 
of whether a runner that doesn't know SDF should have to reject it or should be
allowed to run it poorly.
            Being splittable means that the SDK could choose to return a
            continuation saying "please process the rest of my element in X amount of
            time", which would require the runner to inspect certain fields on responses.
            One example would be "I don't have many more messages to read from this
            message stream at the moment", and another could be "I detected that this
            filesystem is throttling me or is down and I would like to resume processing
            later".

            It isn't a huge deal. Three different top-level URNs versus three
            different sub-URNs will achieve the same result in the end if we get this
            "capability" thing in place.

            Kenn

            I do think that splittable ParDo and stateful ParDo should have separate 
PTransform URNs since they are different paradigms than "vanilla" ParDo.
            Here I disagree. What about one that is both splittable and
            stateful? Would one have a fourth URN for that? If/when another flavor of
            DoFn comes out, would we then want 8 distinct URNs? (SplittableParDo in
            particular can be executed as a normal ParDo as long as the output is
            bounded.)
            I agree that you could have stateful and splittable dofns where the 
element is the key and you share state and timers across restrictions. No 
runner is capable of executing this efficiently.

            On the SDK requirements side: the constructing SDK owns the 
Environment proto completely, so it is in a position to ensure the involved 
docker images support the necessary features.
            Yes.
            I believe capabilities do exist on a Pipeline, and they inform runners
            about new types of fields to be aware of, either within Components or on the
            Pipeline object itself, but for this discussion it makes sense that an
            environment would store most "capabilities" related to execution.

            [snip]
            As for the proto clean-ups, the scope is to cover almost all things
            needed for execution now and to follow up with optional transforms, payloads,
            and coders later, which would exclude job management APIs and artifact
            staging. A formal enumeration would be useful here. Also, we should provide
            formal guidance about adding new fields, new types of transforms, new types
            of proto messages, ... (best to describe this on a case-by-case basis as
            people try to modify the protos, and to evolve this guidance over time).
            What we need is the ability for (1) runners to reject future 
pipelines
            they cannot faithfully execute and (2) runners to be able to take
            advantage of advanced features/protocols when interacting with those
            SDKs that understand them while avoiding them for older (or newer)
            SDKs that don't. Let's call (1) (hard) requirements and (2) 
(optional)
            capabilities.

            Where possible, I think this is best expressed inherently in the set
            of transform (and possibly other component) URNs. For example, when an
            SDK uses a combine_per_key composite, that's a signal that it
            understands the various related combine_* transforms. Similarly, a
            pipeline with a test_stream URN would be rejected by runners not
            recognizing/supporting this primitive. However, this is not always
            possible, e.g. for (1) we have the aforementioned boolean flags on
            ParDo and for (2) we have features like large iterable and progress
            support.

            For (1) we have to enumerate now everywhere a runner must look, as far
            into the future as we want to remain backwards compatible. This is why
            I suggested putting something on the pipeline itself, but we could
            (likely in addition) add it to Transform and/or ParDoPayload if we
            think that'd be useful now. (Note that a future pipeline-level
            requirement could be "inspect the (previously non-existent) requirements
            field attached to objects of type X.")

            For (2) I think adding a capabilities field to the environment for now
            makes the most sense, and as it's optional to inspect them, adding it
            elsewhere if needed is backwards compatible. (The motivation to do it
            now is that there are some capabilities that we'd like to enumerate
            now rather than make part of the minimal set of things an SDK must
            support.)
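
            A minimal sketch of the runner-side treatment of the two (the proto
            field names, the SUPPORTED_REQUIREMENTS set, and the capability urn
            below are placeholders, not a final design):

            // (1) Hard requirements: anything the runner does not recognize is a
            // reason to reject the pipeline up front.
            for (String requirement : pipelineProto.getRequirementsList()) {
              if (!SUPPORTED_REQUIREMENTS.contains(requirement)) {
                throw new UnsupportedOperationException(
                    "Pipeline declares an unknown requirement: " + requirement);
              }
            }

            // (2) Optional capabilities: these only unlock optimizations or newer
            // protocols; ignoring an unknown entry is always safe.
            boolean sdkReportsProgress =
                environmentProto.getCapabilitiesList()
                    .contains("beam:protocol:progress_reporting:v1");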

            Agree on the separation of requirements from capabilities, where
            requirements are a set of MUST understand while capabilities are a set of
            MAY understand.

            All in all, I think "capabilities" is about informing a runner about what
            it should know about and what it is allowed to do. If we go with a list of
            "capabilities", we could always add a "parameterized capabilities" urn which
            would tell runners they need to also look at some other field.
            Good point. That lets us keep it as a list for now. (The risk is 
that
            it makes possible the bug of populating parameters without adding 
the
            required notification to the list.)

            I also believe capabilities should NOT be "inherited". For example, if we
            define capabilities on a ParDoPayload, on a PTransform, and on an Environment,
            then ParDoPayload capabilities shouldn't be copied to the PTransform, and
            PTransform-specific capabilities shouldn't be copied to the Environment. My
            reasoning is that some "capabilities" can only be scoped to a single
            ParDoPayload or a single PTransform and wouldn't apply generally everywhere.
            The best example I can think of is that Environment A supports progress
            reporting while Environment B doesn't, so it wouldn't make sense to say the
            "Pipeline" supports progress reporting.

            Are capabilities strictly different from "resources" (transform needs python 
package X) or "execution hints" (e.g. deploy on machines that have GPUs, some generic but 
mostly runner specific hints)? At first glance I would say yes.
            Agreed.
