Hi, I wrote a draft of the implementation plan[1]. The summary is on the first page. Any help would be appreciated!
[1]: https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog/edit?usp=sharing

On Thu, Dec 12, 2019 at 5:02 PM Heejong Lee <[email protected]> wrote:

> I'm brushing up memory by revisiting the doc[1] and it seems like we've already reached the consensus on the bigger picture. I would start drafting the implementation plan.
>
> [1]: https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>
> On Tue, Nov 26, 2019 at 3:54 AM Maximilian Michels <[email protected]> wrote:
>
>> Hey Heejong,
>>
>> I don't think so. It would be great to push this forward.
>>
>> Thanks,
>> Max
>>
>> On 26.11.19 02:49, Heejong Lee wrote:
>> > Hi,
>> >
>> > Is anyone actively working on the artifact staging extension for cross-language pipelines? I'm thinking I can contribute to it this coming December. If anyone has any progress on this and needs help, please let me know.
>> >
>> > Thanks,
>> >
>> > On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <[email protected]> wrote:
>> >
>> > Can you please add this to the design documents webpage:
>> > https://beam.apache.org/contribute/design-documents/
>> >
>> > On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath <[email protected]> wrote:
>> > >
>> > > On Tue, May 7, 2019 at 10:21 AM Maximilian Michels <[email protected]> wrote:
>> > >>
>> > >> Here's the first draft:
>> > >> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>> > >>
>> > >> It's rather high-level. We may want to add more details once we have finalized the design. Feel free to make comments and edits.
>> > >
>> > > Thanks Max. Added some comments.
>> > >
>> > >> > All of this goes back to the idea that I think the listing of artifacts (or more general dependencies) should be a property of the environments themselves.
>> > >>
>> > >> +1 I came to the same conclusion while thinking about how to store artifact information for deferred execution of the pipeline.
>> > >>
>> > >> -Max
>> > >>
>> > >> On 07.05.19 18:10, Robert Bradshaw wrote:
>> > >> > Looking forward to your writeup, Max. In the meantime, some comments below.
>> > >> >
>> > >> > From: Lukasz Cwik <[email protected]>
>> > >> > Date: Thu, May 2, 2019 at 6:45 PM
>> > >> > To: dev
>> > >> >
>> > >> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <[email protected]> wrote:
>> > >> >>>
>> > >> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <[email protected]> wrote:
>> > >> >>>>
>> > >> >>>> We should stick with URN + payload + artifact metadata[1] where the only mandatory one that all SDKs and expansion services understand is the "bytes" artifact type. This allows us to add optional URNs for file://, http://, Maven, PyPi, ... in the future. I would make the artifact staging service use the same URN + payload mechanism to get compatibility of artifacts across the different services, and also have the artifact staging service be able to be queried for the list of artifact types it supports.
>> > >> >>>
>> > >> >>> +1
>> > >> >>>
>> > >> >>>> Finally, we would need to have environments enumerate the artifact types that they support.
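For illustration, here is a rough sketch of what the URN + payload + metadata representation described above might look like. This is only a sketch: the class, field, and URN names are hypothetical (not the actual Beam protos), and the Maven coordinates are just an example.

    # Hypothetical sketch of the proposed artifact representation.
    from dataclasses import dataclass, field
    from typing import Dict

    # The only mandatory type that every SDK / staging service must support.
    ARTIFACT_TYPE_BYTES = "beam:artifact:type:bytes:v1"
    # Optional, extensible types that services can advertise.
    ARTIFACT_TYPE_FILE = "beam:artifact:type:file:v1"
    ARTIFACT_TYPE_MAVEN = "beam:artifact:type:maven:v1"
    ARTIFACT_TYPE_PYPI = "beam:artifact:type:pypi:v1"

    @dataclass
    class Artifact:
        type_urn: str                     # which kind of artifact this is
        type_payload: bytes               # URN-specific data
        metadata: Dict[str, str] = field(default_factory=dict)

    # A Maven dependency in its canonical representation ...
    maven_dep = Artifact(
        type_urn=ARTIFACT_TYPE_MAVEN,
        type_payload=b"org.apache.beam:beam-sdks-java-io-kafka:2.16.0",
        metadata={"name": "beam-sdks-java-io-kafka"},
    )

    # ... and the same dependency downgraded to the mandatory "bytes" type
    # for a staging service that supports nothing else.
    bytes_dep = Artifact(
        type_urn=ARTIFACT_TYPE_BYTES,
        type_payload=b"<contents of beam-sdks-java-io-kafka-2.16.0.jar>",
        metadata={"name": "beam-sdks-java-io-kafka-2.16.0.jar"},
    )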
>> > >> >>> Meaning at runtime, or as another field statically set in the proto?
>> > >> >>
>> > >> >> I don't believe runners/SDKs should have to know what artifacts each environment supports at runtime and instead have environments enumerate them explicitly in the proto. I have been thinking about a more general "capabilities" block on environments which allows them to enumerate URNs that the environment understands. This would include artifact type URNs, PTransform URNs, coder URNs, ... I haven't proposed anything specific down this line yet because I was wondering how environment resources (CPU, min memory, hardware like GPU, AWS/GCP/Azure/... machine types) should/could tie into this.
>> > >> >>
>> > >> >>>> Having everyone have the same "artifact" representation would be beneficial since:
>> > >> >>>> a) Python environments could install dependencies from a requirements.txt file (something that the Google Cloud Dataflow Python docker container allows for today)
>> > >> >>>> b) It provides an extensible and versioned mechanism for SDKs, environments, and artifact staging/retrieval services to support additional artifact types
>> > >> >>>> c) It allows for expressing a canonical representation of an artifact, like a Maven package, so a runner could merge environments that the runner deems compatible.
>> > >> >>>>
>> > >> >>>> The flow I could see is:
>> > >> >>>> 1) (optional) query artifact staging service for supported artifact types
>> > >> >>>> 2) SDK requests expansion service to expand transform, passing in a list of artifact types the SDK and artifact staging service support; the expansion service returns a list of artifact types limited to those supported types + any supported by the environment
>> > >> >>>
>> > >> >>> The crux of the issue seems to be how the expansion service returns the artifacts themselves. Is this going with the approach that the caller of the expansion service must host an artifact staging service?
>> > >> >>
>> > >> >> The caller would not need to host an artifact staging service (but would become effectively a proxy service, see my comment below for more details) as I would have expected this to be part of the expansion service response.
>> > >> >>
>> > >> >>> There is also the question of how the returned artifacts get attached to the various environments, or whether they get implicitly applied to all returned stages (which need not have a consistent environment)?
>> > >> >>
>> > >> >> I would suggest returning additional information that says which artifact is for which environment. Applying all artifacts to all environments is likely to cause issues since some environments may not understand certain artifact types or may get conflicting versions of artifacts. I would see this happening since an expansion service that aggregates other expansion services seems likely, for example:
>> > >> >>
>> > >> >>                              /-> ExpansionService(Python)
>> > >> >> ExpansionService(Aggregator) --> ExpansionService(Java)
>> > >> >>                              \-> ExpansionService(Go)
>> > >> >
>> > >> > All of this goes back to the idea that I think the listing of artifacts (or more general dependencies) should be a property of the environments themselves.
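Building on the artifact sketch above, the "dependencies as a property of the environment" idea, together with the "capabilities" block mentioned here, might look roughly like the following. Again, all field names are hypothetical and not an actual Beam definition.

    # Hypothetical sketch; reuses the Artifact class and URN constants from
    # the sketch above.
    from dataclasses import dataclass, field
    from typing import List

    @dataclass
    class Environment:
        urn: str                                   # e.g. "beam:env:docker:v1"
        payload: bytes                             # e.g. the docker image name
        # Artifacts this environment needs, in canonical form, listed
        # statically in the proto rather than discovered at runtime.
        dependencies: List[Artifact] = field(default_factory=list)
        # URNs the environment understands: artifact types, coders,
        # PTransforms, ... (the "capabilities" block idea).
        capabilities: List[str] = field(default_factory=list)

    java_env = Environment(
        urn="beam:env:docker:v1",
        payload=b"apachebeam/java_sdk:2.16.0",
        dependencies=[maven_dep],
        capabilities=[ARTIFACT_TYPE_BYTES, ARTIFACT_TYPE_MAVEN],
    )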
>> > >> >>>> 3) SDK converts any artifact types that the artifact staging service or environment doesn't understand, e.g. pulls down Maven dependencies and converts them to "bytes" artifacts
>> > >> >>>
>> > >> >>> Here I think we're conflating two things. The "type" of an artifact is both (1) how to fetch the bytes and (2) how to interpret them (e.g. is this a jar file, or a pip tarball, or just some data needed by a DoFn, or ...). Only (1) can be freely transmuted.
>> > >> >>
>> > >> >> You're right. Thinking about this some more, general artifact conversion is unlikely to be practical because how to interpret an artifact is environment dependent. For example, a requirements.txt used to install pip packages for a Python docker container depends on the filesystem layout of that specific docker container. One could simulate doing a pip install on the same filesystem, see the diff, and then stage all of the packages in requirements.txt, but this quickly becomes impractical.
>> > >> >>
>> > >> >>>> 4) SDK sends artifacts to artifact staging service
>> > >> >>>> 5) Artifact staging service converts any artifacts to types that the environment understands
>> > >> >>>> 6) Environment is started and gets artifacts from the artifact retrieval service.
>> > >> >>>>
>> > >> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <[email protected]> wrote:
>> > >> >>>>>
>> > >> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <[email protected]> wrote:
>> > >> >>>>>>
>> > >> >>>>>> Good idea to let the client expose an artifact staging service that the ExpansionService could use to stage artifacts. This solves two problems:
>> > >> >>>>>>
>> > >> >>>>>> (1) The Expansion Service not being able to access the Job Server artifact staging service
>> > >> >>>>>> (2) The client not having access to the dependencies returned by the Expansion Service
>> > >> >>>>>>
>> > >> >>>>>> The downside is that it adds an additional indirection. The alternative, letting the client handle staging the artifacts returned by the Expansion Service, is more transparent and easier to implement.
>> > >> >>>>>
>> > >> >>>>> The other downside is that it may not always be possible for the expansion service to connect to the artifact staging service (e.g. when constructing a pipeline locally against a remote expansion service).
>> > >> >>>>
>> > >> >>>> Just to make sure, you're saying the expansion service would return all the artifacts (bytes, urls, ...) as part of the response since the expansion service wouldn't be able to connect to the SDK that is running locally either.
>> > >> >>>
>> > >> >>> Yes. Well, I'm more asking how the expansion service would return any artifacts.
>> > >> >>>
>> > >> >>> What we have is
>> > >> >>>
>> > >> >>>     Runner <--- SDK ---> Expansion service
>> > >> >>>
>> > >> >>> where the unidirectional arrow means "instantiates a connection with" and the other direction (and missing arrows) may not be possible.
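A rough illustration of step 3 and the fetch-vs-interpret distinction discussed above: the SDK changes only how the bytes are fetched (Maven coordinates become raw bytes), while how the environment should interpret them is preserved. The helper, role name, and classes reuse the hypothetical sketches above and are not real Beam APIs.

    def fetch_from_maven(coordinates):
        """Hypothetical helper that downloads the jar for given Maven coordinates."""
        raise NotImplementedError

    def transmute_for_staging(artifact, staging_supported_types):
        # If the staging service already understands this type, pass it through.
        if artifact.type_urn in staging_supported_types:
            return artifact
        if artifact.type_urn == ARTIFACT_TYPE_MAVEN:
            jar_bytes = fetch_from_maven(artifact.type_payload)
            return Artifact(
                type_urn=ARTIFACT_TYPE_BYTES,     # fetch aspect becomes raw bytes
                type_payload=jar_bytes,
                # interpretation (a jar destined for the Java classpath) is kept
                metadata={**artifact.metadata, "role": "java-classpath-jar"},
            )
        raise ValueError("Cannot stage artifact of type %s" % artifact.type_urn)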
>> > >> >> I believe the ExpansionService Expand request should become a unidirectional stream back to the caller so that artifacts could be sent back to the SDK (effectively mirroring the artifact staging service API). So the expansion service would stream back a bunch of artifact data messages and also the expansion response containing PTransform information.
>> > >> >
>> > >> > +1.
>> > >> >
>> > >> >>>>>> Ideally, the Expansion Service won't return any dependencies because the environment already contains the required dependencies. We could make it a requirement for the expansion to be performed inside an environment. Then we would already ensure during expansion time that the runtime dependencies are available.
>> > >> >>>>>
>> > >> >>>>> Yes, it's cleanest if the expansion service provides an environment with all the dependencies provided. Interesting idea to make this a property of the expansion service itself.
>> > >> >>>>
>> > >> >>>> I had thought this too, but an opaque docker container that was built on top of a base Beam docker container would be very difficult for a runner to introspect and check to see if it's compatible to allow for fusion across PTransforms. I think artifacts need to be communicated in their canonical representation.
>> > >> >>>
>> > >> >>> It's clean (from the specification point of view), but doesn't allow for good introspection/fusion (aside from one being a base of another, perhaps).
>> > >> >>>
>> > >> >>>>>>> In this case, the runner would (as requested by its configuration) be free to merge environments it deemed compatible, including swapping out beam-java-X for beam-java-embedded if it considers itself compatible with the dependency list.
>> > >> >>>>>>
>> > >> >>>>>> Could you explain how that would work in practice?
>> > >> >>>>>
>> > >> >>>>> Say one has a pipeline with environments
>> > >> >>>>>
>> > >> >>>>> A: beam-java-sdk-2.12-docker
>> > >> >>>>> B: beam-java-sdk-2.12-docker + dep1
>> > >> >>>>> C: beam-java-sdk-2.12-docker + dep2
>> > >> >>>>> D: beam-java-sdk-2.12-docker + dep3
>> > >> >>>>>
>> > >> >>>>> A runner could (conceivably) be intelligent enough to know that dep1 and dep2 are indeed compatible, and run A, B, and C in a single beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the corresponding fusion and lower overhead benefits). If a certain pipeline option is set, it might further note that dep1 and dep2 are compatible with its own workers, which are built against sdk-2.12, and choose to run these in an embedded + dep1 + dep2 environment.
>> > >> >>>>
>> > >> >>>> We have been talking about the expansion service and cross-language transforms a lot lately, but I believe it will initially come at the cost of poor fusion of transforms since "merging" environments that are compatible is a difficult problem since it brings up many of the dependency management issues (e.g. diamond dependency issues).
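Under very optimistic assumptions, the A/B/C/D merging example above could be sketched as follows, reusing the hypothetical Artifact and Environment classes from the earlier sketches. Real compatibility checking would have to confront the diamond-dependency issues mentioned just above; this shows only the happy path.

    def try_merge(env_a, env_b):
        """Return a merged environment if a and b look compatible, else None."""
        if (env_a.urn, env_a.payload) != (env_b.urn, env_b.payload):
            return None                                  # different base images
        merged = {d.metadata.get("name"): d for d in env_a.dependencies}
        for dep in env_b.dependencies:
            existing = merged.get(dep.metadata.get("name"))
            if existing is not None and existing.type_payload != dep.type_payload:
                return None                              # conflicting versions
            merged[dep.metadata.get("name")] = dep
        return Environment(
            urn=env_a.urn,
            payload=env_a.payload,
            dependencies=list(merged.values()),
            capabilities=sorted(set(env_a.capabilities) | set(env_b.capabilities)),
        )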
>> > >> >>>
>> > >> >>> I agree. I think expansion services offering "kitchen-sink" containers, when possible, can go far here. If we could at least recognize when one environment/set of deps is a superset of another, that could be an easy case that would yield a lot of benefit as well.
>> > >> >>
>> > >> >> +1
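The simpler superset case mentioned above might be checked like this, again reusing the hypothetical classes from the earlier sketches; illustrative only.

    def is_superset(env_big, env_small):
        """True if stages using env_small could run in env_big."""
        if (env_big.urn, env_big.payload) != (env_small.urn, env_small.payload):
            return False                                 # different base images
        big = {(d.type_urn, d.type_payload) for d in env_big.dependencies}
        small = {(d.type_urn, d.type_payload) for d in env_small.dependencies}
        return small <= big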
