It doesn't have to be a 1:1 swap of KafkaIO for a Flink Kafka connector, right? I was imagining: the Python SDK submits a pipeline containing a KafkaIO (with URN + payload), possibly with bogus contents. The runner replaces it with a small Flink subgraph, including the native Flink Kafka connector and some compensating transforms to match the required semantics. To me, this is preferable to making single-runner transform URNs, since that breaks runner portability by definition.

Kenn
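For illustration, a rough Python-side sketch of that idea; the URN string, the payload format, and the ReadFromKafkaPlaceholder name are made up for this sketch, and the default expansion exists only to fail loudly if no runner override kicks in:

    import json

    import apache_beam as beam

    # Hypothetical URN; a real one would be agreed upon and documented as a spec.
    KAFKA_READ_URN = 'beam:transform:kafka_read:v1'


    class ReadFromKafkaPlaceholder(beam.PTransform):
      """Placeholder Kafka read; only usable on runners that replace it."""

      def __init__(self, bootstrap_servers, topics):
        # The configuration a recognizing runner would decode (format is an assumption).
        self.payload = json.dumps(
            {'bootstrap_servers': bootstrap_servers, 'topics': topics}).encode('utf-8')

      def expand(self, pbegin):
        def fail(_):
          raise RuntimeError(
              'ReadFromKafkaPlaceholder was not replaced by the runner; '
              'this pipeline needs a runner with a native Kafka override.')
        # Deliberately "bogus" default contents, meant to be swapped out for a
        # native Flink subgraph (connector + compensating transforms).
        return pbegin | beam.Impulse() | beam.Map(fail)

The URN and payload would be attached through the SDK's runner API hooks so a runner can spot the transform in the submitted pipeline proto.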
On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <[email protected]> wrote:

> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <[email protected]> wrote:

>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <[email protected]> wrote:

>>> The premise of URN + payload is that you can establish a spec. A native override still needs to meet the spec - it may still require some compensating code. Worrying about weird differences between runners seems more about worrying that an adequate spec cannot be determined.

>> My point exactly. An SDF-based KafkaIO can run in the middle of a pipeline. E.g. we could have TextIO producing a list of topics, and the SDF KafkaIO run after that on this dynamic (not known until runtime) list of topics. If the native Flink source doesn't work this way, then it doesn't share the same spec and should have a different URN.

> Agree that if they cannot share the same spec, SDF and native transforms warrant different URNs. Native Kafka might be able to support a PCollection of topics/partitions as an input, though, by utilizing the underlying native Flink Kafka connector as a library. On the other hand, we might decide to expand SDF-based ParDos into other transforms before a runner gets a chance to override, in which case this kind of replacement will not be possible.

> Thanks,
> Cham

>>> Runners will already invoke the SDF differently, so users treating every detail of some implementation as the spec are doomed.

>>> Kenn

>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <[email protected]> wrote:

>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <[email protected]> wrote:

>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <[email protected]> wrote:

>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn that fails loudly if it's actually called in the short term rather than a full Python implementation.

>>>>>> For configurable runner-native IO, for now, I think it is reasonable to use a URN + special data payload directly without a KafkaDoFn -- assuming it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow, and something similar would work for Kafka-on-Flink as well. I agree that a non-native alternative implementation is desirable, but if one is not present we should IMO rather fail at job submission instead of at runtime. I could imagine connectors intrinsic to an execution engine where non-native implementations are not possible.

>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand similar to any other SDF by default (initial splitting, GBK, and a map-task equivalent, for example), but a runner (Flink in this case) will be free to override it with a runner-native implementation if desired. I assume the runner will have a chance to perform this override before the SDF expansion (which is not fully designed yet). Providing separate source/sink transforms for Flink-native Kafka would be an option as well, but that will be less desirable from a Python user API perspective.

>>>> Are we sure that the internal SDF will provide the same functionality as the native one? What if the Kafka SDF is in the middle of a pipeline - can Flink support that? Having a separate transform for the Flink native source might be a better user experience than having one that changes its behavior in strange ways depending on the runner.
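To make that "default expansion" shape concrete, here is a hedged sketch of a runner-agnostic composite that consumes a dynamic PCollection of topics (as in the TextIO example above): initial splitting, a GBK-style redistribution, then a map-task equivalent. The split/read helpers are placeholders, not a real connector:

    import apache_beam as beam


    def split_into_partitions(topic):
      # Placeholder for initial splitting; a real connector would query the
      # Kafka cluster for the topic's partitions.
      return [(topic, partition) for partition in range(4)]


    class ReadPartitionDoFn(beam.DoFn):
      def process(self, topic_partition):
        # Placeholder for actually reading one (topic, partition).
        yield (topic_partition, b'record-bytes')


    class ReadFromKafkaTopics(beam.PTransform):
      """Default expansion; a runner may override the whole composite natively."""

      def expand(self, topics):
        return (topics
                | 'SplitPartitions' >> beam.FlatMap(split_into_partitions)
                | 'Redistribute' >> beam.Reshuffle()   # the GBK-based step
                | 'ReadRecords' >> beam.ParDo(ReadPartitionDoFn()))


    # Mid-pipeline usage on a runtime-determined topic list:
    #   p | beam.io.ReadFromText('topics.txt') | ReadFromKafkaTopics()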
>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <[email protected]> wrote:

>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <[email protected]> wrote:

>>>>>>> > Hi Cham,
>>>>>>> > Thanks for the feedback!

>>>>>>> > I should have probably clarified that my POC and questions aren't specific to Kafka as a source, but apply to pretty much any other source/sink that we use internally as well. We have existing Flink pipelines that are written in Java, and we want to use the same connectors with the Python SDK on top of the already operationalized Flink stack. Therefore, portability isn't as much of a concern as the ability to integrate is.

>>>>> Thanks for the clarification. Agree that providing runner-native implementations of established sources/sinks can be desirable in some cases.

>>>>>>> > -->

>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath <[email protected]> wrote:

>>>>>>> >> Hi Thomas,

>>>>>>> >> Seems like we are working on similar (partially) things :).

>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]> wrote:

>>>>>>> >>> I'm working on a mini POC to enable Kafka as a custom streaming source for a Python pipeline executing on the (in-progress) portable Flink runner.

>>>>>>> >>> We eventually want to use the same native Flink connectors for sources and sinks that we also use in other Flink jobs.

>>>>>>> >> Could you clarify what you mean by the same Flink connector? Do you mean that Beam-based and non-Beam-based versions of Flink will use the same Kafka connector implementation?

>>>>>>> > The native Flink sources as shown in the example below, not the Beam KafkaIO or other Beam sources.

>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading from Kafka and a Python lambda logging the value. The code is here:
>>>>>>> >>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9

>>>>>>> >>> I'm looking for feedback/opinions on the following items in particular:

>>>>>>> >>> * Enabling custom translation on the Flink portable runner (a custom translator could be loaded with ServiceLoader; additional translations could also be specified as job server configuration, a pipeline option, ...)

>>>>>>> >>> * For the Python side, is what's shown in the commit the recommended way to define a custom transform (it would eventually live in a reusable custom module that pipeline authors can import)? Also, the example does not have the configuration part covered yet.

>>>>>>> >> The only standard unbounded source API offered by the Python SDK is the Splittable DoFn API. This is the part I'm working on. I'm trying to add a Kafka connector for the Beam Python SDK using the SDF API. JIRA is https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing different Kafka Python client libraries. Will share more information on this soon.
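As a very rough sketch of what such an SDF-based KafkaDoFn could look like, using the Python SDF surface as it later landed (the restriction API shown here post-dates this thread, and the offset/record helpers are stand-ins rather than real Kafka client calls):

    import apache_beam as beam
    from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
    from apache_beam.transforms.core import RestrictionProvider


    def fetch_latest_offset(topic_partition):
      return 1000  # placeholder; a real connector would ask the Kafka cluster


    def read_record(topic_partition, offset):
      return (topic_partition, offset)  # placeholder for an actual Kafka fetch


    class KafkaRestrictionProvider(RestrictionProvider):
      def initial_restriction(self, topic_partition):
        # A bounded offset range keeps the sketch simple; a streaming connector
        # would keep extending the range as new offsets appear.
        return OffsetRange(0, fetch_latest_offset(topic_partition))

      def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

      def restriction_size(self, topic_partition, restriction):
        return restriction.size()


    class KafkaDoFn(beam.DoFn):
      def process(self, topic_partition,
                  tracker=beam.DoFn.RestrictionParam(KafkaRestrictionProvider())):
        restriction = tracker.current_restriction()
        for offset in range(restriction.start, restriction.stop):
          if not tracker.try_claim(offset):
            return
          yield read_record(topic_partition, offset)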
>>>>>>> >> I understand this might not be possible in all cases and we might want to consider adding native source/sink implementations. But this will result in the implementation being runner-specific (each runner will have to have its own source/sink implementation). So I think we should try to add connector implementations to Beam using the standard API whenever possible. We also have plans to implement support for cross-SDK transforms in the future (so that we can utilize the Java implementation from Python, for example), but we are not there yet, and we might still want to implement a connector for a given SDK if there's good client library support.

>>>>>>> > It is great that the Python SDK will have connectors that are written in Python in the future, but I think it is equally if not more important to be able to use at least the Java Beam connectors with the Python SDK (and any other non-Java SDK). Especially in a fully managed environment, it should be possible to offer this to users in a way that is largely transparent. It takes significant time and effort to mature connectors, and I'm not sure it is realistic to repeat that for all external systems in multiple languages. Or, to put it another way, it is likely that instead of one connector per external system becoming rock solid over time, there will be multiple less mature implementations. That's also the reason we internally want to use the Flink native connectors - we know what they can and cannot do and want to leverage the existing investment.

>>>>>>> There are two related issues here: how to specify transforms (such as sources) in a language-independent manner, and how specific runners can recognize and run them. URNs solve both: the composite ReadFromKafka PTransform (which consists of an Impulse + SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully specifies this read. (The KafkaDoFn could similarly have a URN and payload.) A runner that understands these URNs is free to make any (semantically-equivalent) substitutions it wants for this transform.

>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn that fails loudly if it's actually called in the short term, rather than a full Python implementation. Eventually, we would like to be able to call out to another SDK to expand full transforms (e.g. more complicated ones like BigQueryIO).
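For a feel of what "a runner that understands these URNs" amounts to, here is a conceptual sketch at the portable pipeline-proto level; the URN is hypothetical and this is not actual Flink runner code (the real override would happen in the runner's Java translation):

    from apache_beam.portability.api import beam_runner_api_pb2

    KAFKA_READ_URN = 'beam:transform:kafka_read:v1'  # hypothetical URN


    def kafka_read_transform_ids(pipeline_proto: beam_runner_api_pb2.Pipeline):
      """Finds transforms the runner should replace with its native source."""
      matches = []
      for transform_id, transform in pipeline_proto.components.transforms.items():
        if transform.spec.urn == KAFKA_READ_URN:
          # transform.spec.payload carries the serialized read configuration;
          # the runner decodes it and emits the native connector plus any
          # compensating transforms instead of the SDK-provided expansion.
          matches.append(transform_id)
      return matches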
>>>>>>> >>> * Cross-language coders: In this example the Kafka source only considers the message value and uses the byte coder that both sides understand. If I wanted to pass on the key and possibly other metadata to the Python transform (similar to KafkaRecord from the Java KafkaIO), then a specific coder is needed. Such a coder could be written using protobuf, Avro, etc., but it would also need to be registered.

>>>>>>> >> I think this requirement goes away if we implement Kafka in the Python SDK.

>>>>>>> > Wouldn't this be needed for any cross-language pipeline? Or rather any that isn't only using PCollection<byte[]>? Is there a language-agnostic encoding for KV<?,?>, for example?

>>>>>>> Yes, Coders are also specified by URN (+ components and/or payload), and there are a couple of standard ones, including KV. See https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md This is not a closed set.

>>>>>>> - Robert
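One way to sidestep a custom coder entirely, following the standard coders mentioned above, is to model the record as a key/value pair of raw bytes; anything richer (topic, partition, offset, timestamps) would still need a coder written and registered on each SDK. A small sketch of the Python side, with the KV-of-bytes framing as an assumption about how the Java side would encode records:

    from apache_beam import coders

    # Key and value as raw bytes: this corresponds to the standard KV and bytes
    # coder shapes listed in common_urns.md, so a Java producer and a Python
    # consumer can agree on it without any custom registration.
    kafka_record_coder = coders.TupleCoder([coders.BytesCoder(), coders.BytesCoder()])

    # Richer metadata would need a custom coder (protobuf, Avro, ...) registered
    # on the Python side too, e.g. via coders.registry.register_coder(...).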
