Hi,

Is anyone actively working on the artifact staging extension for cross-language pipelines? I'm thinking I can contribute to it this coming December. If anyone has made progress on this and needs help, please let me know.
Thanks,

On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <ieme...@gmail.com> wrote:
> Can you please add this to the design documents webpage.
> https://beam.apache.org/contribute/design-documents/
>
> On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath <chamik...@google.com> wrote:
> >
> > On Tue, May 7, 2019 at 10:21 AM Maximilian Michels <m...@apache.org> wrote:
> >>
> >> Here's the first draft:
> >> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
> >>
> >> It's rather high-level. We may want to add more details once we have finalized the design. Feel free to make comments and edits.
> >
> > Thanks Max. Added some comments.
> >
> >> > All of this goes back to the idea that I think the listing of artifacts (or more general dependencies) should be a property of the environments themselves.
> >>
> >> +1 I came to the same conclusion while thinking about how to store artifact information for deferred execution of the pipeline.
> >>
> >> -Max
> >>
> >> On 07.05.19 18:10, Robert Bradshaw wrote:
> >> > Looking forward to your writeup, Max. In the meantime, some comments below.
> >> >
> >> > From: Lukasz Cwik <lc...@google.com>
> >> > Date: Thu, May 2, 2019 at 6:45 PM
> >> > To: dev
> >> >
> >> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <rober...@google.com> wrote:
> >> >>>
> >> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
> >> >>>>
> >> >>>> We should stick with URN + payload + artifact metadata[1], where the only mandatory type that all SDKs and expansion services understand is the "bytes" artifact type. This allows us to add optional URNs for file://, http://, Maven, PyPi, ... in the future. I would make the artifact staging service use the same URN + payload mechanism to get compatibility of artifacts across the different services, and also have the artifact staging service be queryable for the list of artifact types it supports.
> >> >>>
> >> >>> +1
> >> >>>
> >> >>>> Finally, we would need to have environments enumerate the artifact types that they support.
> >> >>>
> >> >>> Meaning at runtime, or as another field statically set in the proto?
> >> >>
> >> >> I don't believe runners/SDKs should have to know what artifacts each environment supports at runtime; instead, environments should enumerate them explicitly in the proto. I have been thinking about a more general "capabilities" block on environments which allows them to enumerate URNs that the environment understands. This would include artifact type URNs, PTransform URNs, coder URNs, ... I haven't proposed anything specific down this line yet because I was wondering how environment resources (CPU, min memory, hardware like GPU, AWS/GCP/Azure/... machine types) should/could tie into this.
> >> >>
> >> >>>
> >> >>>> Having everyone have the same "artifact" representation would be beneficial since:
> >> >>>> a) Python environments could install dependencies from a requirements.txt file (something that the Google Cloud Dataflow Python docker container allows for today)
> >> >>>> b) It provides an extensible and versioned mechanism for SDKs, environments, and artifact staging/retrieval services to support additional artifact types
> >> >>>> c) It allows for expressing a canonical representation of an artifact, like a Maven package, so a runner could merge environments that the runner deems compatible.
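To make the URN + payload idea above concrete, here is a minimal Python sketch. The URNs, class name, and helper functions are hypothetical and only illustrate the shape of the data, not an existing Beam API:

    # Illustrative sketch only; the URNs and field names below are assumptions,
    # not the actual Beam proto definitions.
    from dataclasses import dataclass

    BYTES_ARTIFACT_URN = "beam:artifact:type:bytes:v1"   # assumed naming scheme
    MAVEN_ARTIFACT_URN = "beam:artifact:type:maven:v1"   # assumed naming scheme

    @dataclass
    class Artifact:
        type_urn: str        # how to fetch/interpret the payload
        type_payload: bytes  # URN-specific description (raw bytes, coordinates, ...)

    def bytes_artifact(data: bytes) -> Artifact:
        # The one mandatory type every SDK/expansion service would understand.
        return Artifact(BYTES_ARTIFACT_URN, data)

    def maven_artifact(coordinates: str) -> Artifact:
        # A canonical representation, e.g. "org.apache.beam:some-io:2.13.0",
        # which a runner could use to reason about environment compatibility.
        return Artifact(MAVEN_ARTIFACT_URN, coordinates.encode("utf-8"))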
> >> >>>>
> >> >>>> The flow I could see is:
> >> >>>> 1) (optional) query the artifact staging service for supported artifact types
> >> >>>> 2) SDK requests the expansion service to expand a transform, passing in a list of artifact types the SDK and artifact staging service support; the expansion service returns a list of artifact types limited to those supported types + any supported by the environment
> >> >>>
> >> >>> The crux of the issue seems to be how the expansion service returns the artifacts themselves. Is this going with the approach that the caller of the expansion service must host an artifact staging service?
> >> >>
> >> >> The caller would not need to host an artifact staging service (but would effectively become a proxy service, see my comment below for more details) as I would have expected this to be part of the expansion service response.
> >> >>
> >> >>> There is also the question here of how the returned artifacts get attached to the various environments, or whether they get implicitly applied to all returned stages (which need not have a consistent environment)?
> >> >>
> >> >> I would suggest returning additional information that says which artifact is for which environment. Applying all artifacts to all environments is likely to cause issues since some environments may not understand certain artifact types or may get conflicting versions of artifacts. I would see this happening since an expansion service that aggregates other expansion services seems likely, for example:
> >> >>                               /-> ExpansionService(Python)
> >> >> ExpansionService(Aggregator) --> ExpansionService(Java)
> >> >>                               \-> ExpansionService(Go)
> >> >
> >> > All of this goes back to the idea that I think the listing of artifacts (or more general dependencies) should be a property of the environments themselves.
> >> >
> >> >>>> 3) SDK converts any artifact types that the artifact staging service or environment doesn't understand, e.g. pulls down Maven dependencies and converts them to "bytes" artifacts
> >> >>>
> >> >>> Here I think we're conflating two things. The "type" of an artifact is both (1) how to fetch the bytes and (2) how to interpret them (e.g. is this a jar file, or a pip tarball, or just some data needed by a DoFn, or ...). Only (1) can be freely transmuted.
> >> >>
> >> >> You're right. Thinking about this some more, general artifact conversion is unlikely to be practical because how to interpret an artifact is environment dependent. For example, a requirements.txt used to install pip packages for a Python docker container depends on the filesystem layout of that specific docker container. One could simulate doing a pip install on the same filesystem and see the diff, and then do so for all the packages in requirements.txt, but this quickly becomes impractical.
> >> >>
> >> >>>> 4) SDK sends artifacts to the artifact staging service
> >> >>>> 5) Artifact staging service converts any artifacts to types that the environment understands
> >> >>>> 6) Environment is started and gets artifacts from the artifact retrieval service.
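A rough sketch of the type negotiation in steps 1-4 of that flow, showing how an SDK might fall back to "bytes" artifacts for types the staging service doesn't support. The function name, URNs, and the fetcher map are assumptions for illustration, not an existing API:

    # Hypothetical sketch of the SDK-side staging step; names and URNs are made up.
    BYTES = "beam:artifact:type:bytes:v1"

    def stage_artifacts(expansion_artifacts, staging_supported, fetchers):
        """expansion_artifacts: list of (type_urn, payload) pairs returned by expansion.
        staging_supported: set of type URNs the artifact staging service accepts.
        fetchers: map of type_urn -> callable(payload) -> bytes, used to convert
        anything the staging service doesn't understand into "bytes" artifacts."""
        staged = []
        for type_urn, payload in expansion_artifacts:
            if type_urn in staging_supported:
                # The staging service handles this type directly (e.g. Maven, file://).
                staged.append((type_urn, payload))
            elif type_urn in fetchers:
                # The SDK pulls the artifact down itself and re-stages it as raw bytes.
                staged.append((BYTES, fetchers[type_urn](payload)))
            else:
                raise ValueError("No way to stage artifact of type %s" % type_urn)
        return staged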
> >> >>>>
> >> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <rober...@google.com> wrote:
> >> >>>>>
> >> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <m...@apache.org> wrote:
> >> >>>>>>
> >> >>>>>> Good idea to let the client expose an artifact staging service that the ExpansionService could use to stage artifacts. This solves two problems:
> >> >>>>>>
> >> >>>>>> (1) The Expansion Service not being able to access the Job Server artifact staging service
> >> >>>>>> (2) The client not having access to the dependencies returned by the Expansion Server
> >> >>>>>>
> >> >>>>>> The downside is that it adds an additional indirection. The alternative, to let the client handle staging the artifacts returned by the Expansion Server, is more transparent and easier to implement.
> >> >>>>>
> >> >>>>> The other downside is that it may not always be possible for the expansion service to connect to the artifact staging service (e.g. when constructing a pipeline locally against a remote expansion service).
> >> >>>>
> >> >>>> Just to make sure, you're saying the expansion service would return all the artifacts (bytes, urls, ...) as part of the response, since the expansion service wouldn't be able to connect to the SDK that is running locally either.
> >> >>>
> >> >>> Yes. Well, more I'm asking how the expansion service would return any artifacts.
> >> >>>
> >> >>> What we have is
> >> >>>
> >> >>> Runner <--- SDK ---> Expansion service.
> >> >>>
> >> >>> Where the unidirectional arrow means "instantiates a connection with" and the other direction (and missing arrows) may not be possible.
> >> >>
> >> >> I believe the ExpansionService Expand request should become a unidirectional stream back to the caller so that artifacts could be sent back to the SDK (effectively mirroring the artifact staging service API). So the expansion response would stream back a bunch of artifact data messages and also the expansion response containing the PTransform information.
> >> >
> >> > +1.
> >> >
> >> >>>>>> Ideally, the Expansion Service won't return any dependencies because the environment already contains the required dependencies. We could make it a requirement for the expansion to be performed inside an environment. Then we would already ensure during expansion time that the runtime dependencies are available.
> >> >>>>>
> >> >>>>> Yes, it's cleanest if the expansion service provides an environment with all the dependencies provided. Interesting idea to make this a property of the expansion service itself.
> >> >>>>
> >> >>>> I had thought this too, but an opaque docker container that was built on top of a base Beam docker container would be very difficult for a runner to introspect and check to see if it's compatible to allow for fusion across PTransforms. I think artifacts need to be communicated in their canonical representation.
> >> >>>
> >> >>> It's clean (from the specification point of view), but doesn't allow for good introspection/fusion (aside from one being a base of another, perhaps).
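As a rough illustration of the streamed expansion response idea above (artifact data messages tagged per environment, followed by the expanded transform), here is a hedged Python sketch. The message types, URNs, and payloads are invented for illustration and do not correspond to actual proto definitions:

    # Sketch only: a streaming expansion response carrying artifacts then the transform.
    from typing import Iterator, NamedTuple, Union

    class ArtifactChunk(NamedTuple):
        environment_id: str  # which environment this dependency belongs to
        type_urn: str
        data: bytes

    class ExpandedTransform(NamedTuple):
        ptransform: dict     # stand-in for the expanded PTransform/components

    def expand(request) -> Iterator[Union[ArtifactChunk, ExpandedTransform]]:
        # Stream dependencies back first, tagged with their environment, so the
        # SDK can forward them to its own artifact staging service...
        yield ArtifactChunk("java-env-1", "beam:artifact:type:bytes:v1", b"<jar bytes>")
        # ...and finish with the expanded transform itself.
        yield ExpandedTransform({"urn": "beam:external:some:transform:v1"})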
> >> >>>
> >> >>>>>>> In this case, the runner would (as requested by its configuration) be free to merge environments it deemed compatible, including swapping out beam-java-X for beam-java-embedded if it considers itself compatible with the dependency list.
> >> >>>>>>
> >> >>>>>> Could you explain how that would work in practice?
> >> >>>>>
> >> >>>>> Say one has a pipeline with environments
> >> >>>>>
> >> >>>>> A: beam-java-sdk-2.12-docker
> >> >>>>> B: beam-java-sdk-2.12-docker + dep1
> >> >>>>> C: beam-java-sdk-2.12-docker + dep2
> >> >>>>> D: beam-java-sdk-2.12-docker + dep3
> >> >>>>>
> >> >>>>> A runner could (conceivably) be intelligent enough to know that dep1 and dep2 are indeed compatible, and run A, B, and C in a single beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the corresponding fusion and lower overhead benefits). If a certain pipeline option is set, it might further note that dep1 and dep2 are compatible with its own workers, which are built against sdk-2.12, and choose to run these in an embedded + dep1 + dep2 environment.
> >> >>>>
> >> >>>> We have been talking about the expansion service and cross-language transforms a lot lately, but I believe it will initially come at the cost of poor fusion of transforms, since "merging" environments that are compatible is a difficult problem: it brings up many of the dependency management issues (e.g. diamond dependency issues).
> >> >>>
> >> >>> I agree. I think expansion services offering "kitchen-sink" containers, when possible, can go far here. If we could at least recognize when one environment/set of deps is a superset of another, that could be an easy case that would yield a lot of benefit as well.
> >> >>
> >> >> +1
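A toy illustration of the superset check mentioned above. Real environment merging would also need actual dependency resolution (the diamond dependency problem noted earlier), so this is only a sketch with made-up dependency labels:

    # Sketch: a runner could fuse environments when one dependency set contains the other.
    def mergeable(env_deps_a, env_deps_b):
        """Return True if one environment's dependency set contains the other's."""
        a, b = set(env_deps_a), set(env_deps_b)
        return a <= b or b <= a

    base = []                          # A: beam-java-sdk-2.12-docker
    with_dep1 = ["dep1"]               # B
    with_dep1_dep2 = ["dep1", "dep2"]  # a "kitchen-sink" image offered by the expansion service

    assert mergeable(base, with_dep1)            # A can run inside B's environment
    assert mergeable(with_dep1, with_dep1_dep2)  # B can run inside the kitchen-sink image
    assert not mergeable(["dep1==1.0"], ["dep1==2.0", "dep2"])  # conflicting versions: no merge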