Hi,

Is anyone actively working on the artifact staging extension for
cross-language pipelines? I'm thinking of contributing to it this coming
December. If anyone has made progress on this and needs help, please let me
know.

Thanks,

On Wed, Jun 12, 2019 at 2:42 AM Ismaël Mejía <ieme...@gmail.com> wrote:

> Can you please add this to the design documents webpage?
> https://beam.apache.org/contribute/design-documents/
>
> On Wed, May 8, 2019 at 7:29 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
> >
> >
> >
> > On Tue, May 7, 2019 at 10:21 AM Maximilian Michels <m...@apache.org>
> wrote:
> >>
> >> Here's the first draft:
> >>
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit?usp=sharing
> >>
> >> It's rather high-level. We may want to add more details once we have
> >> finalized the design. Feel free to make comments and edits.
> >
> >
> > Thanks Max. Added some comments.
> >
> >>
> >>
> >> > All of this goes back to the idea that I think the listing of
> >> > artifacts (or more general dependencies) should be a property of the
> >> > environments themselves.
> >>
> >> +1 I came to the same conclusion while thinking about how to store
> >> artifact information for deferred execution of the pipeline.
> >>
> >> -Max
> >>
> >> On 07.05.19 18:10, Robert Bradshaw wrote:
> >> > Looking forward to your writeup, Max. In the meantime, some comments
> below.
> >> >
> >> >
> >> > From: Lukasz Cwik <lc...@google.com>
> >> > Date: Thu, May 2, 2019 at 6:45 PM
> >> > To: dev
> >> >
> >> >>
> >> >>
> >> >> On Thu, May 2, 2019 at 7:20 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >> >>>
> >> >>> On Sat, Apr 27, 2019 at 1:14 AM Lukasz Cwik <lc...@google.com>
> wrote:
> >> >>>>
> >> >>>> We should stick with URN + payload + artifact metadata[1] where
> the only mandatory one that all SDKs and expansion services understand is
> the "bytes" artifact type. This allows us to add optional URNs for file://,
> http://, Maven, PyPi, ... in the future. I would make the artifact
> staging service use the same URN + payload mechanism to get compatibility
> of artifacts across the different services and also have the artifact
> staging service be able to be queried for the list of artifact types it
> supports.
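> >> >>>>
> >> >>>> To make that concrete, a rough sketch of what a single artifact
> >> >>>> record could look like (names below are purely illustrative, not
> >> >>>> the actual proto):
> >> >>>>
> >> >>>> from dataclasses import dataclass, field
> >> >>>>
> >> >>>> @dataclass
> >> >>>> class ArtifactSpec:
> >> >>>>     # URN naming the artifact type; a hypothetical
> >> >>>>     # "beam:artifact:type:bytes:v1" would be the mandatory
> >> >>>>     # baseline, with optional URNs for file://, http://,
> >> >>>>     # Maven, PyPi, ... added later.
> >> >>>>     type_urn: str
> >> >>>>     # Opaque payload interpreted according to type_urn (raw
> >> >>>>     # bytes for the "bytes" type, a serialized locator otherwise).
> >> >>>>     type_payload: bytes
> >> >>>>     # Free-form metadata (name, sha256, ...) for staging.
> >> >>>>     metadata: dict = field(default_factory=dict)
> >> >>>>
> >> >>>> # The one type every SDK and expansion service must accept:
> >> >>>> jar = ArtifactSpec(
> >> >>>>     type_urn="beam:artifact:type:bytes:v1",  # assumed spelling
> >> >>>>     type_payload=b"...jar bytes...",
> >> >>>>     metadata={"name": "my-transform.jar"})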
> >> >>>
> >> >>> +1
> >> >>>
> >> >>>> Finally, we would need to have environments enumerate the artifact
> types that they support.
> >> >>>
> >> >>> Meaning at runtime, or as another field statically set in the proto?
> >> >>
> >> >>
> >> >> I don't believe runners/SDKs should have to know what artifacts each
> environment supports at runtime and instead have environments enumerate
> them explicitly in the proto. I have been thinking about a more general
> "capabilities" block on environments which allow them to enumerate URNs
> that the environment understands. This would include artifact type URNs,
> PTransform URNs, coder URNs, ... I haven't proposed anything specific down
> this line yet because I was wondering how environment resources (CPU, min
> memory, hardware like GPU, AWS/GCP/Azure/... machine types) should/could
> tie into this.
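> >> >>
> >> >> As a strawman, such a capabilities block could look roughly like
> >> >> this (just a sketch; none of these names exist today):
> >> >>
> >> >> from dataclasses import dataclass, field
> >> >> from typing import List
> >> >>
> >> >> @dataclass
> >> >> class EnvironmentCapabilities:
> >> >>     # URNs this environment understands: artifact types,
> >> >>     # PTransforms, coders, ... (hypothetical spellings).
> >> >>     supported_urns: List[str] = field(default_factory=list)
> >> >>
> >> >> python_env = EnvironmentCapabilities(supported_urns=[
> >> >>     "beam:artifact:type:bytes:v1",  # mandatory baseline
> >> >>     "beam:artifact:type:pip:v1",    # e.g. requirements.txt
> >> >> ])
> >> >>
> >> >> # A runner/SDK could then filter what it plans to send:
> >> >> def accepts(env, artifact_type_urn):
> >> >>     return artifact_type_urn in env.supported_urns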
> >> >>
> >> >>>
> >> >>>> Having everyone have the same "artifact" representation would be
> beneficial since:
> >> >>>> a) Python environments could install dependencies from a
> requirements.txt file (something that the Google Cloud Dataflow Python
> docker container allows for today)
> >> >>>> b) It provides an extensible and versioned mechanism for SDKs,
> environments, and artifact staging/retrieval services to support additional
> artifact types
> >> >>>> c) Allows for expressing a canonical representation of an artifact
> like a Maven package so a runner could merge environments that the runner
> deems compatible.
> >> >>>>
> >> >>>> The flow I could see is:
> >> >>>> 1) (optional) query artifact staging service for supported
> artifact types
> >> >>>> 2) SDK requests the expansion service to expand the transform,
> passing in a list of artifact types the SDK and artifact staging service
> support; the expansion service returns a list of artifact types limited to
> those supported types + any supported by the environment
> >> >>>
> >> >>> The crux of the issue seems to be how the expansion service returns
> >> >>> the artifacts themselves. Is this going with the approach that the
> >> >>> caller of the expansion service must host an artifact staging
> service?
> >> >>
> >> >>
> >> >> The caller would not need to host an artifact staging service (but
> would become effectively a proxy service, see my comment below for more
> details) as I would have expected this to be part of the expansion service
> response.
> >> >>
> >> >>>
> >> >>> There is also the question here of how the returned artifacts get
> >> >>> attached to the various environments, or whether they get implicitly
> >> >>> applied to all returned stages (which need not have a consistent
> >> >>> environment)?
> >> >>
> >> >>
> >> >> I would suggest returning additional information that says what
> artifact is for which environment. Applying all artifacts to all
> environments is likely to cause issues since some environments may not
> understand certain artifact types or may get conflicting versions of
> artifacts. I would see this happening since an expansion service that
> aggregates other expansion services seems likely, for example:
> >> >>                               /-> ExpansionService(Python)
> >> >> ExpansionService(Aggregator) --> ExpansionService(Java)
> >> >>                               \-> ExpansionService(Go)
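> >> >>
> >> >> Concretely, the expansion response could carry an explicit
> >> >> environment-to-artifacts mapping, roughly like this (illustrative
> >> >> names and URNs only):
> >> >>
> >> >> from typing import Dict, List, Tuple
> >> >>
> >> >> # environment id -> [(artifact type URN, payload), ...]
> >> >> artifacts_by_env: Dict[str, List[Tuple[str, bytes]]] = {
> >> >>     "python_env": [("beam:artifact:type:pip:v1",
> >> >>                     b"some-package==1.0\n")],
> >> >>     "java_env": [("beam:artifact:type:bytes:v1",
> >> >>                   b"...jar bytes...")],
> >> >>     "go_env": [],
> >> >> }
> >> >>
> >> >> # Each environment is only handed the artifacts listed for it,
> >> >> # rather than every artifact being applied to every environment.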
> >> >
> >> > All of this goes back to the idea that I think the listing of
> >> > artifacts (or more general dependencies) should be a property of the
> >> > environments themselves.
> >> >
> >> >>>> 3) SDK converts any artifact types that the artifact staging
> service or environment doesn't understand, e.g. pulls down Maven
> dependencies and converts them to "bytes" artifacts
> >> >>>
> >> >>> Here I think we're conflating two things. The "type" of an artifact
> is
> >> >>> both (1) how to fetch the bytes and (2) how to interpret them (e.g.
> is
> >> >>> this a jar file, or a pip tarball, or just some data needed by a
> DoFn,
> >> >>> or ...) Only (1) can be freely transmuted.
> >> >>
> >> >>
> >> >> You're right. Thinking about this some more, general artifact
> conversion is unlikely to be practical because how to interpret an artifact
> is environment dependent. For example, a requirements.txt used to install
> pip packages for a Python docker container depends on the filesystem layout
> of that specific docker container. One could simulate doing a pip install
> on the same filesystem and take the diff of all the packages in
> requirements.txt, but this quickly becomes impractical.
> >> >>
> >> >>>
> >> >>>> 4) SDK sends artifacts to artifact staging service
> >> >>>> 5) Artifact staging service converts any artifacts to types that
> the environment understands
> >> >>>> 6) Environment is started and gets artifacts from the artifact
> retrieval service.
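> >> >>>>
> >> >>>> Roughly, steps 3-5 in pseudo-Python (the helper and names are
> >> >>>> stand-ins, not real APIs):
> >> >>>>
> >> >>>> def resolve_to_bytes(type_urn, payload):
> >> >>>>     # Placeholder: a real implementation would e.g. download
> >> >>>>     # the Maven dependency named by the payload and return the
> >> >>>>     # raw jar bytes.
> >> >>>>     return payload
> >> >>>>
> >> >>>> def stage_for_environment(artifacts, staging_supported,
> >> >>>>                           env_supported):
> >> >>>>     # Anything the staging service or environment doesn't
> >> >>>>     # understand is converted to the mandatory "bytes" type.
> >> >>>>     staged = []
> >> >>>>     for type_urn, payload in artifacts:
> >> >>>>         if (type_urn in staging_supported
> >> >>>>                 and type_urn in env_supported):
> >> >>>>             staged.append((type_urn, payload))
> >> >>>>         else:
> >> >>>>             staged.append(("beam:artifact:type:bytes:v1",
> >> >>>>                            resolve_to_bytes(type_urn, payload)))
> >> >>>>     return staged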
> >> >>>>
> >> >>>> On Wed, Apr 24, 2019 at 4:44 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >> >>>>>
> >> >>>>> On Wed, Apr 24, 2019 at 12:21 PM Maximilian Michels <
> m...@apache.org> wrote:
> >> >>>>>>
> >> >>>>>> Good idea to let the client expose an artifact staging service
> that the
> >> >>>>>> ExpansionService could use to stage artifacts. This solves two
> problems:
> >> >>>>>>
> >> >>>>>> (1) The Expansion Service not being able to access the Job Server
> >> >>>>>> artifact staging service
> >> >>>>>> (2) The client not having access to the dependencies returned by
> the
> >> >>>>>> Expansion Server
> >> >>>>>>
> >> >>>>>> The downside is that it adds an additional indirection. The
> alternative
> >> >>>>>> to let the client handle staging the artifacts returned by the
> Expansion
> >> >>>>>> Server is more transparent and easier to implement.
> >> >>>>>
> >> >>>>> The other downside is that it may not always be possible for the
> >> >>>>> expansion service to connect to the artifact staging service (e.g.
> >> >>>>> when constructing a pipeline locally against a remote expansion
> >> >>>>> service).
> >> >>>>
> >> >>>> Just to make sure, you're saying the expansion service would return
> all the artifacts (bytes, urls, ...) as part of the response since the
> expansion service wouldn't be able to connect to the SDK that is running
> locally either.
> >> >>>
> >> >>> Yes. Well, more I'm asking how the expansion service would return
> any
> >> >>> artifacts.
> >> >>>
> >> >>> What we have is
> >> >>>
> >> >>> Runner <--- SDK ---> Expansion service.
> >> >>>
> >> >>> Where the unidirectional arrow means "instantiates a connection
> with"
> >> >>> and the other direction (and missing arrows) may not be possible.
> >> >>
> >> >>
> >> >> I believe the ExpansionService Expand request should become a
> unidirectional stream back to the caller so that artifacts could be sent
> back to the SDK (effectively mirroring the artifact staging service API).
> So the expansion response would stream back a bunch of artifact data
> messages and also the expanded PTransform information.
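> >> >>
> >> >> i.e. something shaped roughly like this on the wire (a sketch
> >> >> only, not the current Expand RPC):
> >> >>
> >> >> from typing import Iterator, NamedTuple, Optional
> >> >>
> >> >> class ExpansionStreamMessage(NamedTuple):
> >> >>     # Exactly one of the two fields is set per message.
> >> >>     artifact_chunk: Optional[bytes] = None
> >> >>     expanded_transform: Optional[dict] = None
> >> >>
> >> >> def expand(request) -> Iterator[ExpansionStreamMessage]:
> >> >>     # Stream artifact data back first, mirroring the artifact
> >> >>     # staging service API...
> >> >>     yield ExpansionStreamMessage(artifact_chunk=b"...part 1...")
> >> >>     yield ExpansionStreamMessage(artifact_chunk=b"...part 2...")
> >> >>     # ...followed by the expanded PTransform information.
> >> >>     yield ExpansionStreamMessage(
> >> >>         expanded_transform={"urn": "beam:transform:example:v1"})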
> >> >
> >> > +1.
> >> >
> >> >>>>>> Ideally, the Expansion Service won't return any dependencies
> because the
> >> >>>>>> environment already contains the required dependencies. We could
> make it
> >> >>>>>> a requirement for the expansion to be performed inside an
> environment.
> >> >>>>>> Then we would already ensure during expansion time that the
> runtime
> >> >>>>>> dependencies are available.
> >> >>>>>
> >> >>>>> Yes, it's cleanest if the expansion service provides an
> environment
> >> >>>>> with all the dependencies provided. Interesting idea to make
> this a
> >> >>>>> property of the expansion service itself.
> >> >>>>
> >> >>>> I had thought this too but an opaque docker container that was
> built on top of a base Beam docker container would be very difficult for a
> runner to introspect and check to see if it's compatible to allow for fusion
> across PTransforms. I think artifacts need to be communicated in their
> canonical representation.
> >> >>>
> >> >>> It's clean (from the specification point of view), but doesn't allow
> >> >>> for good introspection/fusion (aside from one being a base of
> another,
> >> >>> perhaps).
> >> >>>
> >> >>>>>>> In this case, the runner would (as
> >> >>>>>>> requested by its configuration) be free to merge environments it
> >> >>>>>>> deemed compatible, including swapping out beam-java-X for
> >> >>>>>>> beam-java-embedded if it considers itself compatible with the
> >> >>>>>>> dependency list.
> >> >>>>>>
> >> >>>>>> Could you explain how that would work in practice?
> >> >>>>>
> >> >>>>> Say one has a pipeline with environments
> >> >>>>>
> >> >>>>> A: beam-java-sdk-2.12-docker
> >> >>>>> B: beam-java-sdk-2.12-docker + dep1
> >> >>>>> C: beam-java-sdk-2.12-docker + dep2
> >> >>>>> D: beam-java-sdk-2.12-docker + dep3
> >> >>>>>
> >> >>>>> A runner could (conceivably) be intelligent enough to know that
> dep1
> >> >>>>> and dep2 are indeed compatible, and run A, B, and C in a single
> >> >>>>> beam-java-sdk-2.12-docker + dep1 + dep2 environment (with the
> >> >>>>> corresponding fusion and lower overhead benefits). If a certain
> >> >>>>> pipeline option is set, it might further note that dep1 and dep2
> are
> >> >>>>> compatible with its own workers, which are built against
> sdk-2.12, and
> >> >>>>> choose to run these in an embedded + dep1 + dep2 environment.
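> >> >>>>>
> >> >>>>> A toy version of that merge decision, where "compatible" is
> >> >>>>> whatever check the runner trusts (here: no conflicting
> >> >>>>> versions of the same dependency):
> >> >>>>>
> >> >>>>> # environment id -> {dependency name: version}
> >> >>>>> envs = {
> >> >>>>>     "A": {},
> >> >>>>>     "B": {"dep1": "1.0"},
> >> >>>>>     "C": {"dep2": "2.3"},
> >> >>>>>     "D": {"dep3": "0.9"},
> >> >>>>> }
> >> >>>>>
> >> >>>>> def merge(a, b):
> >> >>>>>     # Merge two dependency sets unless a dependency appears in
> >> >>>>>     # both with conflicting versions; a real runner would need
> >> >>>>>     # a much stronger compatibility check.
> >> >>>>>     for dep, version in b.items():
> >> >>>>>         if a.get(dep, version) != version:
> >> >>>>>             return None
> >> >>>>>     return {**a, **b}
> >> >>>>>
> >> >>>>> merged = merge(envs["B"], envs["C"])  # dep1 + dep2
> >> >>>>> # A, B and C could then share one sdk-2.12 container.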
> >> >>>>
> >> >>>> We have been talking about the expansion service and
> cross-language transforms a lot lately, but I believe it will initially
> come at the cost of poor fusion of transforms, since "merging" compatible
> environments is a difficult problem that brings up many of the dependency
> management issues (e.g. diamond dependency issues).
> >> >>>
> >> >>> I agree. I think expansion services offering "kitchen-sink"
> >> >>> containers, when possible, can go far here. If we could at least
> >> >>> recognize when one environment/set of deps is a superset of another,
> >> >>> that could be an easy case that would yield a lot of benefit as
> well.
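> >> >>>
> >> >>> e.g. the superset check itself could be as simple as (sketch):
> >> >>>
> >> >>> def is_superset(env_deps, other_deps):
> >> >>>     # If one environment's dependency set contains the other's,
> >> >>>     # the larger environment can host both sets of transforms.
> >> >>>     return set(env_deps) >= set(other_deps)
> >> >>>
> >> >>> assert is_superset({"dep1", "dep2"}, {"dep1"})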
> >> >>
> >> >>
> >> >> +1
>
