I agree with Thomas' sentiment that cross-language IO is very important, because of how much work it takes to produce a mature connector implementation in any one language. Looking at the implementations of BigQueryIO, PubSubIO, KafkaIO and FileIO in Java, only a very daring soul would be tempted to reimplement them entirely in Python and Go.
I'm imagining pretty much what Kenn is describing: a pipeline would specify some transforms by URN + payload, and rely on the runner to do whatever it takes to run such a transform - either by expanding it into a Beam implementation that the runner chooses to use (could be in the same language or in a different language; either way, the runner would indeed need to invoke the respective SDK to expand it given the parameters), or by doing something entirely runner-specific (e.g. using the built-in Flink Kafka connector). I don't see a reason to require that there *must* exist a Beam implementation of the transform. Ideally, there must only be a runner- and language-agnostic spec for the URN and payload; of course, the transform is then only as portable as the set of runners that implement this URN.

I actually really like the idea that a transform can be implemented in a completely runner-specific way, without a Beam expansion to back it up - it would let us unblock a lot of this work before full-blown cross-language IO is delivered, or even before SDFs work in all languages/runners.

On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <[email protected]> wrote:

> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector, right? I was imagining: the Python SDK submits a pipeline with a KafkaIO (with URN + payload), maybe with bogus contents. It is replaced with a small Flink subgraph, including the native Flink Kafka connector and some compensating transforms to match the required semantics. To me, this is preferable to making single-runner transform URNs, since that breaks runner portability by definition.
>
> Kenn
>
> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <[email protected]> wrote:
>
>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <[email protected]> wrote:
>>
>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> The premise of URN + payload is that you can establish a spec. A native override still needs to meet the spec - it may still require some compensating code. Worrying about weird differences between runners seems more about worrying that an adequate spec cannot be determined.
>>>
>>> My point exactly. An SDF-based KafkaIO can run in the middle of a pipeline. E.g. we could have TextIO producing a list of topics, and the SDF KafkaIO running after that on this dynamic (not known until runtime) list of topics. If the native Flink source doesn't work this way, then it doesn't share the same spec and should have a different URN.
>>
>> Agree that if they cannot share the same spec, SDF and native transforms warrant different URNs. Native Kafka might be able to support a PCollection of topics/partitions as an input, though, by utilizing the underlying native Flink Kafka connector as a library. On the other hand, we might decide to expand SDF-based ParDos into other transforms before a runner gets a chance to override them, in which case this kind of replacement will not be possible.
>>
>> Thanks,
>> Cham
>>
>>>> Runners will already invoke the SDF differently, so users treating every detail of some implementation as the spec are doomed.
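To make the URN + payload part of this concrete, here is a very rough sketch of what declaring such a transform could look like on the Python side. The URN, the payload format, and the SDK hooks used here are illustrative assumptions only, not a proposal for the actual spec:

# Rough sketch, not working code: a Kafka read declared purely as URN + payload,
# with no Beam expansion behind it. A runner that understands the URN is expected
# to substitute its own (e.g. native Flink) implementation; one that doesn't
# should reject the pipeline at submission time.
import json

import apache_beam as beam
from apache_beam.pvalue import PCollection

KAFKA_READ_URN = 'beam:external:kafka:read:v1'  # illustrative, not an agreed-upon URN


class ReadFromKafka(beam.PTransform):

  def __init__(self, topics, bootstrap_servers):
    self._payload = json.dumps(
        {'topics': topics, 'bootstrap_servers': bootstrap_servers}).encode('utf-8')

  def expand(self, pbegin):
    # No SDK-side expansion; the output is produced by whatever the runner
    # substitutes for this URN.
    return PCollection(pbegin.pipeline)

  def to_runner_api_parameter(self, context):
    # This pair becomes the transform's spec (URN + payload) in the pipeline proto.
    return KAFKA_READ_URN, self._payload

A portable runner could then pattern-match on the URN during translation - e.g. the Flink runner wiring in FlinkKafkaConsumer010 plus whatever compensating transforms the spec requires.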
>>>> Kenn
>>>>
>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <[email protected]> wrote:
>>>>
>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>
>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <[email protected]> wrote:
>>>>>>
>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn that fails loudly if it's actually called in the short term rather than a full Python implementation.
>>>>>>>
>>>>>>> For configurable runner-native IO, for now, I think it is reasonable to use a URN + special data payload directly without a KafkaDoFn -- assuming it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow, and something similar would work for Kafka-on-Flink as well. I agree that a non-native alternative implementation is desirable, but if one is not present we should IMO rather fail at job submission instead of at runtime. I could imagine connectors intrinsic to an execution engine where non-native implementations are not possible.
>>>>>>
>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand similarly to any other SDF by default (initial splitting, GBK, and a map-task equivalent, for example), but a runner (Flink in this case) will be free to override it with a runner-native implementation if desired. I assume the runner will have a chance to perform this override before the SDF expansion (which is not fully designed yet). Providing separate source/sink transforms for Flink-native Kafka would be an option as well, but that would be less desirable from a Python user API perspective.
>>>>>
>>>>> Are we sure that the internal SDF will provide the same functionality as the native one? What if the Kafka SDF is in the middle of a pipeline - can Flink support that? Having a separate transform for the Flink native source might be a better user experience than having one that changes its behavior in strange ways depending on the runner.
>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <[email protected]> wrote:
>>>>>>>
>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <[email protected]> wrote:
>>>>>>>>
>>>>>>>> > Hi Cham,
>>>>>>>> >
>>>>>>>> > Thanks for the feedback!
>>>>>>>> >
>>>>>>>> > I should have probably clarified that my POC and questions aren't specific to Kafka as a source, but apply to pretty much any other source/sink that we use internally as well. We have existing Flink pipelines that are written in Java, and we want to use the same connectors with the Python SDK on top of the already operationalized Flink stack. Therefore, portability isn't a concern as much as the ability to integrate is.
>>>>>>
>>>>>> Thanks for the clarification. Agree that providing runner-native implementations of established sources/sinks can be desirable in some cases.
>>>>>>
>>>>>>>> > -->
>>>>>>>> >
>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>
>>>>>>>> >> Hi Thomas,
>>>>>>>> >>
>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]> wrote:
>>>>>>>> >>
>>>>>>>> >>> I'm working on a mini POC to enable Kafka as a custom streaming source for a Python pipeline executing on the (in-progress) portable Flink runner.
>>>>>>>> >>>
>>>>>>>> >>> We eventually want to use the same native Flink connectors for sources and sinks that we also use in other Flink jobs.
>>>>>>>> >>
>>>>>>>> >> Could you clarify what you mean by the same Flink connector? Do you mean that Beam-based and non-Beam-based versions of Flink will use the same Kafka connector implementation?
>>>>>>>> >
>>>>>>>> > The native Flink sources, as shown in the example below, not the Beam KafkaIO or other Beam sources.
>>>>>>>> >
>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading from Kafka and a Python lambda logging the value. The code is here:
>>>>>>>> >>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>> >>>
>>>>>>>> >>> I'm looking for feedback/opinions on the following items in particular:
>>>>>>>> >>>
>>>>>>>> >>> * Enabling custom translation on the Flink portable runner (a custom translator could be loaded with ServiceLoader; additional translations could also be specified as job server configuration, pipeline option, ...)
>>>>>>>> >>>
>>>>>>>> >>> * For the Python side, is what's shown in the commit the recommended way to define a custom transform (it would eventually live in a reusable custom module that pipeline authors can import)? Also, the example does not have the configuration part covered yet.
>>>>>>>> >>
>>>>>>>> >> The only standard unbounded source API offered by the Python SDK is the Splittable DoFn API. This is the part I'm working on. I'm trying to add a Kafka connector for the Beam Python SDK using the SDF API. The JIRA is https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing different Kafka Python client libraries. Will share more information on this soon.
>>>>>>>> >>
>>>>>>>> >> I understand this might not be possible in all cases and we might want to consider adding native source/sink implementations. But this will result in the implementation being runner-specific (each runner will have to have its own source/sink implementation). So I think we should try to add connector implementations to Beam using the standard API whenever possible. We also have plans to implement support for cross-SDK transforms in the future (so that we can utilize the Java implementation from Python, for example), but we are not there yet, and we might still want to implement a connector for a given SDK if there's good client library support.
>>>>>>>> >
>>>>>>>> > It is great that the Python SDK will have connectors that are written in Python in the future, but I think it is equally if not more important to be able to use at least the Java Beam connectors with the Python SDK (and any other non-Java SDK). Especially in a fully managed environment it should be possible to offer this to users in a way that is largely transparent. It takes significant time and effort to mature connectors, and I'm not sure it is realistic to repeat that for all external systems in multiple languages. Or, to put it another way, instead of one rock-solid connector per external system maturing over time, there will likely be multiple less mature implementations. That's also the reason we internally want to use the Flink native connectors - we know what they can and cannot do, and we want to leverage the existing investment.
>>>>>>>>
>>>>>>>> There are two related issues here: how to specify transforms (such as sources) in a language-independent manner, and how specific runners can recognize and run them; URNs solve both. The composite ReadFromKafka PTransform (which consists of an Impulse + SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully specifies this read. (The KafkaDoFn could similarly have a URN and payload.) A runner that understands these URNs is free to make any (semantically equivalent) substitutions it wants for this transform.
>>>>>>>>
>>>>>>>> Note that a KafkaDoFn still needs to be provided, but in the short term it could be a DoFn that fails loudly if it's actually called, rather than a full Python implementation. Eventually, we would like to be able to call out to another SDK to expand full transforms (e.g. more complicated ones like BigQueryIO).
>>>>>>>>
>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only considers the message value and uses the byte coder that both sides understand. If I wanted to pass on the key and possibly other metadata to the Python transform (similar to KafkaRecord from the Java KafkaIO), then a specific coder is needed. Such a coder could be written using protobuf, Avro, etc., but it would also need to be registered.
>>>>>>>> >>
>>>>>>>> >> I think this requirement goes away if we implement Kafka in the Python SDK.
>>>>>>>> >
>>>>>>>> > Wouldn't this be needed for any cross-language pipeline? Or rather, any that isn't only using PCollection<byte[]>? Is there a language-agnostic encoding for KV<?,?>, for example?
>>>>>>>>
>>>>>>>> Yes, coders are also specified by URN (+ components and/or payload), and there are a couple of standard ones, including KV. See https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md - this is not a closed set.
>>>>>>>>
>>>>>>>> - Robert
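To illustrate the "fails loudly" placeholder that Robert mentions, a minimal sketch of what such a stand-in could look like in the Python SDK (the class name and message are made up; the real KafkaDoFn would be the SDF that Cham is working on):

# Minimal sketch, assumed names: a placeholder that lets the pipeline be
# constructed and submitted, but fails loudly if a runner actually executes it
# instead of substituting a native implementation for the Kafka read URN.
import apache_beam as beam


class KafkaPlaceholderDoFn(beam.DoFn):

  def process(self, element):
    raise NotImplementedError(
        'This Kafka read was not replaced by the runner, and no portable '
        '(SDF-based) Kafka implementation is available yet.')

Failing at job submission, as Henning suggests, would of course be friendlier than failing here at runtime.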

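On the coder question: until there is a shared KafkaRecord coder URN, key/value pairs could be passed across the language boundary using the standard bytes and KV coders. A rough Python-side sketch (as far as I know, a two-component TupleCoder is what maps to the standard KV coder URN, but please correct me if that's wrong):

# Sketch: encode Kafka records as KV<bytes, bytes> using standard coders that
# both the Java and Python sides understand, instead of a custom KafkaRecord
# coder. Richer metadata (topic, partition, offset, ...) would have to be packed
# into the value, e.g. via protobuf or Avro, until a dedicated coder URN is
# specified and registered.
from apache_beam import coders

kafka_record_coder = coders.TupleCoder(
    (coders.BytesCoder(), coders.BytesCoder()))  # (key bytes, value bytes)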