Thanks for the nice write-up, Chad.

On Mon, Sep 16, 2019 at 12:17 PM Robert Bradshaw <rober...@google.com>
wrote:

> Thanks for bringing this up again. My thoughts on the open questions below.
>
> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
> > That commit solves 2 problems:
> >
> > 1. Adds the pubsub Java deps so that they’re available in our portable
> >    pipeline
> > 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage,
> >    available as a standard coder. This is required because both
> >    PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along
> >    these PubsubMessage objects, but only “standard” (i.e. portable)
> >    coders can be used, so we have to hack it to make PubsubMessage
> >    appear as a standard coder.
> >
> > More details:
> >
> > - There’s a similar magic commit required for Kafka external transforms
> > - The Jira issue for this problem is here:
> >   https://jira.apache.org/jira/browse/BEAM-7870
> > - For problem #2 above there seems to be some consensus forming around
> >   using Avro or schema/row coders to send compound types in a portable
> >   way. Here’s the PR for making row coders portable:
> >   https://github.com/apache/beam/pull/9188
>
> +1. Note that this doesn't mean that the IO itself must produce rows;
> part of the Schema work in Java is to make it easy to automatically
> convert from various Java classes to schemas transparently, so this
> same logic that would allow one to apply an SQL filter directly to a
> Kafka/PubSub read would allow cross-language. Even if that doesn't
> work, we need not uglify the Java API; we can have an
> option/alternative transform that appends the convert-to-Row DoFn for
> easier use by external (though the goal of the former work is to make
> this step unnecessary).
>

Updating all IO connectors / transforms to have a version that
produces/consumes a PCollection<Row> is infeasible, so I agree that we need
an automatic conversion to/from PCollection<Row>, possibly by injecting
PTransforms during ExternalTransform expansion.
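
A minimal sketch of what such an automatic conversion could look like, in
plain Python rather than the Beam API. Every name here (FakeMessage,
infer_schema, to_row, from_row, with_row_output) is hypothetical and only
illustrates the idea of deriving a schema from a class and wrapping a
producer so that it emits rows:

```python
import dataclasses
from typing import Any, Callable, Dict, List, Tuple, Type


@dataclasses.dataclass
class FakeMessage:
    """Hypothetical stand-in for a holder type such as PubsubMessage."""
    payload: bytes
    attributes: Dict[str, str]


def infer_schema(cls: Type[Any]) -> List[Tuple[str, Any]]:
    """Derive (field name, field type) pairs from the class definition."""
    return [(f.name, f.type) for f in dataclasses.fields(cls)]


def to_row(obj: Any) -> Tuple[Any, ...]:
    """Flatten an instance into a tuple ordered like infer_schema()."""
    return tuple(getattr(obj, f.name) for f in dataclasses.fields(obj))


def from_row(cls: Type[Any], row: Tuple[Any, ...]) -> Any:
    """Rebuild the original object from a row on the consuming side."""
    return cls(*row)


def with_row_output(produce: Callable[..., List[Any]]) -> Callable[..., List[Tuple[Any, ...]]]:
    """Wrap a producer so its elements come out as rows (the injected step)."""
    def wrapped(*args: Any, **kwargs: Any) -> List[Tuple[Any, ...]]:
        return [to_row(element) for element in produce(*args, **kwargs)]
    return wrapped


msg = FakeMessage(payload=b"hi", attributes={"k": "v"})
read_rows = with_row_output(lambda: [msg])
rows = read_rows()
restored = from_row(FakeMessage, rows[0])
```

The point of the wrapper is that the producing transform itself stays
unchanged; only the boundary that crosses languages sees rows.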

>
> > I don’t really have any ideas for problem #1
>
> The crux of the issue here is that the jobs API was not designed with
> cross-language in mind, and so the artifact API ties artifacts to jobs
> rather than to environments. To solve this we need to augment the
> notion of environment to allow the specification of additional
> dependencies (e.g. jar files in this specific case, or better as
> maven/pypi/... dependencies (with version ranges) such that
> environment merging and dependency resolution can be sanely done), and
> a way for the expansion service to provide such dependencies.
>
> Max wrote up a summary of the prior discussions at
>
> https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8
>
> In the short term, one can build a custom docker image that has all
> the requisite dependencies installed.
>
> This touches on a related but separable issue that one may want to run
> some of these transforms "natively" in the same process as the runner
> (e.g. a Java IO in the Flink Java Runner) rather than via docker.
> (Similarly with subprocess.) Exactly how that works with environment
> specifications is also a bit TBD, but my proposal has been that these
> are best viewed as runner-specific substitutions of standard
> environments.
>

We need a permanent solution for this, but for now we have a temporary
solution where additional jar files can be specified through an experiment
when running a Python pipeline:
https://github.com/apache/beam/blob/9678149872de2799ea1643f834f2bec88d346af8/sdks/python/apache_beam/io/external/xlang_parquetio_test.py#L55
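
As a rough sketch of how the extra arguments might be assembled: the
experiment name `jar_packages` and the ':' separator for multiple jars are
assumptions here (please check the linked test for the exact form), and the
jar path and job endpoint are made-up placeholders.

```python
from typing import List


def jar_experiment_args(jar_paths: List[str]) -> List[str]:
    """Build the extra pipeline arguments that name additional jars."""
    if not jar_paths:
        return []
    # Assumption: the experiment is called 'jar_packages' and accepts a
    # ':'-separated list of local jar paths.
    return ["--experiments=jar_packages=" + ":".join(jar_paths)]


# Placeholder endpoint and jar path, for illustration only.
base_args = ["--runner=PortableRunner", "--job_endpoint=localhost:8099"]
args = base_args + jar_experiment_args(["/tmp/beam-io-expansion.jar"])
```

The resulting list would then be passed to the pipeline options in the
usual way.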

Thanks,
Cham


>
> > So the portability expansion system works, and now it’s time to sand off
> some of the rough corners. I’d love to hear others’ thoughts on how to
> resolve some of these remaining issues.
>
> +1
>
>
> On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
> >
> > Hi all,
> > There was some interest in this topic at the Beam Summit this week (btw,
> great job to everyone involved!), so I thought I’d try to summarize the
> current state of things.
> > First, let me explain the idea behind external transforms for the
> > uninitiated.
> >
> > Problem:
> >
> > There’s a transform that you want to use, but it’s not available in your
> > desired language. IO connectors are a good example: there are many
> > available in the Java SDK, but not so much in Python or Go.
> >
> > Solution:
> >
> > 1. Create a stub transform in your desired language (e.g. Python) whose
> >    primary role is to serialize the parameters passed to that transform
> > 2. When you run your portable pipeline, just prior to it being sent to
> >    the Job Service for execution, your stub transform’s payload is first
> >    sent to the “Expansion Service” that’s running in the native language
> >    (Java), where the payload is used to construct an instance of the
> >    native transform, which is then expanded and converted to a protobuf
> >    and sent back to the calling process (Python).
> > 3. The protobuf representation of the expanded transform gets integrated
> >    back into the pipeline that you’re submitting
> > 4. Steps 2-3 are repeated for each external transform in your pipeline
> > 5. Then the whole pipeline gets sent to the Job Service to be invoked on
> >    Flink/Spark/etc
> >
> > ________________________________
> >
> > Now on to my journey to get PubsubIO working in python on Flink.
> >
> > The first issue I encountered was that there was a lot of boilerplate
> involved in serializing the stub python transform’s parameters so they can
> be sent to the expansion service.
> >
> > I created a PR to make this simpler, which has just been merged to
> master: https://github.com/apache/beam/pull/9098
> >
> > With this feature in place, if you’re using python 3.7 you can use a
> dataclass and the typing module to create your transform and describe your
> schema in one go. For example:
> >
> >     @dataclasses.dataclass
> >     class MyAwesomeTransform(beam.ExternalTransform):
> >       URN = 'beam:external:fakeurn:v1'
> >
> >       integer_example: int
> >       string_example: str
> >       list_of_strings: List[str]
> >       optional_kv: Optional[Tuple[str, float]] = None
> >       optional_integer: Optional[int] = None
> >       expansion_service: dataclasses.InitVar[Optional[str]] = None
> >
> > For earlier versions of python, you can use typing.NamedTuple to declare
> your schema.
> >
> >     MyAwesomeSchema = typing.NamedTuple(
> >         'MyAwesomeSchema',
> >         [
> >             ('integer_example', int),
> >             ('string_example', unicode),
> >             ('list_of_strings', List[unicode]),
> >             ('optional_kv', Optional[Tuple[unicode, float]]),
> >             ('optional_integer', Optional[int]),
> >         ]
> >     )
> >
> > There’s also an option to generate the schema implicitly based on the
> value(s) you wish to serialize.
> >
> > There was a slight tangent in implementing this feature in that
> > requesting a coder for typing.List resulted in a pickle coder instead of
> > an IterableCoder. That’s bad because only standard/portable coders can be
> > used for expansion in Java (for obvious reasons), so as a convenience that
> > was solved here: https://github.com/apache/beam/pull/9344
> >
> > The next issue that I encountered was that python did not track the
> boundedness of PCollections, which made it impossible to use the expansion
> service to create unbounded writes. That’s been solved and merged here:
> https://github.com/apache/beam/pull/9426
> >
> > So that brings us to the actual PR for adding external transform support
> for PubsubIO: https://github.com/apache/beam/pull/9268
> >
> > The PR works, but with one big caveat: in order to use it you must build
> your Java containers with this special commit:
> https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
> >
> > That commit solves 2 problems:
> >
> > 1. Adds the pubsub Java deps so that they’re available in our portable
> >    pipeline
> > 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage,
> >    available as a standard coder. This is required because both
> >    PubsubIO.Read and PubsubIO.Write expand to ParDos which pass along
> >    these PubsubMessage objects, but only “standard” (i.e. portable)
> >    coders can be used, so we have to hack it to make PubsubMessage
> >    appear as a standard coder.
> >
> > More details:
> >
> > - There’s a similar magic commit required for Kafka external transforms
> > - The Jira issue for this problem is here:
> >   https://jira.apache.org/jira/browse/BEAM-7870
> > - For problem #2 above there seems to be some consensus forming around
> >   using Avro or schema/row coders to send compound types in a portable
> >   way. Here’s the PR for making row coders portable:
> >   https://github.com/apache/beam/pull/9188
> > - I don’t really have any ideas for problem #1
> >
> > So the portability expansion system works, and now it’s time to sand off
> some of the rough corners. I’d love to hear others’ thoughts on how to
> resolve some of these remaining issues.
> >
> > -chad
>
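
For readers new to the expansion flow summarized in the quoted message, here
is a toy, self-contained simulation of it. It is only a sketch: JSON stands
in for the protobuf payloads, the "expansion service" is a local function
rather than a separate Java process, and the names make_stub,
expansion_service, expand_pipeline and REGISTRY are hypothetical.

```python
import json
from typing import Any, Callable, Dict, List

Step = Dict[str, Any]


def _expand_fake_read(params: Dict[str, Any]) -> List[Step]:
    """Pretend native transform: expands into two primitive steps."""
    return [
        {"name": "Read/Impulse"},
        {"name": "Read/ParDo(topic=%s)" % params["topic"]},
    ]


# Hypothetical registry on the expansion-service side: URN -> expander.
REGISTRY: Dict[str, Callable[[Dict[str, Any]], List[Step]]] = {
    "beam:external:fakeurn:v1": _expand_fake_read,
}


def make_stub(urn: str, **params: Any) -> Step:
    """The stub transform only serializes the parameters it was given."""
    payload = json.dumps(params, sort_keys=True).encode("utf-8")
    return {"urn": urn, "payload": payload}


def expansion_service(request: Step) -> List[Step]:
    """Construct the native transform from the payload and expand it."""
    params = json.loads(request["payload"].decode("utf-8"))
    return REGISTRY[request["urn"]](params)


def expand_pipeline(pipeline: List[Step]) -> List[Step]:
    """Replace every stub with its expansion before job submission."""
    expanded: List[Step] = []
    for step in pipeline:
        if "payload" in step:
            expanded.extend(expansion_service(step))
        else:
            expanded.append(step)
    return expanded


pipeline = [make_stub("beam:external:fakeurn:v1", topic="t"), {"name": "Map"}]
submitted = expand_pipeline(pipeline)
```

After expansion, `submitted` contains only ordinary steps, which is what
would be handed to the Job Service.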
