Generally speaking, the SDKs define all user APIs, including all IOs. We
should strive to ensure that users never use any runner-specific APIs
directly. As such, there should be no runner-provided IOs visible to the
user. (Of course, some exceptions will have to apply, such as
runner-specific configuration via PipelineOptions.)

All SDK-provided IOs should be written in terms of the Source / Sink API.
All runners should support running pipelines that use these APIs. In that
world, all IOs would run on all runners. However, neither of these is true
currently:

   - We used to have all sources and sinks implemented differently, in a
   runner-native way. Over the last month, we have converted TextIO, AvroIO,
   and BigQueryIO.Write to follow this design. (Thanks Luke and Pei!)
   - BigQueryIO.Read and PubsubIO are the last pieces left in the old
   design, and there are pending PRs to address those. (Thanks Pei and Mark!)
   - Neither Flink nor Spark fully supports the Source / Sink API, AFAIK.
   There are outstanding JIRA issues to address this -- those are high
   priority, IMO.

At execution time, any runner is free to replace the SDK-provided IO with a
runner-native one, as appropriate. For example, a runner may have a faster
implementation than the SDK-provided one. That choice should be
transparent, and made by the runner, not the user.
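
To illustrate the runner-side substitution described above, here is a
minimal, self-contained sketch. It deliberately does not use the real SDK
classes: `Read`, `SdkTextRead`, `NativeTextRead`, and `Runner` are all
hypothetical names made up for this example, and an actual runner would hook
into pipeline translation rather than a simple map lookup.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class RunnerOverrideSketch {

    /** Hypothetical stand-in for an SDK-defined IO transform. */
    interface Read {
        String describe();
    }

    /** Hypothetical SDK-provided implementation (would be built on the Source API). */
    static final class SdkTextRead implements Read {
        @Override
        public String describe() {
            return "sdk-source:TextIO.Read";
        }
    }

    /** Hypothetical faster implementation that a runner ships internally. */
    static final class NativeTextRead implements Read {
        @Override
        public String describe() {
            return "runner-native:TextIO.Read";
        }
    }

    /** Hypothetical runner that may swap in native implementations at translation time. */
    static final class Runner {
        private final Map<Class<? extends Read>, Supplier<Read>> overrides = new HashMap<>();

        /** Registered by the runner itself, never by the user. */
        <T extends Read> Runner override(Class<T> sdkType, Supplier<Read> nativeImpl) {
            overrides.put(sdkType, nativeImpl);
            return this;
        }

        /** Returns the native replacement if one is registered, else the SDK transform. */
        Read translate(Read requested) {
            Supplier<Read> replacement = overrides.get(requested.getClass());
            return replacement != null ? replacement.get() : requested;
        }
    }

    public static void main(String[] args) {
        // The user only ever constructs the SDK-provided transform.
        Read userTransform = new SdkTextRead();

        Runner plain = new Runner();
        Runner optimized = new Runner().override(SdkTextRead.class, NativeTextRead::new);

        System.out.println(plain.translate(userTransform).describe());
        System.out.println(optimized.translate(userTransform).describe());
    }
}
```

The user code is identical for both runners; only the runner decides whether
the SDK-provided or the native implementation executes.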

(As an aside, this is why the runner API is so important: it gives runners
enough information to make the right choice on behalf of the user, without
needing to delegate implementation details to users -- no user knobs!)

This is the current design, which we believe addresses all scenarios we
care about:

   - All IOs run on all runners.
   - Any runner can provide a better or faster runner-native implementation
   of any IO.
   - Users are abstracted away from all implementation details.
   - All pipelines are runner-portable, because users don't use any
   runner-specific APIs directly.


On Thu, Apr 28, 2016 at 8:33 AM, Amit Sela <amitsel...@gmail.com> wrote:

> From the Spark runner point of view, the implementation of the KafkaIO (for
> example) is to define the "settings" required to read from Kafka and from a
> quick look at the SDK's KafkaIO, it looks like it could be used instead of
> the runner's implementation (and if not now, then probably once Spark
> supports Kafka 0.9 connector API).
>
> As for the bigger picture here, as far as I can see, IO *translation* will
> always be runner-specific, because IOs either create whatever PCollections
> represent from an external source, or write whatever PCollections represent
> to an external sink.
>
> Back to the IOs themselves, the SDK should allow the runner to extend its
> implementation of the IO if and where needed, so if the KafkaIO is missing
> Encoder/Decoder Kafka serializer settings, the runner could just add those.
>
> Does this make sense ?
>
>
> On Thu, Apr 28, 2016 at 3:45 PM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> > Hi all,
> >
> > regarding the recent threads on the mailing list, I would like to start
> > a formal discussion around the IO.
> > As we can expect the first contributions on this area (I already have
> > some work in progress around this ;)), I think it's a fair discussion to
> > have.
> >
> > Now, we have two kinds of IO: the one "generic" to Beam, the one "local"
> > to the runners.
> >
> > For example, let's take Kafka: we have the KafkaIO (in IO), and for
> > instance, we have the spark-streaming kafka connector (in Spark Runner).
> >
> > Right now, we have two approaches for the user:
> > 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
> > approach for sure. However, the user may want to use the runner specific
> > IO for two reasons:
> >         * Beam doesn't provide the IO yet (for instance, spark cassandra
> > connector is available whereas we don't yet have any CassandraIO (I'm
> > working on it anyway ;))
> >         * The runner native IO is optimized or contains more features
> > than the Beam native IO
> > 2. So, for the previous reasons, the user could want to use the native
> > runner IO. The drawback of this approach is that the pipeline will be
> > tied to a specific runner, which is completely against the Beam design.
> >
> > I wonder if it wouldn't make sense to add a flag on the IO API (and a
> > related one on the Runner API) like .useNative().
> >
> > For instance, the user would be able to do:
> >
> > pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
> >
> > then, if the runner has a "native" IO, it will use it, else, if
> > useNative(false) (the default), it won't use any runner native IO.
> >
> > The point there is for the configuration: assuming the Beam IO and the
> > runner IO can differ, it means that the "Beam IO" would have to populate
> > all runner specific IO configuration.
> >
> > Of course, it's always possible to use a PTransform to wrap the runner
> > native IO, but we are back to the same concern: the pipeline will be
> > coupled to a specific runner.
> >
> > The purpose of the useNative() flag is to "automatically" inform the
> > runner to use a specific IO if it has one: the pipeline stays decoupled
> > from the runners.
> >
> > Thoughts ?
> >
> > Thanks
> > Regards
> > JB
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>
