Thanks! I've left some comments on the doc.

On Tue, Dec 17, 2019, 5:03 PM Heejong Lee <heej...@google.com> wrote:
> Hi,
>
> I wrote a draft of the implementation plan[1]. The summary is on the first page. Any help would be appreciated!
>
> [1]: https://docs.google.com/document/d/1L7MJcfyy9mg2Ahfw5XPhUeBe-dyvAPMOYOiFA1-kAog/edit?usp=sharing
>
> On Thu, Dec 12, 2019 at 5:02 PM Heejong Lee <heej...@google.com> wrote:
>
>> I'm brushing up on my memory by revisiting the doc[1], and it seems we've already reached consensus on the bigger picture. I'll start drafting the implementation plan.
>>
>> [1]: https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>>
>> On Tue, Nov 26, 2019 at 3:54 AM Maximilian Michels <m...@apache.org> wrote:
>>
>>> Hey Heejong,
>>>
>>> I don't think so. It would be great to push this forward.
>>>
>>> Thanks,
>>> Max
>>>
>>> On 26.11.19 02:49, Heejong Lee wrote:
>>>
>>>> Hi,
>>>>
>>>> Is anyone actively working on the artifact staging extension for cross-language pipelines? I'm thinking I can contribute to it this coming December. If anyone has made progress on this and needs help, please let me know.
>>>>
>>>> Thanks,
>>>>
>>>> On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>
>>>>> Can you please add this to the design documents webpage? https://beam.apache.org/contribute/design-documents/
>>>>>
>>>>> On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>
>>>>>> On Tue, May 7, 2019 at 10:21 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>
>>>>>>> Here's the first draft:
>>>>>>>
>>>>>>> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
>>>>>>>
>>>>>>> It's rather high-level. We may want to add more details once we have finalized the design. Feel free to make comments and edits.
>>>>>>
>>>>>> Thanks, Max. Added some comments.
>>>>>>>> All of this goes back to the idea that I think the listing of artifacts (or more general dependencies) should be a property of the environments themselves.

>>>>>>> +1. I came to the same conclusion while thinking about how to store artifact information for deferred execution of the pipeline.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 07.05.19 18:10, Robert Bradshaw wrote:
>>>>>>>
>>>>>>>> Looking forward to your writeup, Max. In the meantime, some comments below.
>>>>>>>>
>>>>>>>> From: Lukasz Cwik <lc...@google.com>
>>>>>>>> Date: Thu, May 2, 2019 at 6:45 PM
>>>>>>>> To: dev
>>>>>>>>
>>>>>>>>> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> We should stick with URN + payload + artifact metadata[1], where the only mandatory artifact type that all SDKs and expansion services understand is the "bytes" type. This allows us to add optional URNs for file://, http://, Maven, PyPi, ... in the future. I would make the artifact staging service use the same URN + payload mechanism, both to get compatibility of artifacts across the different services and so that the artifact staging service can be queried for the list of artifact types it supports.
>>>>>>>>>>
>>>>>>>>>> +1
>>>>>>>>>>
>>>>>>>>>>> Finally, we would need to have environments enumerate the artifact types that they support.
>>>>>>>>>>
>>>>>>>>>> Meaning at runtime, or as another field statically set in the proto?
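The URN + payload + metadata scheme Lukasz proposes above, with environments statically enumerating the artifact types they support, could be sketched roughly as follows. This is an illustrative sketch only: the class names, URN strings, and field shapes are assumptions for discussion, not Beam's actual protos.

```python
from dataclasses import dataclass, field

# Hypothetical type URNs in the spirit of the proposal; only "bytes" is mandatory.
BYTES_ARTIFACT_URN = "beam:artifact:type:bytes:v1"
FILE_ARTIFACT_URN = "beam:artifact:type:file:v1"
MAVEN_ARTIFACT_URN = "beam:artifact:type:maven:v1"

@dataclass
class Artifact:
    type_urn: str                 # identifies how to fetch/interpret the payload
    type_payload: bytes           # URN-specific data (path, coordinates, raw bytes)
    metadata: dict = field(default_factory=dict)  # e.g. name, sha256

@dataclass
class Environment:
    container_image: str
    # Enumerated statically (as in the proto idea above), so runners/SDKs
    # need not discover supported types at runtime.
    supported_artifact_types: tuple = (BYTES_ARTIFACT_URN,)
    dependencies: tuple = ()      # tuple of Artifact

def supported_by(env: Environment, artifact: Artifact) -> bool:
    """Check support before staging; unsupported types must be converted."""
    return artifact.type_urn in env.supported_artifact_types

env = Environment(
    container_image="apache/beam_java8_sdk:2.12.0",
    supported_artifact_types=(BYTES_ARTIFACT_URN, FILE_ARTIFACT_URN),
)
jar = Artifact(MAVEN_ARTIFACT_URN, b"org.example:dep1:1.0")
print(supported_by(env, jar))  # False: would be converted to "bytes" first
print(supported_by(env, Artifact(BYTES_ARTIFACT_URN, b"\x00")))  # True
```

Under such a scheme, an SDK or staging service that encounters an unsupported type (here the Maven artifact) would convert it down to the mandatory "bytes" representation before staging, as described in the flow below.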
>>>>>>>>> I don't believe runners/SDKs should have to know at runtime which artifacts each environment supports; instead, environments should enumerate them explicitly in the proto. I have been thinking about a more general "capabilities" block on environments which would allow them to enumerate the URNs that the environment understands. This would include artifact type URNs, PTransform URNs, coder URNs, ... I haven't proposed anything specific along these lines yet because I was wondering how environment resources (CPU, minimum memory, hardware like GPUs, AWS/GCP/Azure/... machine types) should or could tie into this.

>>>>>>>>>>> Having everyone share the same "artifact" representation would be beneficial since:
>>>>>>>>>>>
>>>>>>>>>>> a) Python environments could install dependencies from a requirements.txt file (something that the Google Cloud Dataflow Python docker container allows for today);
>>>>>>>>>>> b) it provides an extensible and versioned mechanism for SDKs, environments, and artifact staging/retrieval services to support additional artifact types;
>>>>>>>>>>> c) it allows for expressing a canonical representation of an artifact, like a Maven package, so a runner could merge environments that it deems compatible.
>>>>>>>>>>>
>>>>>>>>>>> The flow I could see is:
>>>>>>>>>>>
>>>>>>>>>>> 1) (optional) query the artifact staging service for the artifact types it supports;
>>>>>>>>>>> 2) the SDK asks the expansion service to expand the transform, passing in a list of the artifact types that the SDK and artifact staging service support; the expansion service returns a list of artifact types limited to those supported types plus any supported by the environment;

>>>>>>>>>> The crux of the issue seems to be how the expansion service returns the artifacts themselves.
>>>>>>>>>> Is this going with the approach that the caller of the expansion service must host an artifact staging service?

>>>>>>>>> The caller would not need to host an artifact staging service (though it would effectively become a proxy service; see my comment below for more details), as I would have expected this to be part of the expansion service response.

>>>>>>>>>> There is also the question of how the returned artifacts get attached to the various environments, or whether they get implicitly applied to all returned stages (which need not have a consistent environment).

>>>>>>>>> I would suggest returning additional information that says which artifact is for which environment. Applying all artifacts to all environments is likely to cause issues, since some environments may not understand certain artifact types or may get conflicting versions of artifacts. I can see this happening because an expansion service that aggregates other expansion services seems likely, for example:

>>>>>>>>>                              /-> ExpansionService(Python)
>>>>>>>>> ExpansionService(Aggregator) --> ExpansionService(Java)
>>>>>>>>>                              \-> ExpansionService(Go)

>>>>>>>> All of this goes back to the idea that I think the listing of artifacts (or more general dependencies) should be a property of the environments themselves.

>>>>>>>>>>> 3) the SDK converts any artifact types that the artifact staging service or environment doesn't understand, e.g. pulls down Maven dependencies and converts them to "bytes" artifacts;

>>>>>>>>>> Here I think we're conflating two things. The "type" of an artifact is both (1) how to fetch the bytes and (2) how to interpret them (e.g. is this a jar file, or a pip tarball, or just some data needed by a DoFn, or ...). Only (1) can be freely transmuted.

>>>>>>>>> You're right. Thinking about this some more, general artifact conversion is unlikely to be practical, because how to interpret an artifact is environment-dependent. For example, a requirements.txt used to install pip packages for a Python docker container depends on the filesystem layout of that specific docker container. One could simulate doing a pip install on the same filesystem and take the diff for all the packages in requirements.txt, but this quickly becomes impractical.

>>>>>>>>>>> 4) the SDK sends the artifacts to the artifact staging service;
>>>>>>>>>>> 5) the artifact staging service converts any artifacts to types that the environment understands;
>>>>>>>>>>> 6) the environment is started and gets the artifacts from the artifact retrieval service.
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Good idea to let the client expose an artifact staging service that the ExpansionService could use to stage artifacts. This solves two problems:
>>>>>>>>>>>>>
>>>>>>>>>>>>> (1) the Expansion Service not being able to access the Job Server artifact staging service;
>>>>>>>>>>>>> (2) the client not having access to the dependencies returned by the Expansion Server.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The downside is that it adds an additional indirection.
>>>>>>>>>>>>> The alternative, letting the client handle staging of the artifacts returned by the Expansion Server, is more transparent and easier to implement.

>>>>>>>>>>>> The other downside is that it may not always be possible for the expansion service to connect to the artifact staging service (e.g. when constructing a pipeline locally against a remote expansion service).

>>>>>>>>>>> Just to make sure: you're saying the expansion service would return all the artifacts (bytes, URLs, ...) as part of the response, since the expansion service wouldn't be able to connect to the SDK that is running locally either?

>>>>>>>>>> Yes. Well, more precisely, I'm asking how the expansion service would return any artifacts. What we have is

>>>>>>>>>> Runner <--- SDK ---> Expansion service

>>>>>>>>>> where the unidirectional arrow means "instantiates a connection with", and the other direction (and the missing arrows) may not be possible.

>>>>>>>>> I believe the ExpansionService Expand request should become a unidirectional stream back to the caller, so that artifacts can be sent back to the SDK (effectively mirroring the artifact staging service API). The expansion response would then stream back a number of artifact data messages as well as the expansion response containing the PTransform information.

>>>>>>>> +1.

>>>>>>>>>>>>> Ideally, the Expansion Service won't return any dependencies, because the environment already contains the required dependencies. We could make it a requirement for the expansion to be performed inside an environment.
>>>>>>>>>>>>> Then we would already ensure at expansion time that the runtime dependencies are available.

>>>>>>>>>>>> Yes, it's cleanest if the expansion service provides an environment with all the dependencies provided. Interesting idea to make this a property of the expansion service itself.

>>>>>>>>>>> I had thought this too, but an opaque docker container built on top of a base Beam docker container would be very difficult for a runner to introspect and check for compatibility in order to allow fusion across PTransforms. I think artifacts need to be communicated in their canonical representation.

>>>>>>>>>> It's clean (from the specification point of view), but doesn't allow for good introspection/fusion (aside from one environment being a base of another, perhaps).

>>>>>>>>>>>>>> In this case, the runner would (as requested by its configuration) be free to merge environments it deemed compatible, including swapping out beam-java-X for beam-java-embedded if it considers itself compatible with the dependency list.

>>>>>>>>>>>>> Could you explain how that would work in practice?
>>>>>>>>>>>> Say one has a pipeline with environments

>>>>>>>>>>>> A: beam-java-sdk-2.12-docker
>>>>>>>>>>>> B: beam-java-sdk-2.12-docker + dep1
>>>>>>>>>>>> C: beam-java-sdk-2.12-docker + dep2
>>>>>>>>>>>> D: beam-java-sdk-2.12-docker + dep3

>>>>>>>>>>>> A runner could (conceivably) be intelligent enough to know that dep1 and dep2 are indeed compatible, and run A, B, and C in a single beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the corresponding fusion and lower-overhead benefits). If a certain pipeline option is set, it might further note that dep1 and dep2 are compatible with its own workers, which are built against sdk-2.12, and choose to run these in an embedded + dep1 + dep2 environment.

>>>>>>>>>>> We have been talking about the expansion service and cross-language transforms a lot lately, but I believe this will initially come at the cost of poor fusion of transforms, since "merging" compatible environments is a difficult problem: it brings up many of the classic dependency management issues (e.g. diamond dependencies).

>>>>>>>>>> I agree. I think expansion services offering "kitchen-sink" containers, when possible, can go far here. If we could at least recognize when one environment/set of deps is a superset of another, that would be an easy case that could yield a lot of benefit as well.

>>>>>>>>> +1
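The "superset" case mentioned at the end of the thread, where two environments share a base image and one dependency set contains the other, is the part a runner could check mechanically. A minimal sketch of that check (the names and the (base_image, deps) representation are hypothetical; real compatibility checking would also have to handle version conflicts and diamond dependencies, as Lukasz notes):

```python
# Sketch of the easy environment-merging case discussed above: identical
# base image plus nested dependency sets. Representation is illustrative.

def can_merge(env_a, env_b):
    """env_a/env_b are (base_image, frozenset_of_deps) pairs."""
    base_a, deps_a = env_a
    base_b, deps_b = env_b
    # Only the trivially safe case: same base image and one dependency set
    # is a subset of the other, so the larger environment can host both.
    return base_a == base_b and (deps_a <= deps_b or deps_b <= deps_a)

def merged(env_a, env_b):
    """Union of the dependency sets over the shared base image."""
    return (env_a[0], env_a[1] | env_b[1])

A = ("beam-java-sdk-2.12-docker", frozenset())
B = ("beam-java-sdk-2.12-docker", frozenset({"dep1"}))
C = ("beam-java-sdk-2.12-docker", frozenset({"dep2"}))
BC = ("beam-java-sdk-2.12-docker", frozenset({"dep1", "dep2"}))

print(can_merge(A, B))   # True: {} is a subset of {dep1}
print(can_merge(B, BC))  # True: {dep1} is a subset of {dep1, dep2}
# B vs C (dep1 vs dep2) is NOT covered by the subset rule alone; the runner
# would need real compatibility knowledge, as in Robert's example above.
print(can_merge(B, C))   # False
```

Merging B and C into a single dep1 + dep2 environment, as Robert describes, requires the runner to additionally know that dep1 and dep2 are compatible; the subset check above only covers the trivially safe cases.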