Hi all,
There was some interest in this topic at the Beam Summit this week (btw,
great job to everyone involved!), so I thought I’d try to summarize the
current state of things.
First, let me explain the idea behind external transforms for the
uninitiated.

Problem:

   - there’s a transform that you want to use, but it’s not available in
   your desired language. IO connectors are a good example: there are many
   available in the Java SDK, but far fewer in Python or Go.

Solution:

   1. Create a stub transform in your desired language (e.g. Python) whose
   primary role is to serialize the parameters passed to that transform
   2. When you run your portable pipeline, just before it is sent to the
   Job Service for execution, your stub transform’s payload is sent to the
   “Expansion Service” running in the native language (Java). There the
   payload is used to construct an instance of the native transform, which
   is expanded, converted to a protobuf, and sent back to the calling
   process (Python). (See the sketch just after this list.)
   3. The protobuf representation of the expanded transform gets integrated
   back into the pipeline that you’re submitting
   4. Steps 2-3 are repeated for each external transform in your pipeline
   5. Then the whole pipeline gets sent to the Job Service to be invoked on
   Flink/Spark/etc
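
To make those steps concrete, here’s a rough sketch of what this looks
like from the pipeline author’s side. MyStubTransform and both addresses
are placeholders for illustration; the point is that the stub is applied
like any other PTransform, and steps 2-4 happen automatically when the
pipeline is converted to its protobuf representation.

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions([
        '--runner=PortableRunner',
        '--job_endpoint=localhost:8099',  # Job Service (step 5)
    ])

    with beam.Pipeline(options=options) as p:
      (p
       | 'Source' >> beam.Create(['a', 'b'])
       | 'External' >> MyStubTransform(           # hypothetical stub (step 1)
             some_param='value',
             expansion_service='localhost:8097')  # Java expansion service (steps 2-4)
       | 'Print' >> beam.Map(print))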

------------------------------

Now on to my journey to get PubsubIO working in Python on Flink.

The first issue I encountered was that there was a lot of boilerplate
involved in serializing the stub Python transform’s parameters so that
they can be sent to the expansion service.

I created a PR to make this simpler, which has just been merged to master:
https://github.com/apache/beam/pull/9098

With this feature in place, if you’re using Python 3.7 you can use a
dataclass and the typing module to create your transform and describe your
schema in one go. For example:

    import dataclasses
    from typing import List, Optional, Tuple

    import apache_beam as beam

    @dataclasses.dataclass
    class MyAwesomeTransform(beam.ExternalTransform):
      URN = 'beam:external:fakeurn:v1'

      integer_example: int
      string_example: str
      list_of_strings: List[str]
      optional_kv: Optional[Tuple[str, float]] = None
      optional_integer: Optional[int] = None
      expansion_service: dataclasses.InitVar[Optional[str]] = None
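
Applying it then looks like any other transform (the address below is
just a placeholder for wherever the Java expansion service is
listening):

    # The keyword arguments are serialized according to the schema
    # declared by the dataclass fields above.
    pcoll | MyAwesomeTransform(
        integer_example=1,
        string_example='hello',
        list_of_strings=['a', 'b'],
        expansion_service='localhost:8097')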

For earlier versions of Python, you can use typing.NamedTuple to declare
your schema.

    import typing
    from typing import List, Optional, Tuple

    from past.builtins import unicode  # Python 2/3 compatible text type

    MyAwesomeSchema = typing.NamedTuple(
        'MyAwesomeSchema',
        [
            ('integer_example', int),
            ('string_example', unicode),
            ('list_of_strings', List[unicode]),
            ('optional_kv', Optional[Tuple[unicode, float]]),
            ('optional_integer', Optional[int]),
        ]
    )

There’s also an option to generate the schema implicitly based on the
value(s) you wish to serialize.
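
For illustration, here’s a minimal sketch of that path, assuming the
ImplicitSchemaPayloadBuilder helper and a placeholder expansion service
address (treat the exact spelling as approximate):

    import apache_beam as beam
    from apache_beam.transforms.external import ImplicitSchemaPayloadBuilder

    # The schema is inferred from the dict of values rather than declared
    # up front; the payload is handed to a raw ExternalTransform
    # identified by its URN.
    external = beam.ExternalTransform(
        'beam:external:fakeurn:v1',
        ImplicitSchemaPayloadBuilder({
            'integer_example': 1,
            'string_example': u'hello',
        }),
        'localhost:8097')  # placeholder expansion service address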

Implementing this feature involved a slight tangent: requesting a coder
for typing.List resulted in the pickle coder instead of IterableCoder.
That’s bad because only standard/portable coders can be used for expansion
in Java (for obvious reasons), so as a convenience that was solved here:
https://github.com/apache/beam/pull/9344
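
For illustration, with that change in place the registry resolves the
typing types to portable coders (a tiny sketch; the printed coder name
may differ slightly):

    import typing
    from apache_beam import coders

    # Previously this fell back to the non-portable pickle coder;
    # now it resolves to IterableCoder wrapping a string coder.
    coder = coders.registry.get_coder(typing.List[str])
    print(coder)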

The next issue I encountered was that the Python SDK did not track the
boundedness of PCollections, which made it impossible to use the expansion
service to create unbounded writes. That’s been solved and merged here:
https://github.com/apache/beam/pull/9426

So that brings us to the actual PR for adding external transform support
for PubsubIO: https://github.com/apache/beam/pull/9268
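
To give a flavor of the API, usage from Python ends up looking roughly
like the sketch below; treat the import path, parameter names, and
address as illustrative, since they may change before this merges:

    # Assumed module layout; subject to change during review.
    from apache_beam.io.external.gcp.pubsub import ReadFromPubSub

    messages = (p
                | ReadFromPubSub(
                      topic='projects/my-project/topics/my-topic',
                      expansion_service='localhost:8097'))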

The PR works, but with one big caveat: in order to use it you must build
your Java containers with this special commit:
https://github.com/chadrik/beam/commit/d12b99084809ec34fcf0be616e94301d3aca4870

That commit solves 2 problems:

   1. Adds the pubsub Java deps so that they’re available in our portable
   pipeline
   2. Makes the coder for the PubsubIO message-holder type, PubsubMessage,
   available as a standard coder. This is required because both PubsubIO.Read
   and PubsubIO.Write expand to ParDos which pass along these PubsubMessage
   objects, but only “standard” (i.e. portable) coders can be used, so we have
   to hack it to make PubsubMessage appear as a standard coder.

More details:

   - There’s a similar magic commit required for Kafka external transforms
   - The Jira issue for this problem is here:
   https://jira.apache.org/jira/browse/BEAM-7870
   - For problem #2 above there seems to be some consensus forming around
   using Avro or schema/row coders to send compound types in a portable way.
   Here’s the PR for making row coders portable:
   https://github.com/apache/beam/pull/9188
   - I don’t really have any ideas for problem #1

So the portability expansion system works, and now it’s time to sand off
some of the rough corners. I’d love to hear others’ thoughts on how to
resolve some of these remaining issues.

-chad
