Generally speaking, the SDKs define all user APIs, including all IOs. We should strive to ensure that users never use any runner-specific APIs directly. As such, there should be no runner-provided IOs visible to users. (Of course, some exceptions will have to apply, such as runner-specific configuration via PipelineOptions, etc.)
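To make that concrete, here's a minimal sketch (paths are made up) of what this looks like from the user's side: the pipeline is written purely against SDK APIs, and the only runner-specific piece is configuration passed via PipelineOptions, e.g. a --runner flag on the command line:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class PortablePipeline {
      public static void main(String[] args) {
        // Runner choice (and any runner-specific config) comes in here,
        // from the command line -- not from the pipeline code below.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);
        p.apply(TextIO.Read.from("/tmp/input-*"))   // SDK-provided IO...
         .apply(TextIO.Write.to("/tmp/output"));    // ...runs on any runner
        p.run();
      }
    }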
All SDK-provided IOs should be written in terms of the Source / Sink API, and all runners should support running pipelines that use these APIs. In that world, all IOs would run on all runners. However, neither of these is true currently:

- We used to have all sources and sinks implemented differently, in a runner-native way. Over the last month, we have converted TextIO, AvroIO, and BigQueryIO.Write to follow this design. (Thanks Luke and Pei!)
- BigQueryIO.Read and PubsubIO are the last pieces left in the old design, and there are pending PRs to address those. (Thanks Pei and Mark!)
- Neither Flink nor Spark fully supports the Source / Sink API, AFAIK. There are outstanding JIRA issues to address this -- they are high priority, IMO.

At execution time, any runner is free to replace an SDK-provided IO with a runner-native one, as appropriate. For example, a runner may have a faster implementation than the SDK-provided one. That choice should be transparent, and made by the runner, not the user. (Aside: this is why the Runner API is so important -- it gives runners enough information to make the right choice on behalf of the user, without needing to delegate implementation details to users -- no user knobs!)

This is the current design, which we believe addresses all the scenarios we care about:

- All IOs run on all runners.
- Any runner can provide a better or faster runner-native implementation of any IO.
- Users are abstracted away from all implementation details.
- All pipelines are runner-portable, because users don't use any runner-specific APIs directly.
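To illustrate what "written in terms of the Source / Sink API" means, here's a skeletal sketch of a bounded source. Class and field names are made up, and I'm assuming the BoundedSource API roughly as it stands today (package names assume the post-rename org.apache.beam tree):

    import java.util.List;
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.io.BoundedSource;
    import org.apache.beam.sdk.options.PipelineOptions;

    // Skeleton only: the bodies that matter are left unimplemented.
    class MyLinesSource extends BoundedSource<String> {
      private final String filePattern;

      MyLinesSource(String filePattern) { this.filePattern = filePattern; }

      // Split into per-file (or per-byte-range) sub-sources so the
      // runner can read bundles in parallel.
      @Override
      public List<? extends BoundedSource<String>> splitIntoBundles(
          long desiredBundleSizeBytes, PipelineOptions options) {
        throw new UnsupportedOperationException("sketch only");
      }

      @Override
      public long getEstimatedSizeBytes(PipelineOptions options) {
        throw new UnsupportedOperationException("sketch only");
      }

      @Override
      public boolean producesSortedKeys(PipelineOptions options) { return false; }

      @Override
      public BoundedReader<String> createReader(PipelineOptions options) {
        throw new UnsupportedOperationException("sketch only");
      }

      @Override
      public void validate() {}

      @Override
      public Coder<String> getDefaultOutputCoder() { return StringUtf8Coder.of(); }
    }

Any runner executes this via the generic Read transform, e.g. p.apply(Read.from(new MyLinesSource("/data/input-*"))), and is free to swap in a native equivalent at translation time, transparently.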
On Thu, Apr 28, 2016 at 8:33 AM, Amit Sela <amitsel...@gmail.com> wrote:
> From the Spark runner point of view, the implementation of the KafkaIO (for
> example) is to define the "settings" required to read from Kafka, and from
> a quick look at the SDK's KafkaIO, it looks like it could be used instead
> of the runner's implementation (and if not now, then probably once Spark
> supports the Kafka 0.9 connector API).
>
> As for the bigger picture here, as far as I can see, IO *translation* will
> always be runner-specific, because they either create whatever PCollections
> represent from an external source, or output from whatever PCollections
> represent. So I think translation will always be runner-specific for IOs.
>
> Back to the IOs themselves, the SDK should allow the runner to extend its
> implementation of the IO if and where needed; so if the KafkaIO is missing
> Encoder/Decoder Kafka serializer settings, it could just add those.
>
> Does this make sense?
>
>
> On Thu, Apr 28, 2016 at 3:45 PM Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> > Hi all,
> >
> > Regarding the recent threads on the mailing list, I would like to start
> > a formal discussion around the IOs.
> > As we can expect the first contributions in this area (I already have
> > some work in progress around this ;)), I think it's a fair discussion
> > to have.
> >
> > Now, we have two kinds of IO: the ones "generic" to Beam, and the ones
> > "local" to the runners.
> >
> > For example, let's take Kafka: we have the KafkaIO (in IO), and, for
> > instance, we have the spark-streaming Kafka connector (in the Spark
> > runner).
> >
> > Right now, we have two approaches for the user:
> > 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
> > approach for sure. However, the user may want to use the
> > runner-specific IO for two reasons:
> >     * Beam doesn't provide the IO yet (for instance, the Spark
> > Cassandra connector is available whereas we don't yet have any
> > CassandraIO (I'm working on it anyway ;))
> >     * The runner-native IO is optimized or contains more features than
> > the Beam native IO
> > 2. So, for the previous reasons, the user could want to use the native
> > runner IO. The drawback of this approach is that the pipeline will be
> > tied to a specific runner, which is completely against the Beam design.
> >
> > I wonder if it wouldn't make sense to add a flag on the IO API (and a
> > related one on the Runner API) like .useNative().
> >
> > For instance, the user would be able to do:
> >
> > pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
> >
> > Then, if the runner has a "native" IO, it will use it; else, with
> > useNative(false) (the default), it won't use any runner-native IO.
> >
> > The point there is the configuration: assuming the Beam IO and the
> > runner IO can differ, it means that the Beam IO would have to populate
> > all runner-specific IO configuration.
> >
> > Of course, it's always possible to use a PTransform to wrap the
> > runner-native IO, but we are back to the same concern: the pipeline
> > will be coupled to a specific runner.
> >
> > The purpose of the useNative() flag is to "automatically" inform the
> > runner to use a specific IO if it has one: the pipeline stays decoupled
> > from the runners.
> >
> > Thoughts?
> >
> > Thanks
> > Regards
> > JB
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
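For illustration, here's a hypothetical sketch of the transparent, runner-side alternative to the useNative() flag discussed above. KafkaSource, TranslationContext, and the helper methods are all made up (only Read.Unbounded and UnboundedSource are real SDK types); the point is that the substitution decision lives in the runner's translator, not in the user's pipeline:

    // Inside a hypothetical runner translator: if the runner has a native,
    // semantically equivalent Kafka connector, use it; otherwise execute
    // the SDK-provided source as-is. No user-facing knob either way.
    Object translateRead(Read.Unbounded<byte[]> transform, TranslationContext context) {
      UnboundedSource<byte[], ?> source = transform.getSource();
      if (source instanceof KafkaSource && context.runnerHasNativeKafka()) {
        return context.createNativeKafkaStream((KafkaSource<byte[]>) source);
      }
      return context.createStreamFromSource(source);
    }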