Thanks for bringing this up again. My thoughts on the open questions are below.

On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
> That commit solves 2 problems:
>
> 1. Adds the pubsub Java deps so that they’re available in our portable pipeline.
> 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage, 
>    available as a standard coder. This is required because both PubsubIO.Read 
>    and PubsubIO.Write expand to ParDos which pass along these PubsubMessage 
>    objects, but only “standard” (i.e. portable) coders can be used, so we have 
>    to hack it to make PubsubMessage appear as a standard coder.
>
> More details:
>
> - There’s a similar magic commit required for Kafka external transforms.
> - The Jira issue for this problem is here: 
>   https://jira.apache.org/jira/browse/BEAM-7870
> - For problem #2 above there seems to be some consensus forming around using 
>   Avro or schema/row coders to send compound types in a portable way. Here’s 
>   the PR for making row coders portable: 
>   https://github.com/apache/beam/pull/9188

+1. Note that this doesn't mean that the IO itself must produce rows;
part of the Schema work in Java is to make it easy to convert various
Java classes to schemas automatically and transparently, so the same
logic that would allow one to apply an SQL filter directly to a
Kafka/PubSub read would also enable cross-language use. Even if that
doesn't work, we need not uglify the Java API; we can have an
option/alternative transform that appends the convert-to-Row DoFn for
easier use by external transforms (though the goal of the former work
is to make this step unnecessary).
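
To make the convert-to-Row idea concrete, here is a rough sketch in
plain Python (not Beam code). FakePubsubMessage, MessageRow, to_row and
from_row are all hypothetical names invented for illustration; the only
point is that the row carries nothing but primitives and maps of
primitives, which a portable row coder could plausibly encode.

```python
import dataclasses
from typing import Dict, NamedTuple, Optional


@dataclasses.dataclass
class FakePubsubMessage:
    """Hypothetical stand-in for an IO-specific message-holder type."""
    payload: bytes
    attributes: Dict[str, str]
    message_id: Optional[str] = None


class MessageRow(NamedTuple):
    """Hypothetical row: every field is a primitive or a map of primitives."""
    payload: bytes
    attributes: Dict[str, str]
    message_id: Optional[str]


def to_row(message: FakePubsubMessage) -> MessageRow:
    # The body of the "convert-to-Row" step appended after a read.
    return MessageRow(
        payload=message.payload,
        attributes=dict(message.attributes),
        message_id=message.message_id,
    )


def from_row(row: MessageRow) -> FakePubsubMessage:
    # The inverse step that would be prepended before a write.
    return FakePubsubMessage(row.payload, dict(row.attributes), row.message_id)
```

In a real pipeline the two functions would be wrapped in DoFns on the
Java side; the Java API itself would stay as it is.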

> I don’t really have any ideas for problem #1

The crux of the issue here is that the jobs API was not designed with
cross-language in mind, and so the artifact API ties artifacts to jobs
rather than to environments. To solve this we need to augment the
notion of environment to allow the specification of additional
dependencies (e.g. jar files in this specific case, or better,
maven/pypi/... dependencies with version ranges, so that environment
merging and dependency resolution can be done sanely), and we need a
way for the expansion service to provide such dependencies.
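
As a rough illustration of why version ranges help with merging, here
is a sketch in plain Python. Nothing in it is an existing Beam API:
Dependency, Environment and merge are hypothetical names, and versions
are simplified to integer tuples with an inclusive lower and exclusive
upper bound.

```python
import dataclasses
from typing import Dict, FrozenSet, Optional, Tuple


@dataclasses.dataclass(frozen=True)
class Dependency:
    """Hypothetical dependency spec with a half-open version range."""
    ecosystem: str                 # e.g. "maven" or "pypi"
    name: str
    min_version: Tuple[int, ...]   # inclusive
    max_version: Tuple[int, ...]   # exclusive


@dataclasses.dataclass(frozen=True)
class Environment:
    """Hypothetical environment: a base image plus extra dependencies."""
    container_image: str
    dependencies: FrozenSet[Dependency] = frozenset()


def merge(a: Environment, b: Environment) -> Optional[Environment]:
    """Returns a merged environment, or None if the two are incompatible."""
    if a.container_image != b.container_image:
        return None
    merged: Dict[Tuple[str, str], Dependency] = {}
    for dep in list(a.dependencies) + list(b.dependencies):
        key = (dep.ecosystem, dep.name)
        prev = merged.get(key)
        if prev is None:
            merged[key] = dep
            continue
        # Intersect the two version ranges; an empty range is a conflict.
        low = max(prev.min_version, dep.min_version)
        high = min(prev.max_version, dep.max_version)
        if low >= high:
            return None
        merged[key] = Dependency(dep.ecosystem, dep.name, low, high)
    return Environment(a.container_image, frozenset(merged.values()))
```

With exact jar files there is nothing to intersect, so two transforms
that need slightly different jars could never share an environment;
with ranges, the merge either narrows to a common range or fails
explicitly.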

Max wrote up a summary of the prior discussions at
https://docs.google.com/document/d/1XaiNekAY2sptuQRIXpjGAyaYdSc-wlJ-VKjl04c8N48/edit#heading=h.900gc947qrw8

In the short term, one can build a custom docker image that has all
the requisite dependencies installed.

This touches on a related but separable issue: one may want to run
some of these transforms "natively" in the same process as the runner
(e.g. a Java IO in the Flink Java Runner) rather than via docker.
(Similarly with subprocess.) Exactly how that works with environment
specifications is also a bit TBD, but my proposal has been that these
are best viewed as runner-specific substitutions of standard
environments.
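
A minimal sketch of what I mean by substitution, again in plain Python
with made-up names (EnvSpec, substitute_for_runner, and the "docker" /
"in_process" labels are hypothetical, not Beam URNs): the pipeline
keeps naming a standard environment, and the runner decides at
execution time whether it can swap in an in-process equivalent.

```python
import dataclasses


@dataclasses.dataclass(frozen=True)
class EnvSpec:
    """Hypothetical environment description attached to a transform."""
    kind: str          # hypothetical labels: "docker" or "in_process"
    sdk: str           # e.g. "java" or "python"
    config: str = ""   # e.g. a container image name


def substitute_for_runner(env: EnvSpec, runner_sdk: str) -> EnvSpec:
    """Swaps a docker environment for in-process execution when possible.

    Assumes the runner can host any transform written in its own language;
    a real runner would also have to check that the dependencies are there.
    """
    if env.kind == "docker" and env.sdk == runner_sdk:
        return dataclasses.replace(env, kind="in_process", config="")
    return env
```

The submitted pipeline is unchanged either way, which is why I see this
as separable from the question of how environments declare their
dependencies.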

> So the portability expansion system works, and now it’s time to sand off some 
> of the rough corners. I’d love to hear others’ thoughts on how to resolve 
> some of these remaining issues.

+1


On Mon, Sep 16, 2019 at 11:51 AM Chad Dombrova <chad...@gmail.com> wrote:
>
> Hi all,
> There was some interest in this topic at the Beam Summit this week (btw, 
> great job to everyone involved!), so I thought I’d try to summarize the 
> current state of things.
> First, let me explain the idea behind external transforms for the 
> uninitiated.
>
> Problem:
>
> There’s a transform that you want to use, but it’s not available in your 
> desired language. IO connectors are a good example: there are many available 
> in the Java SDK, but far fewer in Python or Go.
>
> Solution:
>
> 1. Create a stub transform in your desired language (e.g. Python) whose primary 
>    role is to serialize the parameters passed to that transform.
> 2. When you run your portable pipeline, just prior to it being sent to the Job 
>    Service for execution, your stub transform’s payload is first sent to the 
>    “Expansion Service” that’s running in the native language (Java), where the 
>    payload is used to construct an instance of the native transform, which is 
>    then expanded and converted to a protobuf and sent back to the calling 
>    process (Python).
> 3. The protobuf representation of the expanded transform gets integrated back 
>    into the pipeline that you’re submitting.
> 4. Steps 2-3 are repeated for each external transform in your pipeline.
> 5. Then the whole pipeline gets sent to the Job Service to be invoked on 
>    Flink/Spark/etc.
>
> ________________________________
>
> Now on to my journey to get PubsubIO working in python on Flink.
>
> The first issue I encountered was that there was a lot of boilerplate 
> involved in serializing the stub python transform’s parameters so they can be 
> sent to the expansion service.
>
> I created a PR to make this simpler, which has just been merged to master: 
> https://github.com/apache/beam/pull/9098
>
> With this feature in place, if you’re using python 3.7 you can use a 
> dataclass and the typing module to create your transform and describe your 
> schema in one go. For example:
>
>     import dataclasses
>     from typing import List, Optional, Tuple
>
>     import apache_beam as beam
>
>     @dataclasses.dataclass
>     class MyAwesomeTransform(beam.ExternalTransform):
>       # URN identifying the transform on the expansion service side.
>       URN = 'beam:external:fakeurn:v1'
>
>       integer_example: int
>       string_example: str
>       list_of_strings: List[str]
>       optional_kv: Optional[Tuple[str, float]] = None
>       optional_integer: Optional[int] = None
>       expansion_service: dataclasses.InitVar[Optional[str]] = None
>
> For earlier versions of python, you can use typing.NamedTuple to declare your 
> schema.
>
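>     import typing
>     from typing import List, Optional, Tuple
>
>     # `unicode` is the Python 2 text type; on Python 3 use `str` instead.
>     MyAwesomeSchema = typing.NamedTuple(
>         'MyAwesomeSchema',
>         [
>             ('integer_example', int),
>             ('string_example', unicode),
>             ('list_of_strings', List[unicode]),
>             ('optional_kv', Optional[Tuple[unicode, float]]),
>             ('optional_integer', Optional[int]),
>         ]
>     )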
>
> There’s also an option to generate the schema implicitly based on the 
> value(s) you wish to serialize.
>
> There was a slight tangent in implementing this feature: requesting a 
> coder for typing.List resulted in a pickle coder instead of an IterableCoder. 
> That’s bad because only standard/portable coders can be used for expansion in 
> Java (a pickled Python object cannot be decoded there), so as a convenience 
> that was solved here: 
> https://github.com/apache/beam/pull/9344
>
> The next issue that I encountered was that python did not track the 
> boundedness of PCollections, which made it impossible to use the expansion 
> service to create unbounded writes. That’s been solved and merged here: 
> https://github.com/apache/beam/pull/9426
>
> So that brings us to the actual PR for adding external transform support for 
> PubsubIO: https://github.com/apache/beam/pull/9268
>
> The PR works, but with one big caveat: in order to use it you must build your 
> Java containers with this special commit: 
> https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870
>
> That commit solves 2 problems:
>
> 1. Adds the pubsub Java deps so that they’re available in our portable pipeline.
> 2. Makes the coder for the PubsubIO message-holder type, PubsubMessage, 
>    available as a standard coder. This is required because both PubsubIO.Read 
>    and PubsubIO.Write expand to ParDos which pass along these PubsubMessage 
>    objects, but only “standard” (i.e. portable) coders can be used, so we have 
>    to hack it to make PubsubMessage appear as a standard coder.
>
> More details:
>
> - There’s a similar magic commit required for Kafka external transforms.
> - The Jira issue for this problem is here: 
>   https://jira.apache.org/jira/browse/BEAM-7870
> - For problem #2 above there seems to be some consensus forming around using 
>   Avro or schema/row coders to send compound types in a portable way. Here’s 
>   the PR for making row coders portable: 
>   https://github.com/apache/beam/pull/9188
> - I don’t really have any ideas for problem #1.
>
> So the portability expansion system works, and now it’s time to sand off some 
> of the rough corners. I’d love to hear others’ thoughts on how to resolve 
> some of these remaining issues.
>
> -chad
