+1 I agree with what Robert said and Davor laid out in more detail. Portability is one of the primary concerns of Beam.
On Thu, 28 Apr 2016 at 18:27 Davor Bonaci <[email protected]> wrote:

> Generally speaking, the SDKs define all user APIs, including all IOs. We
> should strive that users never use any runner-specific APIs directly. As
> such, there should be no runner-provided IOs visible to the user. (Of
> course, some exceptions will have to apply, such as runner-specific
> configuration via PipelineOptions, etc.)
>
> All SDK-provided IOs should be written in terms of the Source / Sink API.
> All runners should support running pipelines that use these APIs. In that
> world, all IOs would run on all runners. However, neither of these is true
> currently:
>
>    - We used to have all sources and sinks implemented differently in a
>      runner-native way. Over the last month, we have converted TextIO,
>      AvroIO, and BigQueryIO.Write to follow this design. (Thanks Luke and
>      Pei!)
>    - BigQueryIO.Read and PubsubIO are the last pieces left in the old
>      design, and there are pending PRs to address those. (Thanks Pei and
>      Mark!)
>    - Neither Flink nor Spark fully supports the Source / Sink API, AFAIK.
>      There are outstanding JIRA issues to address those -- those are high
>      priority, IMO.
>
> At execution time, any runner is free to replace the SDK-provided IO with a
> runner-native one, as appropriate. For example, a runner may have a faster
> implementation than the SDK-provided one. That choice should be
> transparent, and made by the runner, not the user.
>
> (Aside, this is why the runner API is so important -- that runners have
> enough information to make the right choice on behalf of the user, without
> needing to delegate implementation details to users -- no user knobs!)
>
> This is the current design, which we believe addresses all scenarios we
> care about:
>
>    - All IOs run on all runners.
>    - Any runner can provide a better or faster runner-native implementation
>      of any IO.
>    - Users are abstracted away from all implementation details.
>    - All pipelines are runner-portable, because users don't use any
>      runner-specific APIs directly.
>
> On Thu, Apr 28, 2016 at 8:33 AM, Amit Sela <[email protected]> wrote:
>
> > From the Spark runner point of view, the implementation of the KafkaIO
> > (for example) is to define the "settings" required to read from Kafka,
> > and from a quick look at the SDK's KafkaIO, it looks like it could be
> > used instead of the runner's implementation (and if not now, then
> > probably once Spark supports the Kafka 0.9 connector API).
> >
> > As for the bigger picture here, as far as I can see, IOs *translation*
> > will always be runner-specific because they either create whatever
> > PCollections represent from an external source, or output from whatever
> > PCollections represent.
> > So I think translation will always be runner-specific for IOs.
> >
> > Back to the IOs themselves, the SDK should allow the runner to extend
> > its implementation of the IO if and where needed, so if the KafkaIO is
> > missing Encoder/Decoder Kafka serializer settings, it could just add
> > those.
> >
> > Does this make sense?
> >
> > On Thu, Apr 28, 2016 at 3:45 PM Jean-Baptiste Onofré <[email protected]>
> > wrote:
> >
> > > Hi all,
> > >
> > > regarding the recent threads on the mailing list, I would like to
> > > start a formal discussion around the IO.
> > > As we can expect the first contributions in this area (I already have
> > > some work in progress around this ;)), I think it's a fair discussion
> > > to have.
> > >
> > > Now, we have two kinds of IO: the one "generic" to Beam, the one
> > > "local" to the runners.
> > >
> > > For example, let's take Kafka: we have the KafkaIO (in IO), and for
> > > instance, we have the spark-streaming kafka connector (in Spark
> > > Runner).
> > >
> > > Right now, we have two approaches for the user:
> > > 1. In the pipeline, we use KafkaIO from Beam: it's the preferred
> > >    approach for sure.
> > >    However, the user may want to use the runner-specific IO for two
> > >    reasons:
> > >      * Beam doesn't provide the IO yet (for instance, the spark
> > >        cassandra connector is available whereas we don't have any
> > >        CassandraIO yet (I'm working on it anyway ;))
> > >      * The runner native IO is optimized or contains more features
> > >        than the Beam native IO
> > > 2. So, for the previous reasons, the user could want to use the native
> > >    runner IO. The drawback of this approach is that the pipeline will
> > >    be tied to a specific runner, which is completely against the Beam
> > >    design.
> > >
> > > I wonder if it wouldn't make sense to add a flag on the IO API (and
> > > a related one on the Runner API) like .useNative().
> > >
> > > For instance, the user would be able to do:
> > >
> > > pipeline.apply(KafkaIO.read().withBootstrapServers("...").withTopics("...").useNative(true));
> > >
> > > then, if the runner has a "native" IO, it will use it, else, if
> > > useNative(false) (the default), it won't use any runner native IO.
> > >
> > > The point there is for the configuration: assuming the Beam IO and the
> > > runner IO can differ, it means that the "Beam IO" would have to
> > > populate all runner-specific IO configuration.
> > >
> > > Of course, it's always possible to use a PTransform to wrap the runner
> > > native IO, but we are back to the same concern: the pipeline will be
> > > coupled to a specific runner.
> > >
> > > The purpose of the useNative() flag is to "automatically" inform the
> > > runner to use a specific IO if it has one: the pipeline stays
> > > decoupled from the runners.
> > >
> > > Thoughts?
> > >
> > > Thanks
> > > Regards
> > > JB
> > > --
> > > Jean-Baptiste Onofré
> > > [email protected]
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
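
For readers of the archive, here is a rough sketch of the "runner decides, no user knobs" idea from Davor's mail, as opposed to a user-visible useNative() flag. It is only an illustration: IoSpec, Runner, withNative and translate are hypothetical names made up for this sketch and are not Beam's API. The assumption is simply that the user describes the IO once in SDK terms, and each runner either substitutes its own native implementation or falls back to the SDK-provided one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical, simplified model; none of these types are Beam's real API.
class RunnerOverrideSketch {

    /** A user-facing IO description defined by the SDK (hypothetical). */
    static final class IoSpec {
        final String name;    // e.g. "KafkaIO.Read"
        final String config;  // runner-agnostic settings chosen by the user
        IoSpec(String name, String config) {
            this.name = name;
            this.config = config;
        }
    }

    /** A runner that may hold native replacements, keyed by IO name. */
    static final class Runner {
        final String runnerName;
        private final Map<String, Function<IoSpec, String>> nativeImpls = new HashMap<>();

        Runner(String runnerName) {
            this.runnerName = runnerName;
        }

        /** Registered by the runner author, never by the pipeline author. */
        Runner withNative(String ioName, Function<IoSpec, String> impl) {
            nativeImpls.put(ioName, impl);
            return this;
        }

        /** Picks the implementation; the user neither sees nor controls the choice. */
        String translate(IoSpec spec) {
            Function<IoSpec, String> impl = nativeImpls.get(spec.name);
            if (impl != null) {
                return impl.apply(spec);
            }
            // Fallback: execute the SDK-provided (Source / Sink based) implementation.
            return "sdk:" + spec.name + "(" + spec.config + ")";
        }
    }

    public static void main(String[] args) {
        // The same user-level description is handed to both runners.
        IoSpec kafka = new IoSpec("KafkaIO.Read", "topics=events");
        Runner plain = new Runner("runner-a");
        Runner tuned = new Runner("runner-b")
            .withNative("KafkaIO.Read", s -> "native:" + s.name + "(" + s.config + ")");
        System.out.println(plain.translate(kafka));
        System.out.println(tuned.translate(kafka));
    }
}
```

The pipeline-side code is identical for both runners; only the runner-side registry differs, which is what keeps the pipeline portable in this model.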
