> Note that a KafkaDoFn still needs to be provided, but in the short term it
> could be a DoFn that fails loudly if it's actually called, rather than a
> full Python implementation.

For configurable runner-native IO, I think it is reasonable for now to use
a URN + special data payload directly, without a KafkaDoFn -- assuming it's
a portable pipeline. That's what we do in Go for PubSub-on-Dataflow, and
something similar would work for Kafka-on-Flink as well. I agree that a
non-native alternative implementation is desirable, but if one is not
present we should IMO fail at job submission rather than at runtime. I
could also imagine connectors intrinsic to an execution engine for which
non-native implementations are not possible.
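
To make this a bit more concrete, here is a rough sketch of what such a
transform could look like on the Python side. Everything below is
illustrative: the URN, the transform name, and the JSON payload are made
up, and a real payload would more likely be a well-defined proto.

    import json

    import apache_beam as beam
    from apache_beam import pvalue

    # Hypothetical URN; not an agreed-upon Beam identifier.
    KAFKA_READ_URN = 'example:flink:kafka_read:v1'


    class KafkaRead(beam.PTransform):
      """Encodes directly to a URN + payload, with no SDK implementation."""

      def __init__(self, topic, bootstrap_servers):
        super(KafkaRead, self).__init__()
        self._topic = topic
        self._bootstrap_servers = bootstrap_servers

      def expand(self, pbegin):
        # No Python expansion at all; the runner must recognize the URN
        # and translate the transform natively.
        return pvalue.PCollection(pbegin.pipeline)

      def to_runner_api_parameter(self, context):
        payload = json.dumps({
            'topic': self._topic,
            'bootstrap_servers': self._bootstrap_servers,
        }).encode('utf-8')
        return KAFKA_READ_URN, payload

A runner that doesn't recognize KAFKA_READ_URN then has no expansion to
fall back on, which gives exactly the fail-at-submission behavior.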


On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <[email protected]> wrote:

> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <[email protected]> wrote:
>
> > Hi Cham,
>
> > Thanks for the feedback!
>
> > I should have probably clarified that my POC and questions aren't
> specific to Kafka as source, but pretty much any other source/sink that we
> internally use as well. We have existing Flink pipelines that are written
> in Java and we want to use the same connectors with the Python SDK on top
> of the already operationalized Flink stack. Therefore, portability isn't a
> concern as much as the ability to integrate is.
>
> > -->
>
> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
> > <[email protected]>
> wrote:
>
> >> Hi Thomas,
>
> >> Seems like we are working on similar (partially) things :).
>
> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]> wrote:
>
> >>> I'm working on a mini POC to enable Kafka as a custom streaming source
> for a Python pipeline executing on the (in-progress) portable Flink runner.
>
> >>> We eventually want to use the same native Flink connectors for sources
> and sinks that we also use in other Flink jobs.
>
>
> >> Could you clarify what you mean by the same Flink connector? Do you mean
> that Beam-based and non-Beam-based Flink pipelines will use the same Kafka
> connector implementation?
>
>
> > The native Flink sources as shown in the example below, not the Beam
> KafkaIO or other Beam sources.
>
>
>
> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading
> from Kafka and a Python lambda logging the value. The code is here:
>
>
>
> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>
>
>
> >>> I'm looking for feedback/opinions on the following items in particular:
>
> >>> * Enabling custom translation on the Flink portable runner (custom
> translator could be loaded with ServiceLoader, additional translations
> could also be specified as job server configuration, pipeline option, ...)
>
> >>> * For the Python side, is what's shown in the commit the recommended
> way to define a custom transform (it would eventually live in a reusable
> custom module that pipeline authors can import)? Also, the example does not
> have the configuration part covered yet.
>
>
> >> The only standard unbounded source API offered by the Python SDK is the
> Splittable DoFn API. This is the part I'm working on: I'm trying to add a
> Kafka connector for the Beam Python SDK using the SDF API. The JIRA is
> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
> different Kafka Python client libraries and will share more information on
> this soon.
>
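
As a side note for readers less familiar with the SDF API, here is a very
rough Python skeleton of what such a connector might look like. All names
and details are hypothetical (including the poll_records stub standing in
for a real Kafka client poll loop), and the actual design is what
BEAM-3788 will work out:

    import apache_beam as beam
    from apache_beam.io.restriction_trackers import OffsetRange
    from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
    from apache_beam.transforms.core import RestrictionProvider


    class KafkaOffsetRestrictionProvider(RestrictionProvider):
      # Track a [start, stop) offset range per topic partition.
      def initial_restriction(self, element):
        return OffsetRange(0, 2 ** 63 - 1)  # effectively unbounded

      def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)


    def poll_records(topic_partition, offset_range):
      # Placeholder: a real implementation would use a Kafka client
      # library (e.g. kafka-python) to fetch records in this range.
      return []


    class KafkaReadDoFn(beam.DoFn):
      """Hypothetical SDF; each element is a (topic, partition) pair."""

      def process(self, element, tracker=beam.DoFn.RestrictionParam(
          KafkaOffsetRestrictionProvider())):
        for record in poll_records(element, tracker.current_restriction()):
          if not tracker.try_claim(record.offset):
            return
          yield record.value
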
> >> I understand this might not be possible in all cases and we might want
> to consider adding native source/sink implementations. But this will
> result in the implementation being runner-specific (each runner will have
> to have its own source/sink implementation). So I think we should try to
> add connector implementations to Beam using the standard API whenever
> possible. We also have plans to implement support for cross-SDK transforms
> in the future (so that we can utilize a Java implementation from Python,
> for example), but we are not there yet and we might still want to implement
> a connector for a given SDK if there's good client library support.
>
>
> > It is great that the Python SDK will have connectors that are written in
> Python in the future, but I think it is equally if not more important to be
> able to use at least the Java Beam connectors with the Python SDK (and any
> other non-Java SDK). Especially in a fully managed environment it should be
> possible to offer this to users in a way that is largely transparent. It
> takes significant time and effort to mature connectors, and I'm not sure it
> is realistic to repeat that for all external systems in multiple languages.
> Or, to put it another way: instead of one rock-solid connector per external
> system maturing over time, we are likely to end up with multiple less
> mature implementations. That's also the reason we internally want to use
> the Flink native connectors - we know what they can and cannot do and want
> to leverage the existing investment.
>
> There are two related issues here: how to specify transforms (such as
> sources) in a language-independent manner, and how specific runners can
> recognize and run them. URNs solve both: the composite ReadFromKafka
> PTransform (consisting of an Impulse + SDF(KafkaDoFn)) encodes to a URN
> with an attached payload that fully specifies this read. (The KafkaDoFn
> could similarly have a URN and payload.) A runner that understands these
> URNs is free to make any (semantically-equivalent) substitutions it wants
> for this transform.
>
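
To sketch the shape described above (hypothetical URN; a real payload
would be a well-defined proto; KafkaDoFn is the short-term placeholder
discussed in the next paragraph):

    import json

    import apache_beam as beam

    READ_FROM_KAFKA_URN = 'example:kafka_read:v1'  # hypothetical


    class ReadFromKafka(beam.PTransform):

      def __init__(self, consumer_config):
        super(ReadFromKafka, self).__init__()
        self._consumer_config = consumer_config

      def expand(self, pbegin):
        # Default expansion that any portable runner could run as-is.
        return (pbegin
                | beam.Impulse()
                | beam.ParDo(KafkaDoFn(self._consumer_config)))

      def to_runner_api_parameter(self, context):
        # The URN + payload fully specify the read, so a runner that
        # recognizes the URN may substitute its native Kafka source.
        payload = json.dumps(self._consumer_config).encode('utf-8')
        return READ_FROM_KAFKA_URN, payload
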
> Note that a KafkaDoFn still needs to be provided, but in the short term it
> could be a DoFn that fails loudly if it's actually called, rather than a
> full Python implementation. Eventually, we would like to be able to call
> out to another SDK to expand full transforms (e.g. more complicated ones
> like BigQueryIO).
>
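
In that short-term spirit, the placeholder could be as simple as this
sketch:

    import apache_beam as beam


    class KafkaDoFn(beam.DoFn):
      """Placeholder that should never run on a runner that substitutes
      the native source."""

      def __init__(self, consumer_config):
        self._consumer_config = consumer_config

      def process(self, element):
        raise NotImplementedError(
            'No Python Kafka implementation is available; this transform '
            'must be replaced by a runner-native Kafka source.')
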
> >>> * Cross-language coders: In this example the Kafka source only
> considers the message value and uses the byte coder that both sides
> understand. If I wanted to pass on the key and possibly other metadata to
> the Python transform (similar to KafkaRecord from the Java KafkaIO), then a
> specific coder is needed. Such a coder could be written using protobuf,
> Avro, etc., but it would also need to be registered.
>
>
> >> I think this requirement goes away if we implement Kafka in the Python
> SDK.
>
> > Wouldn't this be needed for any cross-language pipeline? Or rather any
> that isn't only using PCollection<byte[]>? Is there a language-agnostic
> encoding for KV<?,?>, for example?
>
> Yes, Coders are also specified by URN (+components and/or payload), and
> there are a couple of standard ones, including KV. See
>
> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
> This is not a closed set.
>
> - Robert
>
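
On the KV question: a quick Python sketch of carrying a Kafka key/value
pair with the standard cross-language coders (bytes and KV both have URNs
listed in common_urns; my understanding is that TupleCoder with two
components maps to the KV coder, but treat that as an assumption):

    from apache_beam import coders

    # KV<bytes, bytes>: Kafka key and value, using coders with standard
    # cross-language URNs.
    kv_coder = coders.TupleCoder([coders.BytesCoder(), coders.BytesCoder()])

    encoded = kv_coder.encode((b'key', b'value'))
    key, value = kv_coder.decode(encoded)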
