Cool, thanks. :-)

Jan

On 2/14/20 11:35 PM, Robert Bradshaw wrote:
Oh, sorry. Try it again
https://docs.google.com/document/d/1CyVElQDYHBRfXu6k1VSXv3Yok_4r8c4V0bkh2nFAWYc/edit?usp=sharing

On Fri, Feb 14, 2020 at 2:04 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi Robert,

the doc seems to be locked.

Jan

On 2/14/20 10:56 PM, Robert Bradshaw wrote:
I created https://github.com/apache/beam/pull/10873 to add the minimal
set of fields to the existing protos, and also created
https://docs.google.com/document/d/1CyVElQDYHBRfXu6k1VSXv3Yok_4r8c4V0bkh2nFAWYc/edit
to start enumerating some of the URNs we may want to have. It would be
a good milestone to get this in by the release next week.

On Thu, Feb 13, 2020 at 5:14 PM Kyle Weaver <kcwea...@google.com> wrote:
we can take advantage of these pipeline features to get rid of the categories
of @ValidatesRunner tests, because we could simply have @ValidatesRunner
and each test would be matched against runner capabilities
+1, I think the potential to formally integrate our idea of compatibility and 
unit testing is a big advantage of this proposal. Also, when deciding where to 
draw lines between different URNs, it may help to look at the existing 
validates runner test categories, which are currently the most accurate signal 
we have regarding a runner's capabilities.

On Thu, Feb 13, 2020 at 4:04 PM Robert Burke <rob...@frantil.com> wrote:
Wrt the per-DoFn/ParDo level, there's the similar case of whether the DoFn has a
URN for requiring something or an annotation saying the DoFn provides
something (e.g. provides k-anonymization with k defined).

The general theme of this thread seems to be ensuring a runner can
reject a pipeline if it's not able to provide the right guarantees, so the
latter case isn't handled.

E.g. the latter provisions could be used to analyze a pipeline at construction
time to ensure the outputs are all properly anonymized to a certain degree.

On Thu, Feb 13, 2020, 2:26 PM Kenneth Knowles <k...@apache.org> wrote:

On Thu, Feb 13, 2020 at 12:42 PM Jan Lukavský <je...@seznam.cz> wrote:
Hi,

+1 for adding pipeline required features. I think being able to reject a pipeline
with an unknown requirement is pretty much needed, mostly because that enables
runners to completely decouple from SDKs, while still being able to recognize when a
pipeline constructed with an incompatible version of an SDK is run.

I'll add some observations I made when implementing the latest "requires time sorted
input" addition, with regard to this discussion:

   a) the features of a pipeline are not a simple function of the set of PTransforms
present in the pipeline, but also depend on the (types of) inputs. For
instance, a PTransform might have a simple expansion to primitive PTransforms in
the streaming case, but no such expansion in the batch case. That is to say, a
runner that doesn't actually know of a specific extension to some PTransform
_might_ actually execute it correctly under some conditions, but _must_ fail in
other cases.

   b) it would be good if this feature worked independently of portability
(for the Java SDK). We still have (at least two) non-portable runners that are IMO
widely used in production and are likely to be around for some time.
I think even if these runners keep executing without portability, they
should migrate to the portable pipeline definition. Then they can share the
same model with runners that execute using portability. The Fn API is not
required to be used as long as the runner implements the semantics of the
pipeline.

Kenn

   c) we can take advantage of these pipeline features to get rid of the
categories of @ValidatesRunner tests, because we could simply have
@ValidatesRunner and each test would be matched against runner capabilities
(i.e. a runner would be tested with a given test if and only if it would not
reject it)
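
A rough sketch of the status quo that (c) would simplify; the category interfaces shown are the existing ones in org.apache.beam.sdk.testing, the test body is elided:

import org.apache.beam.sdk.testing.UsesTimersInParDo;
import org.apache.beam.sdk.testing.ValidatesRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class TimerValidatesRunnerTest {
  // Today: the test is tagged with category interfaces, and each runner's
  // build config excludes the categories that runner cannot support.
  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
  @Test
  public void testEventTimeTimer() {
    // ... build and run a pipeline that uses event-time timers ...
  }
}

Under (c), the explicit category would no longer be needed: the test pipeline's own requirement URNs would determine whether a given runner runs it or rejects (and thus skips) it.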

Jan

On 2/13/20 8:42 PM, Robert Burke wrote:

+1 to deferring for now. Since they should not be modified after adoption, it 
makes sense not to get ahead of ourselves.

On Thu, Feb 13, 2020, 10:59 AM Robert Bradshaw <rober...@google.com> wrote:
On Thu, Feb 13, 2020 at 10:12 AM Robert Burke <rob...@frantil.com> wrote:
One thing that doesn't appear to have been suggested yet is that we could "batch" URNs
together under a "super URN", so that adding one super URN is like adding each of the
represented batch of features. This avoids needing to send dozens of URNs individually.


The super URNs would need to be static after definition to avoid mismatched
definitions down the road.

We could collect together the URNs that are reasonably considered "vX" support, and can then
increment that later.

This would simplify new SDKs, as they could have a goal of initial v1 support, as
we define what level of feature support that includes, and it doesn't prevent new
capabilities from being added incrementally.
Yes, this is a very good idea. I've also been thinking of certain sets
of common operations/well-known DoFns that often occur on opposite
sides of GBKs (e.g. the pair-with-one, sum-ints, drop-keys, ...) that
are commonly supported and could be grouped under these meta-URNs.

Note that these need not be monotonic, for example a current v1 might
require LengthPrefixCoderV1, but if a more efficient
LengthPrefixCoderV2 comes along, eventually v2 could require that and
*not* require the old, now rarely used LengthPrefixCoderV1.

Probably makes sense to defer adding such super-urns until we notice a
set that is commonly used together in practice.

Of course there's still value in SDKs being able to support features
piecemeal as well, which is the big reason we're avoiding a simple
monotonically-increasing version number.
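
To make the super-URN idea concrete, a hypothetical sketch; the super-URN string and the exact expansion set are made up for illustration, only the individual coder/transform URNs resemble existing ones:

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public final class SuperUrns {
  // A super URN is a fixed, never-modified alias for a set of individual URNs.
  public static final Map<String, Set<String>> EXPANSIONS =
      Map.of(
          "beam:capabilities:v1", // hypothetical super URN
          Set.of(
              "beam:coder:length_prefix:v1",
              "beam:transform:pardo:v1",
              "beam:transform:group_by_key:v1"));

  // Expansion happens once on either side; after that, matching is an
  // ordinary membership check against individual URNs.
  public static Set<String> expand(Set<String> declared) {
    return declared.stream()
        .flatMap(urn -> EXPANSIONS.getOrDefault(urn, Set.of(urn)).stream())
        .collect(Collectors.toSet());
  }
}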

Similarly, certain feature sets could stand alone, e.g. around SQL. It's
beneficial for optimization reasons if an SDK has native projection and UDF
support, for example, which a runner could take advantage of by avoiding extra
cross-language hops. These could then also be grouped under a SQL super URN.

This is from the SDK capability side of course, rather than the SDK pipeline 
requirements side.

-------
Related to that last point, it might be good to nail down early the perspective used when discussing these things, as
there's a duality between "what an SDK can do" and "what the runner will do to a pipeline that the SDK
can understand" (e.g. combiner lifting, and state-backed iterables), as well as between "what the pipeline requires
from the runner" and "what the runner is able to do" (e.g. requires sorted input).


On Thu, Feb 13, 2020, 9:06 AM Luke Cwik <lc...@google.com> wrote:

On Wed, Feb 12, 2020 at 2:24 PM Kenneth Knowles <k...@apache.org> wrote:

On Wed, Feb 12, 2020 at 12:04 PM Robert Bradshaw <rober...@google.com> wrote:
On Wed, Feb 12, 2020 at 11:08 AM Luke Cwik <lc...@google.com> wrote:
We can always detect on the runner/SDK side whether there is an unknown
field[1] within a payload and fail to process it, but this is painful in two
situations:
1) It doesn't provide for a good error message, since you can't say what the
purpose of the field is. With a capability URN, the runner/SDK could say which
URN it doesn't understand.
2) It doesn't allow for the addition of fields which don't impact the semantics of
execution. For example, if the display data feature were being developed, a
runner could ignore it and still execute the pipeline correctly.
Yeah, I don't think proto reflection is a flexible enough tool to do
this well either.
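
For reference, a small sketch of the unknown-field detection being described, using protobuf Java reflection (assuming the runner has the deserialized payload as a Message); it also shows why the resulting error can only name field numbers, not purposes:

import com.google.protobuf.Message;

class UnknownFieldCheck {
  static void failOnUnknownFields(Message payload, String transformId) {
    // Unknown fields are reported only by field number, so the error cannot
    // say what the missing feature actually is -- hence the appeal of URNs.
    if (!payload.getUnknownFields().asMap().isEmpty()) {
      throw new IllegalArgumentException(
          "Transform " + transformId + " uses unknown payload fields "
              + payload.getUnknownFields().asMap().keySet()
              + "; this runner may be too old to execute it.");
    }
  }
}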

If we think this to be common enough, we can add a capabilities list to
PTransform so each PTransform can do this and has a natural way of being
extended with additions which are forwards compatible. The alternative to having
capabilities on PTransform (and other constructs) is that we would have a new
URN whenever the specification of the transform changes. For forwards-compatible
changes, each SDK/runner would map older versions of the URN onto the latest
and internally treat it as the latest version, but always downgrade it to the
version the other party expects when communicating with it. Backwards-incompatible
changes would always require a new URN, which capabilities at the
PTransform level would not help with.
As you point out, stateful+splittable may not be a particularly useful
combination, but as another example, we have
(backwards-incompatible-when-introduced) markers on DoFn as to whether
it requires finalization, stable inputs, and now time sorting. I don't
think we should have a new URN for each combination.
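
For concreteness, the Java-SDK markers in question are per-method annotations on the DoFn; a minimal sketch (the DoFn itself is hypothetical):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class SortedStatefulFn extends DoFn<KV<String, Long>, String> {
  // Each marker is a per-DoFn requirement the runner must honor or reject;
  // combining them does not call for a new top-level URN per combination.
  @RequiresStableInput
  @RequiresTimeSortedInput
  @ProcessElement
  public void process(@Element KV<String, Long> e, OutputReceiver<String> out) {
    out.output(e.getKey() + ":" + e.getValue());
  }
}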
Agree with this. I don't think stateful, splittable, and "plain" ParDo are
comparable to these. Each is an entirely different computational paradigm: per-element
independent processing, per-key-and-window linear processing, and
per-element-and-restriction splittable processing. Most relevant IMO is the nature of the
parallelism. If you added state to splittable processing, it would still be splittable
processing. Just as Combine and ParDo can share the SideInput specification, it is easy
to share relevant sub-structures like state declarations. But it is a fair point that the
ability to split can be ignored and the transform run as a plain-old ParDo. It brings up the question
of whether a runner that doesn't know SDFs should have to reject them or should be
allowed to run them poorly.
Being splittable means that the SDK could choose to return a continuation
saying "please process the rest of my element in X amount of time", which would
require the runner to inspect certain fields on responses. One example would be
"I don't have many more messages to read from this message stream at the moment";
another could be "I detected that this filesystem is throttling
me or is down, and I would like to resume processing later."
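
A minimal Java-SDK sketch of such a continuation (the restriction handling and the stream-exhausted condition are placeholders):

import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.joda.time.Duration;

class ReadFromStreamFn extends DoFn<String, String> {
  @GetInitialRestriction
  public OffsetRange initialRestriction(@Element String element) {
    return new OffsetRange(0, Long.MAX_VALUE);
  }

  @ProcessElement
  public ProcessContinuation process(
      ProcessContext c, RestrictionTracker<OffsetRange, Long> tracker) {
    // ... claim positions from the tracker and emit records ...
    boolean streamTemporarilyExhausted = true; // placeholder condition
    if (streamTemporarilyExhausted) {
      // "Not many more messages right now; please resume me in 30 seconds."
      return ProcessContinuation.resume().withResumeDelay(Duration.standardSeconds(30));
    }
    return ProcessContinuation.stop();
  }
}

The runner has to notice the resume() in the bundle response and reschedule the element/restriction, which is exactly the kind of field-inspection capability being discussed.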

It isn't a huge deal. Three different top-level URNs versus three different sub-URNs will
achieve the same result in the end if we get this "capability" thing in place.

Kenn

I do think that splittable ParDo and stateful ParDo should have separate PTransform URNs 
since they are different paradigms than "vanilla" ParDo.
Here I disagree. What about one that is both splittable and stateful? Would one
have a fourth URN for that? If/when another flavor of DoFn comes out, would we
then want 8 distinct URNs? (SplittableParDo in particular can be executed as a
normal ParDo as long as the output is bounded.)
I agree that you could have stateful and splittable DoFns where the element is
the key and you share state and timers across restrictions. No runner is
capable of executing this efficiently.

On the SDK requirements side: the constructing SDK owns the Environment proto 
completely, so it is in a position to ensure the involved docker images support 
the necessary features.
Yes.
I believe capabilities do exist on a Pipeline, and they inform runners about new types of
fields to be aware of, either within Components or on the Pipeline object itself, but for
this discussion it makes sense that an environment would store most
"capabilities" related to execution.

[snip]
As for the proto clean-ups, the scope is to cover almost all things needed for
execution now and to follow up with optional transforms, payloads, and coders
later, which would exclude job management APIs and artifact staging. A formal
enumeration would be useful here. Also, we should provide formal guidance about
adding new fields, adding new types of transforms, new types of proto messages,
... (best to describe this on a case-by-case basis as people try
to modify the protos, and evolve this guidance over time).
What we need is the ability for (1) runners to reject future pipelines
they cannot faithfully execute and (2) runners to be able to take
advantage of advanced features/protocols when interacting with those
SDKs that understand them while avoiding them for older (or newer)
SDKs that don't. Let's call (1) (hard) requirements and (2) (optional)
capabilities.

Where possible, I think this is best expressed inherently in the set
of transform (and possibly other component) URNs. For example, when an
SDK uses a combine_per_key composite, that's a signal that it
understands the various related combine_* transforms. Similarly, a
pipeline with a test_stream URN would be rejected by runners not
recognizing/supporting this primitive. However, this is not always
possible, e.g. for (1) we have the aforementioned boolean flags on
ParDo and for (2) we have features like large-iterable and progress
support.

For (1) we have to enumerate now everywhere a runner must look, as far
into the future as we want to remain backwards compatible. This is why
I suggested putting something on the pipeline itself, but we could
(likely in addition) add it to Transform and/or ParDoPayload if we
think that'd be useful now. (Note that a future pipeline-level
requirement could be "inspect the (previously non-existent) requirements
field attached to objects of type X.")

For (2) I think adding a capabilities field to the environment for now
makes the most sense, and as it's optional to inspect them adding it
elsewhere if needed is backwards compatible. (The motivation to do it
now is that there are some capabilities that we'd like to enumerate
now rather than make part of the minimal set of things an SDK must
support.)
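
A hedged sketch of the runner-side half of (1); the idea that the pipeline proto carries a list of requirement URNs reflects the PR above, but the accessors and URN strings here are illustrative assumptions, not the actual API:

import java.util.List;
import java.util.Set;

final class RequirementsChecker {
  // The fixed set of requirement URNs this runner knows how to satisfy.
  private static final Set<String> SUPPORTED_REQUIREMENTS =
      Set.of(
          "beam:requirement:pardo:stateful:v1",      // hypothetical URNs
          "beam:requirement:pardo:stable_input:v1");

  // Reject any pipeline that declares a requirement we do not recognize,
  // rather than silently executing it incorrectly.
  static void checkRequirements(List<String> pipelineRequirements) {
    for (String urn : pipelineRequirements) {
      if (!SUPPORTED_REQUIREMENTS.contains(urn)) {
        throw new UnsupportedOperationException(
            "Pipeline requires " + urn + ", which this runner does not support.");
      }
    }
  }
}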

Agree on the separation of requirements from capabilities, where requirements are
a set of MUST-understands while capabilities are a set of MAY-understands.

All in all, I think "capabilities" are about informing a runner about what it should know
and what it is allowed to do. If we go with a list of "capabilities", we could always add a
"parameterized capabilities" URN which would tell runners they need to also look at some other
field.
Good point. That lets us keep it as a list for now. (The risk is that
it makes possible the bug of populating parameters without adding the
required notification to the list.)

I also believe capabilities should NOT be "inherited". For example, if we define capabilities on a
ParDoPayload, on a PTransform, and on an Environment, then ParDoPayload capabilities shouldn't be copied to the
PTransform, and PTransform-specific capabilities shouldn't be copied to the Environment. My reasoning is that
some "capabilities" can only be scoped to a single ParDoPayload or a single PTransform
and wouldn't apply generally everywhere. The best example I can think of is that Environment A supports
progress reporting while Environment B doesn't, so it wouldn't make sense to say the "Pipeline"
supports progress reporting.

Are capabilities strictly different from "resources" (a transform needs Python package X)
or "execution hints" (e.g. deploy on machines that have GPUs; some generic, but mostly
runner-specific hints)? At first glance I would say yes.
Agreed.
