On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <[email protected]> wrote:
>
>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>> > that fails loudly if it's actually called in the short term rather than
>> > a full Python implementation.
>>
>> For configurable runner-native IO, for now, I think it is reasonable to
>> use a URN + special data payload directly without a KafkaDoFn -- assuming
>> it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow
>> and something similar would work for Kafka-on-Flink as well. I agree that
>> a non-native alternative implementation is desirable, but if one is not
>> present we should IMO rather fail at job submission instead of at runtime.
>> I could imagine connectors intrinsic to an execution engine where
>> non-native implementations are not possible.
>>
>
> I think, in this case, KafkaDoFn can be an SDF that would expand similarly
> to any other SDF by default (initial splitting, GBK, and a map-task
> equivalent, for example), but a runner (Flink in this case) will be free to
> override it with a runner-native implementation if desired. I assume the
> runner will have a chance to perform this override before the SDF expansion
> (which is not fully designed yet). Providing separate source/sink
> transforms for Flink-native Kafka will be an option as well, but that will
> be less desirable from a Python user API perspective.
>

Are we sure that the internal SDF will provide the same functionality as
the native one? What if the Kafka SDF is in the middle of a pipeline - can
Flink support that? Having a separate transform for the Flink-native source
might be a better user experience than having one that changes its behavior
in strange ways depending on the runner.
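
To make that concrete, here is a rough sketch of what the user-facing side
could look like in Python. The transform name, URN string, and constructor
arguments are all made up for illustration; the placeholder DoFn simply fails
loudly if no runner-native override was applied, per the suggestion quoted
above:

import apache_beam as beam

KAFKA_READ_URN = 'my:kafka:read:v1'  # made-up URN, for illustration only

class _FailLoudlyKafkaDoFn(beam.DoFn):
    """Placeholder used when no runner-native Kafka read was substituted."""
    def process(self, element):
        raise NotImplementedError(
            'This pipeline requires a runner that provides a native Kafka read.')

class ReadFromKafka(beam.PTransform):  # hypothetical, not an existing API
    def __init__(self, consumer_config, topics):
        self._consumer_config = consumer_config
        self._topics = topics

    def expand(self, pbegin):
        # Default expansion; a runner that recognizes this transform is free
        # to replace it (e.g. the Flink runner swapping in
        # FlinkKafkaConsumer010).
        return pbegin | beam.Impulse() | beam.ParDo(_FailLoudlyKafkaDoFn())

A pipeline author would then just write p | ReadFromKafka(config, ['topic'])
| beam.Map(...), and whether that works at all depends on the runner - which
is exactly the behavior-changes-per-runner concern above.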



>
>
>>
>>
>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <[email protected]>
>> wrote:
>>
>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <[email protected]> wrote:
>>>
>>> > Hi Cham,
>>>
>>> > Thanks for the feedback!
>>>
>>> > I should have probably clarified that my POC and questions aren't
>>> specific to Kafka as a source, but apply to pretty much any other
>>> source/sink that we use internally as well. We have existing Flink
>>> pipelines that are written in Java and we want to use the same connectors
>>> with the Python SDK on top of the already operationalized Flink stack.
>>> Therefore, portability isn't as much of a concern as the ability to
>>> integrate is.
>>>
>>
> Thanks for the clarification. Agree that providing runner-native
> implementations of established sources/sinks can be desirable in some
> cases.
>
>
>>> > -->
>>>
>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>> > <[email protected]>
>>> wrote:
>>>
>>> >> Hi Thomas,
>>>
>>> >> Seems like we are working on similar (partially) things :).
>>>
>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]> wrote:
>>>
>>> >>> I'm working on a mini POC to enable Kafka as a custom streaming source
>>> for a Python pipeline executing on the (in-progress) portable Flink
>>> runner.
>>>
>>> >>> We eventually want to use the same native Flink connectors for
>>> sources and sinks that we also use in other Flink jobs.
>>>
>>>
>>> >> Could you clarify what you mean by the same Flink connector? Do you
>>> mean that Beam-based and non-Beam-based Flink jobs will use the same
>>> Kafka connector implementation?
>>>
>>>
>>> > The native Flink sources as shown in the example below, not the Beam
>>> KafkaIO or other Beam sources.
>>>
>>>
>>>
>>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading
>>> from Kafka and a Python lambda logging the value. The code is here:
>>>
>>>
>>>
>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>
>>>
>>>
>>> >>> I'm looking for feedback/opinions on the following items in
>>> particular:
>>>
>>> >>> * Enabling custom translation on the Flink portable runner (custom
>>> translator could be loaded with ServiceLoader, additional translations
>>> could also be specified as job server configuration, pipeline option,
>>> ...)
>>>
>>> >>> * For the Python side, is what's shown in the commit the recommended
>>> way to define a custom transform (it would eventually live in a reusable
>>> custom module that pipeline authors can import)? Also, the example does
>>> not have the configuration part covered yet...
>>>
>>>
>>> >> The only standard unbounded source API offered by the Python SDK is the
>>> Splittable DoFn API. This is the part I'm working on. I'm trying to add a
>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
>>> different Kafka Python client libraries. Will share more information on
>>> this soon.
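
Just to picture the shape of that: the Python SDF API is still evolving, so
the sketch below is only my guess at how such a connector might look - the
method names may well differ, and make_consumer() is a stand-in for whichever
Kafka client library ends up being picked.

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

def make_consumer(topic_partition):
    # Stand-in for a real Kafka client (kafka-python, confluent-kafka, ...);
    # it should yield records exposing .offset, .key and .value.
    raise NotImplementedError

class _PartitionOffsetProvider(RestrictionProvider):
    def initial_restriction(self, topic_partition):
        # Offset range to read for this partition; a real connector would
        # resolve the end offset (or treat the range as unbounded).
        return OffsetRange(0, 10000)

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, topic_partition, restriction):
        return restriction.size()

class KafkaReadDoFn(beam.DoFn):
    def process(self, topic_partition,
                tracker=beam.DoFn.RestrictionParam(_PartitionOffsetProvider())):
        for record in make_consumer(topic_partition):
            if not tracker.try_claim(record.offset):
                return
            yield record.key, record.value
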
>>>
>>> >> I understand this might not be possible in all cases and we might want
>>> to consider adding native source/sink implementations. But this will
>>> result in the implementation being runner-specific (each runner will have
>>> to have its own source/sink implementation). So I think we should try to
>>> add connector implementations to Beam using the standard API whenever
>>> possible. We also have plans to implement support for cross-SDK transforms
>>> in the future (so that we can utilize the Java implementation from Python,
>>> for example), but we are not there yet and we might still want to
>>> implement a connector for a given SDK if there's good client library
>>> support.
>>>
>>>
>>> > It is great that the Python SDK will have connectors that are written
>>> in Python in the future, but I think it is equally if not more important
>>> to be able to use at least the Java Beam connectors with the Python SDK
>>> (and any other non-Java SDK). Especially in a fully managed environment it
>>> should be possible to offer this to users in a way that is largely
>>> transparent. It takes significant time and effort to mature connectors and
>>> I'm not sure it is realistic to repeat that for all external systems in
>>> multiple languages. Or, to put it another way, it is likely that instead
>>> of one rock-solid connector per external system maturing over time, there
>>> will be multiple less mature implementations. That's also the reason we
>>> internally want to use the Flink-native connectors - we know what they can
>>> and cannot do and want to leverage the existing investment.
>>>
>>> There are two related issues here: how to specify transforms (such as
>>> sources) in a language-independent manner, and how specific runners can
>>> recognize and run them. URNs solve both: the composite ReadFromKafka
>>> PTransform (which consists of an Impulse + SDF(KafkaDoFn)) encodes to a
>>> URN with an attached payload that fully specifies this read. (The
>>> KafkaDoFn could similarly have a URN and payload.) A runner that
>>> understands these URNs is free to make any (semantically-equivalent)
>>> substitutions it wants for this transform.
>>>
>>> Note that a KafkaDoFn still needs to be provided, but in the short term it
>>> could be a DoFn that fails loudly if it's actually called, rather than a
>>> full Python implementation. Eventually, we would like to be able to call
>>> out to another SDK to expand full transforms (e.g. more complicated ones
>>> like BigQueryIO).
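
If I'm reading the Python SDK right, that would mean a composite like the
hypothetical ReadFromKafka sketched earlier in this mail declares its URN and
payload roughly as below (to_runner_api_parameter is the hook I found in the
SDK; the URN string and JSON payload format are placeholders, and the reverse
direction - reconstructing the transform from the URN - needs a separate
registration that I'm leaving out):

import json
import apache_beam as beam

class ReadFromKafka(beam.PTransform):
    # ... __init__ and expand as in the earlier sketch ...

    def to_runner_api_parameter(self, unused_context):
        # A runner that knows this URN can ignore the default expansion and
        # substitute its native Kafka source; the payload carries everything
        # needed to configure that source.
        payload = json.dumps({'consumer_config': self._consumer_config,
                              'topics': self._topics}).encode('utf-8')
        return 'my:kafka:read:v1', payload
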
>>>
>>> >>> * Cross-language coders: In this example the Kafka source only
>>> considers the message value and uses the byte coder that both sides
>>> understand. If I wanted to pass on the key and possibly other metadata to
>>> the Python transform (similar to KafkaRecord from Java KafkaIO), then a
>>> specific coder is needed. Such a coder could be written using protobuf,
>>> Avro, etc., but it would also need to be registered.
>>>
>>>
>>> >> I think this requirement goes away if we implement Kafka in the
>>> Python SDK.
>>>
>>> > Wouldn't this be needed for any cross-language pipeline? Or rather any
>>> that isn't only using PCollection<byte[]>? Is there a language-agnostic
>>> encoding for KV<?,?>, for example?
>>>
>>> Yes, Coders are also specified by URN (+components and/or payload), and
>>> there are a couple of standard ones, including KV. See
>>>
>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>> This is not a closed set.
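
For the Kafka key/value case specifically, it looks like something as simple
as a two-element tuple coder of bytes would already round-trip between the
SDKs - if I read the Python SDK right, a 2-tuple is represented with the
standard KV coder and bytes with the bytes coder, so the Java side can
produce a matching encoding:

from apache_beam import coders

# Key and value both as raw bytes; richer metadata (topic, partition, offset,
# timestamp, ...) would need a dedicated, registered coder (protobuf, Avro, ...).
kafka_record_coder = coders.TupleCoder(
    (coders.BytesCoder(), coders.BytesCoder()))

encoded = kafka_record_coder.encode((b'some-key', b'some-value'))
key, value = kafka_record_coder.decode(encoded)
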
>>>
>>> - Robert
>>>
>>
