Here's the first draft: https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing

It's rather high-level. We may want to add more details once we have finalized the design. Feel free to make comments and edits.

All of this goes back to the idea that I think the listing of
artifacts (or, more generally, dependencies) should be a property of
the environments themselves.

+1 I came to the same conclusion while thinking about how to store artifact information for deferred execution of the pipeline.

-Max

On 07.05.19 18:10, Robert Bradshaw wrote:
Looking forward to your writeup, Max. In the meantime, some comments below.


From: Lukasz Cwik <lc...@google.com>
Date: Thu, May 2, 2019 at 6:45 PM
To: dev



On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <rober...@google.com> wrote:

On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:

We should stick with URN + payload + artifact metadata[1] where the only mandatory one 
that all SDKs and expansion services understand is the "bytes" artifact type. 
This allows us to add optional URNs for file://, http://, Maven, PyPi, ... in the future. 
I would make the artifact staging service use the same URN + payload mechanism to get 
compatibility of artifacts across the different services, and also make the artifact 
staging service queryable for the list of artifact types it supports.
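
For illustration, I am imagining something roughly like the following Python
sketch (the field names and URN strings here are invented, not the actual
Beam protos):

from dataclasses import dataclass

# Illustrative sketch only; field names and URNs are made up and do not
# match the real Beam protos.
@dataclass
class Artifact:
    type_urn: str        # which artifact type this is
    type_payload: bytes  # type-specific payload

# The one mandatory type everyone must understand: raw bytes.
requirements = Artifact(
    type_urn="beam:artifact:type:bytes:v1",
    type_payload=b"numpy==1.16.0\npandas==0.24.0\n",
)

# An optional type a Java-aware staging service might also advertise.
kafka_io = Artifact(
    type_urn="beam:artifact:type:maven:v1",
    type_payload=b"org.apache.beam:beam-sdks-java-io-kafka:2.12.0",
)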

+1

Finally, we would need to have environments enumerate the artifact types that 
they support.

Meaning at runtime, or as another field statically set in the proto?


I don't believe runners/SDKs should have to discover what artifact types each 
environment supports at runtime; instead, environments should enumerate them 
explicitly in the proto. I have been thinking about a more general "capabilities" 
block on environments which would allow them to enumerate the URNs that the 
environment understands. This would include artifact type URNs, PTransform URNs, 
coder URNs, ... I haven't proposed anything specific along these lines yet because 
I was wondering how environment resources (CPU, min memory, hardware like GPU, 
AWS/GCP/Azure/... machine types) should/could tie into this.
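
Something like the following sketch (all URNs and field names invented for
illustration) is the rough shape I have in mind:

# Sketch of a "capabilities" block on an environment; all URNs are made up.
environment = {
    "urn": "beam:env:docker:v1",
    "payload": b"apachebeam/python3.7_sdk",
    "capabilities": [
        "beam:artifact:type:bytes:v1",             # artifact types it can consume
        "beam:artifact:type:pip_requirements:v1",
        "beam:coder:bytes:v1",                     # coders it understands
        "beam:transform:group_by_key:v1",          # transforms it can run natively
    ],
    # Resource requirements could sit alongside, though how they interact
    # with capabilities is exactly the open question above.
    "resources": {"min_memory_gb": 4, "gpu": False},
}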


Having everyone have the same "artifact" representation would be beneficial 
since:
a) Python environments could install dependencies from a requirements.txt file 
(something that the Google Cloud Dataflow Python docker container allows for 
today)
b) It provides an extensible and versioned mechanism for SDKs, environments, 
and artifact staging/retrieval services to support additional artifact types
c) It allows for expressing a canonical representation of an artifact (like a Maven 
package) so a runner could merge environments that it deems compatible.

The flow I could see is:
1) (optional) query artifact staging service for supported artifact types
2) SDK requests the expansion service to expand the transform, passing in a list of 
artifact types that the SDK and artifact staging service support; the expansion 
service returns artifacts whose types are limited to those supported types + any 
supported by the environment

The crux of the issue seems to be how the expansion service returns
the artifacts themselves. Is this going with the approach that the
caller of the expansion service must host an artifact staging service?


The caller would not need to host an artifact staging service (but would become 
effectively a proxy service, see my comment below for more details) as I would 
have expected this to be part of the expansion service response.


There is also the question here of how the returned artifacts get
attached to the various environments, or whether they get implicitly
applied to all returned stages (which need not have a consistent
environment).


I would suggest returning additional information that says which artifact is for 
which environment. Applying all artifacts to all environments is likely to 
cause issues since some environments may not understand certain artifact types 
or may get conflicting versions of artifacts. I would see this happening since 
an expansion service that aggregates other expansion services seems likely, for 
example:
                              /-> ExpansionService(Python)
ExpansionService(Aggregator) --> ExpansionService(Java)
                              \-> ExpansionService(Go)
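
Something like the following (structure invented purely for illustration) is
the shape I have in mind, with artifacts keyed by the environment they are
intended for:

# Sketch: an aggregated expansion response that keys artifacts by the
# environment they belong to, rather than applying them globally.
expansion_response = {
    "environments": {
        "py_env":   {"urn": "beam:env:docker:v1", "payload": b"python-sdk-image"},
        "java_env": {"urn": "beam:env:docker:v1", "payload": b"java-sdk-image"},
    },
    "artifacts_by_environment": {
        "py_env":   [{"type_urn": "beam:artifact:type:pip_requirements:v1",
                      "type_payload": b"numpy==1.16.0"}],
        "java_env": [{"type_urn": "beam:artifact:type:maven:v1",
                      "type_payload": b"org.apache.kafka:kafka-clients:2.0.0"}],
    },
}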

All of this goes back to the idea that I think the listing of
artifacts (or, more generally, dependencies) should be a property of
the environments themselves.

3) SDK converts any artifact types that the artifact staging service or environment 
doesn't understand, e.g. pulls down Maven dependencies and converts them to 
"bytes" artifacts

Here I think we're conflating two things. The "type" of an artifact is
both (1) how to fetch the bytes and (2) how to interpret them (e.g. is
this a jar file, or a pip tarball, or just some data needed by a DoFn,
or ...). Only (1) can be freely transmuted.
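
As a hypothetical illustration of that split, an artifact could carry two
independent URNs, one for retrieval and one for interpretation (all the names
below are invented):

# Sketch: separate "how to fetch" from "how to interpret"; both URN
# families here are made up for illustration.
artifact = {
    # (1) How to fetch the bytes -- freely transmutable, e.g. a Maven
    #     coordinate can be resolved and re-staged as raw bytes.
    "type_urn": "beam:artifact:type:url:v1",
    "type_payload": b"https://repo.example.org/maven2/foo/foo/1.0/foo-1.0.jar",
    # (2) How the environment should interpret them -- not transmutable,
    #     since it is tied to the environment (classpath jar, pip package,
    #     plain data file for a DoFn, ...).
    "role_urn": "beam:artifact:role:classpath_jar:v1",
    "role_payload": b"foo-1.0.jar",
}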


You're right. Thinking about this some more, general artifact conversion is 
unlikely to be practical because how to interpret an artifact is environment 
dependent. For example, a requirements.txt used to install pip packages for a 
Python docker container depends on the filesystem layout of that specific 
docker container. One could simulate doing a pip install on the same 
filesystem, take the diff, and then do that for all the packages in 
requirements.txt, but this quickly becomes impractical.


4) SDK sends artifacts to artifact staging service
5) Artifact staging service converts any artifacts to types that the 
environment understands
6) Environment is started and gets artifacts from the artifact retrieval 
service.
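
Putting steps 1-6 together, the client side could look roughly like the
pseudocode below (the service interfaces and the convert_to_bytes helper are
hypothetical stand-ins, not real APIs):

def expand_and_stage(transform, sdk_artifact_types, staging_service, expansion_service):
    # 1) (optional) Query the artifact staging service for its supported types.
    supported = set(sdk_artifact_types) & set(staging_service.supported_artifact_types())

    # 2) Ask the expansion service to expand, advertising what we support.
    expansion = expansion_service.expand(transform, supported_artifact_types=supported)

    # 3) Convert anything the staging service doesn't understand, e.g. resolve
    #    Maven coordinates locally and re-stage them as "bytes" artifacts.
    artifacts = [a if a.type_urn in supported else convert_to_bytes(a)
                 for a in expansion.artifacts]

    # 4) Send the artifacts to the artifact staging service.
    staging_token = staging_service.stage(artifacts)

    # 5) and 6) happen runner-side: the staging service may convert artifacts
    #    further for the environment, which fetches them at startup via the
    #    artifact retrieval service.
    return expansion.transform, staging_token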

On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <rober...@google.com> wrote:

On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <m...@apache.org> wrote:

Good idea to let the client expose an artifact staging service that the
ExpansionService could use to stage artifacts. This solves two problems:

(1) The Expansion Service not being able to access the Job Server
artifact staging service
(2) The client not having access to the dependencies returned by the
Expansion Service

The downside is that it adds an additional indirection. The alternative, 
letting the client handle staging of the artifacts returned by the Expansion 
Service, is more transparent and easier to implement.

The other downside is that it may not always be possible for the
expansion service to connect to the artifact staging service (e.g.
when constructing a pipeline locally against a remote expansion
service).

Just to make sure, you're saying the expansion service would return all the 
artifacts (bytes, urls, ...) as part of the response, since the expansion 
service wouldn't be able to connect to the SDK that is running locally either.

Yes. Well, more precisely, I'm asking how the expansion service would return
any artifacts.

What we have is

Runner <--- SDK ---> Expansion service.

Where the unidirectional arrow means "instantiates a connection with"
and the other direction (and missing arrows) may not be possible.


I believe the ExpansionService Expand request should become a unidirectional 
stream back to the caller so that artifacts can be sent back to the SDK 
(effectively mirroring the artifact staging service API). So the expansion 
response would stream back a bunch of artifact data messages followed by the 
expansion result containing the PTransform information.
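
A rough sketch of the shape I mean on the service side (the message types
below are stand-ins, not the real protos):

from typing import Iterator, NamedTuple, Optional

# Stand-in message types; the real ones would be protobuf-generated.
class ArtifactChunk(NamedTuple):
    environment_id: str
    data: bytes

class ExpansionResponse(NamedTuple):
    artifact_chunk: Optional[ArtifactChunk] = None
    components: Optional[dict] = None  # the expanded PTransforms + environments

def expand(components, artifacts) -> Iterator[ExpansionResponse]:
    """Stream artifact data back first, then the expansion result itself."""
    chunk_size = 1 << 20  # 1 MiB per message
    for env_id, payload in artifacts:
        # Mirror the artifact staging API: chunk each payload back to the SDK.
        for offset in range(0, len(payload), chunk_size):
            yield ExpansionResponse(
                artifact_chunk=ArtifactChunk(env_id, payload[offset:offset + chunk_size]))
    # The final message carries the expanded PTransform information.
    yield ExpansionResponse(components=components)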

+1.

Ideally, the Expansion Service won't return any dependencies because the
environment already contains the required dependencies. We could make it
a requirement for the expansion to be performed inside an environment.
Then we would already ensure during expansion time that the runtime
dependencies are available.

Yes, it's cleanest if the expansion service provides an environment that
already contains all the required dependencies. Interesting idea to make this
a property of the expansion service itself.

I had thought this too, but an opaque docker container built on top of a base 
Beam docker container would be very difficult for a runner to introspect and 
check for compatibility to allow for fusion across PTransforms. I think 
artifacts need to be communicated in their canonical representation.

It's clean (from the specification point of view), but doesn't allow
for good introspection/fusion (aside from one being a base of another,
perhaps).

In this case, the runner would (as
requested by its configuration) be free to merge environments it
deemed compatible, including swapping out beam-java-X for
beam-java-embedded if it considers itself compatible with the
dependency list.

Could you explain how that would work in practice?

Say one has a pipeline with environments

A: beam-java-sdk-2.12-docker
B: beam-java-sdk-2.12-docker + dep1
C: beam-java-sdk-2.12-docker + dep2
D: beam-java-sdk-2.12-docker + dep3

A runner could (conceivably) be intelligent enough to know that dep1
and dep2 are indeed compatible, and run A, B, and C in a single
beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
corresponding fusion and lower overhead benefits). If a certain
pipeline option is set, it might further note that dep1 and dep2 are
compatible with its own workers, which are built against sdk-2.12, and
choose to run these in an embedded + dep1 + dep2 environment.
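
A naive sketch of that merging decision (where "compatible" is just "no
conflicting versions", which of course glosses over the hard parts):

# Sketch: merge environments whose base image matches and whose dependency
# versions don't conflict. Real compatibility checks would be much harder.
def try_merge(env_a, env_b):
    if env_a is None or env_b is None or env_a["base"] != env_b["base"]:
        return None
    merged = dict(env_a["deps"])
    for name, version in env_b["deps"].items():
        if merged.get(name, version) != version:
            return None  # conflicting versions: keep the environments apart
        merged[name] = version
    return {"base": env_a["base"], "deps": merged}

A = {"base": "beam-java-sdk-2.12-docker", "deps": {}}
B = {"base": "beam-java-sdk-2.12-docker", "deps": {"dep1": "1.0"}}
C = {"base": "beam-java-sdk-2.12-docker", "deps": {"dep2": "2.3"}}

shared = try_merge(try_merge(A, B), C)  # A, B and C can run in one environment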

We have been talking about the expansion service and cross-language transforms a lot 
lately, but I believe it will initially come at the cost of poor fusion of transforms, 
since "merging" compatible environments is a difficult problem that brings up many of 
the classic dependency management issues (e.g. diamond dependency conflicts).

I agree. I think expansion services offering "kitchen-sink"
containers, when possible, can go far here. If we could at least
recognize when one environment/set of deps is a superset of another,
that could be an easy case that would yield a lot of benefit as well.


+1
