Thanks for the nice write-up, Chad.

On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw <rober...@google.com> wrote:

> Thanks for bringing this up again. My thoughts on the open questions below.
>
> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
> >
> > That commit solves 2 problems:
> >
> > 1. Adds the pubsub Java deps so that they're available in our portable pipeline.
> > 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage, available as a standard coder. This is required because both PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along these PubsubMessage objects, but only "standard" (i.e. portable) coders can be used, so we have to hack it to make PubsubMessage appear as a standard coder.
> >
> > More details:
> >
> > - There's a similar magic commit required for Kafka external transforms.
> > - The Jira issue for this problem is here:
> >   https://jira.apache.org/jira/browse/BEAM-7870
> > - For problem #2 above there seems to be some consensus forming around using Avro or schema/row coders to send compound types in a portable way. Here's the PR for making row coders portable:
> >   https://github.com/apache/beam/pull/9188
>
> +1. Note that this doesn't mean that the IO itself must produce rows; part of the Schema work in Java is to make it easy to automatically convert from various Java classes to schemas transparently, so the same logic that would allow one to apply a SQL filter directly to a Kafka/PubSub read would also allow cross-language use. Even if that doesn't work, we need not uglify the Java API; we can have an option/alternative transform that appends the convert-to-Row DoFn for easier use by external SDKs (though the goal of the former work is to make this step unnecessary).

Updating all IO connectors / transforms to have a version that produces/consumes a PCollection<Row> is infeasible, so I agree that we need an automatic conversion to/from PCollection<Row>, possibly by injecting PTransforms during ExternalTransform expansion.
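To make that concrete, the injection could look something like the following hand-wavy sketch. None of these names are real Beam APIs; to_row/from_row stand in for whatever automatic schema conversion we settle on:

    import apache_beam as beam

    # Illustrative sketch only -- none of these names exist in Beam today.
    # The idea is that expansion wraps the external transform so that only
    # a portable row/schema coder ever crosses the language boundary.
    class _WithRowConversions(beam.PTransform):
      def __init__(self, external_transform, to_row, from_row):
        self._external = external_transform
        self._to_row = to_row      # native element -> row/schema'd element
        self._from_row = from_row  # row/schema'd element -> native element

      def expand(self, pcoll):
        return (pcoll
                | 'ToRow' >> beam.Map(self._to_row)
                | 'External' >> self._external
                | 'FromRow' >> beam.Map(self._from_row))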
> > I don't really have any ideas for problem #1.
>
> The crux of the issue here is that the jobs API was not designed with cross-language in mind, and so the artifact API ties artifacts to jobs rather than to environments. To solve this we need to augment the notion of environment to allow the specification of additional dependencies (e.g. jar files in this specific case, or better, maven/pypi/... dependencies with version ranges, such that environment merging and dependency resolution can be sanely done), and a way for the expansion service to provide such dependencies.
>
> Max wrote up a summary of the prior discussions at
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>
> In the short term, one can build a custom docker image that has all the requisite dependencies installed.
>
> This touches on a related but separable issue: one may want to run some of these transforms "natively" in the same process as the runner (e.g. a Java IO in the Flink Java Runner) rather than via docker. (Similarly with subprocess.) Exactly how that works with environment specifications is also a bit TBD, but my proposal has been that these are best viewed as runner-specific substitutions of standard environments.

We need a permanent solution for this, but for now we have a temporary solution where additional jar files can be specified through an experiment when running a Python pipeline:
https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
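For example, something along these lines (an untested sketch; the jar_packages experiment name is taken from that test and may change, and the jar path is a placeholder):

    from apache_beam.options.pipeline_options import PipelineOptions

    # Untested sketch: the 'jar_packages' experiment name comes from the
    # test linked above and may change; the jar path is a placeholder.
    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',
        '--experiments=jar_packages=/path/to/extra-io-deps.jar',
    ])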
Thanks,
Cham

> > So the portability expansion system works, and now it's time to sand off some of the rough corners. I'd love to hear others' thoughts on how to resolve some of these remaining issues.
>
> +1
>
> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
> >
> > Hi all,
> > There was some interest in this topic at the Beam Summit this week (btw, great job to everyone involved!), so I thought I'd try to summarize the current state of things.
> >
> > First, let me explain the idea behind external transforms for the uninitiated.
> >
> > Problem:
> >
> > There's a transform that you want to use, but it's not available in your desired language. IO connectors are a good example: there are many available in the Java SDK, but not so much in Python or Go.
> >
> > Solution:
> >
> > 1. Create a stub transform in your desired language (e.g. Python) whose primary role is to serialize the parameters passed to that transform.
> > 2. When you run your portable pipeline, just prior to it being sent to the Job Service for execution, your stub transform's payload is first sent to the "Expansion Service" that's running in the native language (Java), where the payload is used to construct an instance of the native transform, which is then expanded, converted to a protobuf, and sent back to the calling process (Python).
> > 3. The protobuf representation of the expanded transform gets integrated back into the pipeline that you're submitting.
> > 4. Steps 2-3 are repeated for each external transform in your pipeline.
> > 5. Then the whole pipeline gets sent to the Job Service to be invoked on Flink/Spark/etc.
> >
> > ________________________________
> >
> > Now on to my journey to get PubsubIO working in Python on Flink.
> >
> > The first issue I encountered was that there was a lot of boilerplate involved in serializing the stub Python transform's parameters so they can be sent to the expansion service.
> >
> > I created a PR to make this simpler, which has just been merged to master: https://github.com/apache/beam/pull/9098
> >
> > With this feature in place, if you're using Python 3.7 you can use a dataclass and the typing module to create your transform and describe your schema in one go. For example:
> >
> >   @dataclasses.dataclass
> >   class MyAwesomeTransform(beam.ExternalTransform):
> >     URN = 'beam:external:fakeurn:v1'
> >
> >     integer_example: int
> >     string_example: str
> >     list_of_strings: List[str]
> >     optional_kv: Optional[Tuple[str, float]] = None
> >     optional_integer: Optional[int] = None
> >     expansion_service: dataclasses.InitVar[Optional[str]] = None
> >
> > For earlier versions of Python, you can use typing.NamedTuple to declare your schema:
> >
> >   MyAwesomeSchema = typing.NamedTuple(
> >       'MyAwesomeSchema',
> >       [
> >           ('integer_example', int),
> >           ('string_example', unicode),
> >           ('list_of_strings', List[unicode]),
> >           ('optional_kv', Optional[Tuple[unicode, float]]),
> >           ('optional_integer', Optional[int]),
> >       ]
> >   )
> >
> > There's also an option to generate the schema implicitly based on the value(s) you wish to serialize.
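As an aside for readers following along, applying such a stub in a pipeline looks roughly like this (untested sketch; the expansion service address is a placeholder, and the URN is the fake one from the example above):

    import apache_beam as beam

    # Untested sketch: assumes a Java expansion service is running at the
    # placeholder address and has a transform registered under the fake
    # URN declared by MyAwesomeTransform above.
    with beam.Pipeline() as p:
      _ = (p
           | 'Create' >> beam.Create(['a', 'b', 'c'])
           | 'External' >> MyAwesomeTransform(
                 integer_example=1,
                 string_example='hello',
                 list_of_strings=['x', 'y'],
                 expansion_service='localhost:8097'))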
> > There was a slight tangent in implementing this feature, in that requesting a coder for typing.List resulted in a pickle coder instead of IterableCoder. That's bad because only standard/portable coders can be used for expansion in Java (for obvious reasons), so as a convenience that was solved here: https://github.com/apache/beam/pull/9344
> >
> > The next issue I encountered was that Python did not track the boundedness of PCollections, which made it impossible to use the expansion service to create unbounded writes. That's been solved and merged here: https://github.com/apache/beam/pull/9426
> >
> > So that brings us to the actual PR for adding external transform support for PubsubIO: https://github.com/apache/beam/pull/9268
> >
> > The PR works, but with one big caveat: in order to use it you must build your Java containers with this special commit: https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
> >
> > That commit solves 2 problems:
> >
> > 1. Adds the pubsub Java deps so that they're available in our portable pipeline.
> > 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage, available as a standard coder. This is required because both PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along these PubsubMessage objects, but only "standard" (i.e. portable) coders can be used, so we have to hack it to make PubsubMessage appear as a standard coder.
> >
> > More details:
> >
> > - There's a similar magic commit required for Kafka external transforms.
> > - The Jira issue for this problem is here:
> >   https://jira.apache.org/jira/browse/BEAM-7870
> > - For problem #2 above there seems to be some consensus forming around using Avro or schema/row coders to send compound types in a portable way. Here's the PR for making row coders portable:
> >   https://github.com/apache/beam/pull/9188
> >
> > I don't really have any ideas for problem #1.
> >
> > So the portability expansion system works, and now it's time to sand off some of the rough corners. I'd love to hear others' thoughts on how to resolve some of these remaining issues.
> >
> > -chad