On Thu, Apr 26, 2018 at 8:38 PM Chamikara Jayalath <[email protected]> wrote:
> On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <[email protected]> wrote:
>
>> I agree with Thomas' sentiment that cross-language IO is very important because of how much work it takes to produce a mature connector implementation in a language. Looking at implementations of BigQueryIO, PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted to reimplement them entirely in Python and Go.
>>
>> I'm imagining pretty much what Kenn is describing: a pipeline would specify some transforms by URN + payload, and rely on the runner to do whatever it takes to run this - either by expanding it into a Beam implementation of this transform that the runner chooses to use (could be in the same language or in a different language; either way, the runner would indeed need to invoke the respective SDK to expand it given the parameters), or by doing something entirely runner-specific (e.g. using the built-in Flink Kafka connector).
>>
>> I don't see a reason to require that there *must* exist a Beam implementation of this transform. There only, ideally, must be a runner- and language-agnostic spec for the URN and payload; of course, then the transform is only as portable as the set of runners that implement this URN.
>
> For a transform in general it's true that we don't need a Beam implementation, but more specifically for IOs I think there are many benefits to having the implementation in Beam. For example,
>
> - The IO connector will offer the same behavior and feature set across various runners/SDKs.
> - The Beam community will be able to view/modify/improve the IO connector.
> - Existing IO connectors will serve as examples for users who wish to develop new IO connectors.
> - More runners will be able to execute the user's pipeline.
>
>> I actually really like the idea that the transform can be implemented in a completely runner-specific way without a Beam expansion to back it up - it would let us unblock a lot of the work earlier than full-blown cross-language IO is delivered or even than SDFs work in all languages/runners.
>
> If there are existing established connectors (for example, Kafka for Flink in this case) I agree. But for anybody developing a new IO connector, I think we should encourage developing that in Beam (in some SDK) given that the connector will be available to all runners (and to all SDKs once we have cross-language transforms).
>
> Thanks,
> Cham
>
>> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> It doesn't have to be a 1:1 swapping of KafkaIO for a Flink Kafka connector, right? I was imagining: the Python SDK submits a pipeline with a KafkaIO (with URN + payload), maybe with bogus contents. It is replaced with a small Flink subgraph, including the native Flink Kafka connector and some compensating transforms to match the required semantics. To me, this is preferable to making single-runner transform URNs, since that breaks runner portability by definition.
>>>
>>> Kenn
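
To make the replacement idea concrete, a rough sketch of how a runner could spot such a URN-specified Kafka read in the submitted pipeline proto before translation. The URN string and helper function are made up for illustration, and the proto field names assume the current runner API protos; a real Flink translator would hang its substitution logic off something like this:

    from apache_beam.portability.api import beam_runner_api_pb2

    # Made-up URN for illustration; the real URN + payload spec would have to be agreed on.
    KAFKA_READ_URN = 'beam:transform:org.example.kafka_read:v1'

    def kafka_reads(pipeline_proto):
        # pipeline_proto: beam_runner_api_pb2.Pipeline
        """Yields (transform_id, payload) for transforms a runner might swap for a native read."""
        for transform_id, transform in pipeline_proto.components.transforms.items():
            if transform.spec.urn == KAFKA_READ_URN:
                yield transform_id, transform.spec.payload
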
>>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <[email protected]> wrote:
>>>
>>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <[email protected]> wrote:
>>>>>
>>>>>> The premise of URN + payload is that you can establish a spec. A native override still needs to meet the spec - it may still require some compensating code. Worrying about weird differences between runners seems more about worrying that an adequate spec cannot be determined.
>>>>>
>>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a pipeline. E.g. we could have TextIO producing a list of topics, and the SDF KafkaIO run after that on this dynamic (not known until runtime) list of topics. If the native Flink source doesn't work this way, then it doesn't share the same spec and should have a different URN.
>>>>
>>>> Agree that if they cannot share the same spec, SDF and native transforms warrant different URNs. Native Kafka might be able to support a PCollection of topics/partitions as an input, though, by utilizing the underlying native Flink Kafka connector as a library. On the other hand, we might decide to expand SDF-based ParDos into other transforms before a runner gets a chance to override them, in which case this kind of replacement will not be possible.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>>> Runners will already invoke the SDF differently, so users treating every detail of some implementation as the spec are doomed.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>
>>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn that fails loudly if it's actually called in the short term rather than a full Python implementation.
>>>>>>>>>
>>>>>>>>> For configurable runner-native IO, for now, I think it is reasonable to use a URN + special data payload directly without a KafkaDoFn -- assuming it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow, and something similar would work for Kafka-on-Flink as well. I agree that a non-native alternative implementation is desirable, but if one is not present we should IMO rather fail at job submission instead of at runtime. I could imagine connectors intrinsic to an execution engine where non-native implementations are not possible.
>>>>>>>>
>>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand similarly to any other SDF by default (initial splitting, GBK, and a map-task equivalent, for example), but a runner (Flink in this case) will be free to override it with a runner-native implementation if desired. I assume the runner will have a chance to perform this override before the SDF expansion (which is not fully designed yet). Providing separate source/sink transforms for Flink-native Kafka would be an option as well, but that would be less desirable from a Python user API perspective.
>>>>>>>
>>>>>>> Are we sure that the internal SDF will provide the same functionality as the native one? What if the Kafka SDF is in the middle of a pipeline - can Flink support that? Having a separate transform for the Flink native source might be a better user experience than having one that changes its behavior in strange ways depending on the runner.
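
For reference, the default expansion discussed above would apply to something shaped roughly like the following: a minimal Splittable DoFn sketch whose element is a topic name, so it can sit mid-pipeline on a dynamic topic list. This is only an illustration - the restriction is a fake fixed offset range, no real Kafka client is involved, and the Python SDF surface was still being finalized at the time, so the API used here follows a later shape:

    import apache_beam as beam
    from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
    from apache_beam.transforms.core import RestrictionProvider

    class _FakeOffsetRangeProvider(RestrictionProvider):
        """Stand-in restriction provider: pretends every topic has offsets [0, 100)."""

        def initial_restriction(self, topic):
            return OffsetRange(0, 100)

        def create_tracker(self, restriction):
            return OffsetRestrictionTracker(restriction)

        def restriction_size(self, topic, restriction):
            return restriction.size()

    class KafkaTopicReadDoFn(beam.DoFn):
        """Sketch of an SDF-style reader keyed by topic name (not a real Kafka client)."""

        def process(self, topic, tracker=beam.DoFn.RestrictionParam(_FakeOffsetRangeProvider())):
            offset = tracker.current_restriction().start
            while tracker.try_claim(offset):
                # A real implementation would poll the Kafka consumer here.
                yield (topic, offset, b'fake-record')
                offset += 1

    # The topic list does not need to be known until runtime:
    with beam.Pipeline() as p:
        _ = (p
             | 'Topics' >> beam.Create(['logs', 'clicks'])  # could equally come from TextIO
             | 'ReadKafka' >> beam.ParDo(KafkaTopicReadDoFn()))
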
>>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> > Hi Cham,
>>>>>>>>>> >
>>>>>>>>>> > Thanks for the feedback!
>>>>>>>>>> >
>>>>>>>>>> > I should have probably clarified that my POC and questions aren't specific to Kafka as a source, but apply to pretty much any other source/sink that we internally use as well. We have existing Flink pipelines that are written in Java, and we want to use the same connectors with the Python SDK on top of the already operationalized Flink stack. Therefore, portability isn't as much of a concern as the ability to integrate is.
>>>>>>>>
>>>>>>>> Thanks for the clarification. Agree that providing runner-native implementations of established sources/sinks can be desirable in some cases.
>>>>>>>>
>>>>>>>>>> > -->
>>>>>>>>>> >
>>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>>> >
>>>>>>>>>> >> Hi Thomas,
>>>>>>>>>> >>
>>>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>>> >>
>>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]> wrote:
>>>>>>>>>> >>
>>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as a custom streaming source for a Python pipeline executing on the (in-progress) portable Flink runner.
>>>>>>>>>> >>>
>>>>>>>>>> >>> We eventually want to use the same native Flink connectors for sources and sinks that we also use in other Flink jobs.
>>>>>>>>>> >>
>>>>>>>>>> >> Could you clarify what you mean by the same Flink connector? Do you mean that Beam-based and non-Beam-based versions of Flink will use the same Kafka connector implementation?
>>>>>>>>>> >
>>>>>>>>>> > The native Flink sources as shown in the example below, not the Beam KafkaIO or other Beam sources.
>>>>>>>>>> >
>>>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading from Kafka and a Python lambda logging the value. The code is here:
>>>>>>>>>> >>>
>>>>>>>>>> >>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>>> >>>
>>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in particular:
>>>>>>>>>> >>>
>>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner (a custom translator could be loaded with ServiceLoader; additional translations could also be specified as job server configuration, pipeline option, ...)
>>>>>>>>>> >>>
>>>>>>>>>> >>> * For the Python side, is what's shown in the commit the recommended way to define a custom transform (it would eventually live in a reusable custom module that pipeline authors can import)? Also, the example does not have the configuration part covered yet.
>>>>>>>>>> >>
>>>>>>>>>> >> The only standard unbounded source API offered by the Python SDK is the Splittable DoFn API. This is the part I'm working on. I'm trying to add a Kafka connector for the Beam Python SDK using the SDF API. The JIRA is https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing different Kafka Python client libraries. Will share more information on this soon.
>>>>>>>>>> >>
>>>>>>>>>> >> I understand this might not be possible in all cases and we might want to consider adding native source/sink implementations. But this will result in the implementation being runner-specific (each runner will have to have its own source/sink implementation). So I think we should try to add connector implementations to Beam using the standard API whenever possible. We also have plans to implement support for cross-SDK transforms in the future (so that we can utilize the Java implementation from Python, for example) but we are not there yet, and we might still want to implement a connector for a given SDK if there's good client library support.
>>>>>>>>>> >
>>>>>>>>>> > It is great that the Python SDK will have connectors that are written in Python in the future, but I think it is equally if not more important to be able to use at least the Java Beam connectors with the Python SDK (and any other non-Java SDK). Especially in a fully managed environment it should be possible to offer this to users in a way that is largely transparent. It takes significant time and effort to mature connectors, and I'm not sure it is realistic to repeat that for all external systems in multiple languages. Or, to put it another way, it is likely that instead of one rock-solid connector per external system maturing over time, there will be multiple less mature implementations. That's also the reason we internally want to use the Flink native connectors - we know what they can and cannot do and want to leverage the existing investment.
>>>>>>>>>>
>>>>>>>>>> There are two related issues here: how to specify transforms (such as sources) in a language-independent manner, and how specific runners can recognize and run them; URNs solve both. The composite ReadFromKafka PTransform (which consists of an Impulse + SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully specifies this read. (The KafkaDoFn could similarly have a URN and payload.) A runner that understands these URNs is free to make any (semantically-equivalent) substitutions it wants for this transform.
>>>>>>>>>>
>>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but in the short term it could be a DoFn that fails loudly if it's actually called, rather than a full Python implementation. Eventually, we would like to be able to call out to another SDK to expand full transforms (e.g. more complicated ones like BigQueryIO).
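
A rough sketch of that kind of composite, written against the Python SDK: the URN string, payload format, and the names ReadFromKafka/_KafkaReadDoFn are invented for illustration, and the placeholder DoFn just fails loudly as suggested above. The to_runner_api_parameter hook is the piece that lets the transform present itself to the runner as URN + payload:

    import json
    import apache_beam as beam

    # Made-up URN; a real one would need an agreed, language-agnostic payload spec.
    KAFKA_READ_URN = 'beam:transform:org.example.kafka_read:v1'

    class _KafkaReadDoFn(beam.DoFn):
        """Placeholder for the SDF-based reader; fails loudly if a runner actually runs it."""

        def process(self, element):
            raise NotImplementedError(
                'No portable Kafka read implementation available; '
                'the runner was expected to substitute this transform.')

    class ReadFromKafka(beam.PTransform):
        """Sketch of a composite Kafka read: Impulse + ParDo(placeholder KafkaDoFn)."""

        def __init__(self, consumer_config, topics):
            super().__init__()
            self._consumer_config = consumer_config
            self._topics = topics

        def expand(self, pbegin):
            return (pbegin
                    | 'Impulse' >> beam.Impulse()
                    | 'ReadKafka' >> beam.ParDo(_KafkaReadDoFn()))

        def to_runner_api_parameter(self, context):
            # Encode this transform as URN + payload so a runner can recognize it
            # and replace the whole composite (e.g. with a native Flink Kafka source).
            payload = json.dumps(
                {'consumer_config': self._consumer_config,
                 'topics': self._topics}).encode('utf-8')
            return KAFKA_READ_URN, payload

With this shape, a runner that knows the URN can drop in its native source, while anything else falls through to the (here deliberately unimplemented) Beam expansion.
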
>>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only considers the message value and uses the byte coder that both sides understand. If I wanted to pass on the key and possibly other metadata to the Python transform (similar to KafkaRecord from the Java KafkaIO), then a specific coder is needed. Such a coder could be written using protobuf, Avro, etc., but it would also need to be registered.
>>>>>>>>>> >>
>>>>>>>>>> >> I think this requirement goes away if we implement Kafka in the Python SDK.
>>>>>>>>>> >
>>>>>>>>>> > Wouldn't this be needed for any cross-language pipeline? Or rather any that isn't only using PCollection<byte[]>? Is there a language-agnostic encoding for KV<?,?>, for example?
>>>>>>>>>>
>>>>>>>>>> Yes, coders are also specified by URN (+ components and/or payload), and there are a couple of standard ones, including KV. See
>>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>>> This is not a closed set.
>>>>>>>>>>
>>>>>>>>>> - Robert
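
For what that looks like from the Python side, a small sketch (assuming a recent SDK layout; exact URN strings and module paths have shifted over time): a two-component TupleCoder is what the Python SDK maps onto the standard KV coder URN, carrying the component coders along, so printing its runner-API form shows the language-agnostic encoding in question:

    from apache_beam import coders
    from apache_beam.runners.pipeline_context import PipelineContext

    # KV<bytes, str> as the Python SDK would describe it to a runner.
    kv_coder = coders.TupleCoder([coders.BytesCoder(), coders.StrUtf8Coder()])

    # The printed proto shows the KV-style coder URN plus references to its two component coders.
    print(kv_coder.to_runner_api(PipelineContext()))
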
