It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector,
right? I was imagining: the Python SDK submits a pipeline with a KafkaIO (with
URN + payload), maybe with bogus contents. The runner replaces it with a small
Flink subgraph, including the native Flink Kafka connector and some compensating
transforms to match the required semantics. To me, this is preferable to
making single-runner transform URNs, since that breaks runner portability
by definition.
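
As a rough sketch of that replacement, in plain Python rather than Beam or
Flink APIs: the Transform class, the function names, and every URN here except
beam:transform:pardo:v1 are hypothetical and only illustrate the shape of the
idea. A runner that knows the URN swaps in a native subgraph; any other runner
leaves the portable transform in place.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List

# Hypothetical URN for illustration; not a registered Beam URN.
KAFKA_READ_URN = "example:transform:kafka_read:v1"


@dataclass
class Transform:
    """A pipeline node identified by a URN plus a payload (the 'spec')."""
    urn: str
    payload: dict = field(default_factory=dict)


# A runner-side override maps one transform to a replacement subgraph.
Override = Callable[[Transform], List[Transform]]


def flink_kafka_override(t: Transform) -> List[Transform]:
    # Native connector followed by a compensating step, so the output of the
    # subgraph still matches what the URN's spec promises (hypothetical URNs).
    return [
        Transform("flink:native:kafka_consumer", {"topic": t.payload["topic"]}),
        Transform("flink:compensate:to_kafka_record"),
    ]


def apply_overrides(pipeline: List[Transform],
                    overrides: Dict[str, Override]) -> List[Transform]:
    """Replace every transform whose URN has an override; keep the rest."""
    out: List[Transform] = []
    for t in pipeline:
        expand = overrides.get(t.urn)
        out.extend(expand(t) if expand else [t])
    return out


pipeline = [
    Transform(KAFKA_READ_URN, {"topic": "events"}),
    Transform("beam:transform:pardo:v1"),
]

# A Flink-like runner registers the override; another runner registers none.
flink_plan = apply_overrides(pipeline, {KAFKA_READ_URN: flink_kafka_override})
other_plan = apply_overrides(pipeline, {})
```

The same submitted pipeline stays valid for both runners, which is the
portability point: only the runner that recognizes the URN changes the plan.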

Kenn

On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <[email protected]> wrote:
>
>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> The premise of URN + payload is that you can establish a spec. A native
>>> override still needs to meet the spec - it may still require some
>>> compensating code. Worrying about weird differences between runners seems
>>> more about worrying that an adequate spec cannot be determined.
>>>
>>
>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>> pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
>> KafkaIO run after that on this dynamic (not known until runtime) list of
>> topics. If the native Flink source doesn't work this way, then it doesn't
>> share the same spec and should have a different URN.
>>
>
> Agree that if they cannot share the same spec, SDF and native transforms
> warrant different URNs. Native Kafka might be able to support a PCollection
> of topics/partitions as an input though by utilizing underlying native
> Flink Kafka connector as a library. On the other hand, we might decide to
> expand SDF-based ParDos into other transforms before a runner gets a
> chance to override, in which case this kind of replacement will not be
> possible.
>
> Thanks,
> Cham
>
>
>>
>>> Runners will already invoke the SDF differently, so users treating every
>>> detail of some implementation as the spec are doomed.
>>>
>>> Kenn
>>>
>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <[email protected]> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>> DoFn that
>>>>>> > fails loudly if it's actually called in the short term rather than
>>>>>> a full
>>>>>> > Python implementation.
>>>>>>
>>>>>> For configurable runner-native IO, for now, I think it is reasonable
>>>>>> to use a URN + special data payload directly without a KafkaDoFn --
>>>>>> assuming it's a portable pipeline. That's what we do in Go for
>>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
>>>>>> well. I agree that a non-native alternative implementation is desirable,
>>>>>> but if one is not present we should IMO rather fail at job submission
>>>>>> instead of at runtime. I could imagine connectors intrinsic to an
>>>>>> execution engine where non-native implementations are not possible.
>>>>>>
>>>>>
>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>> similar to any other SDF by default (initial splitting, GBK, and a
>>>>> map-task equivalent, for example) but a runner (Flink in this case) will
>>>>> be free to override it with a runner-native implementation if desired. I
>>>>> assume the runner will have a chance to perform this override before the
>>>>> SDF expansion (which is not fully designed yet). Providing separate
>>>>> source/sink transforms for Flink native Kafka will be an option as well,
>>>>> but that will
>>>>> be less desirable from a Python user API perspective.
>>>>>
>>>>
>>>> Are we sure that the internal SDF will provide the same functionality
>>>> as the native one? What if the Kafka SDF is in the middle of a pipeline -
>>>> can Flink support that? Having a separate transform for the Flink native
>>>> source might be a better user experience than having one that changes its
>>>> behavior in strange ways depending on the runner.
>>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <[email protected]> wrote:
>>>>>>>
>>>>>>> > Hi Cham,
>>>>>>>
>>>>>>> > Thanks for the feedback!
>>>>>>>
>>>>>>> > I should have probably clarified that my POC and questions aren't
>>>>>>> specific to Kafka as source, but pretty much any other source/sink
>>>>>>> that we
>>>>>>> internally use as well. We have existing Flink pipelines that are
>>>>>>> written
>>>>>>> in Java and we want to use the same connectors with the Python SDK
>>>>>>> on top
>>>>>>> of the already operationalized Flink stack. Therefore, portability
>>>>>>> isn't a
>>>>>>> concern as much as the ability to integrate is.
>>>>>>>
>>>>>>
>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>> implementations of established sources/sinks can be desirable in some
>>>>> cases.
>>>>>
>>>>>
>>>>>>> > -->
>>>>>>>
>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>> > <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> >> Hi Thomas,
>>>>>>>
>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>
>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming
>>>>>>> source
>>>>>>> for a Python pipeline executing on the (in-progress) portable Flink
>>>>>>> runner.
>>>>>>>
>>>>>>> >>> We eventually want to use the same native Flink connectors for
>>>>>>> sources
>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>
>>>>>>>
>>>>>>> >> Could you clarify what you mean by same Flink connector ? Do you
>>>>>>> mean
>>>>>>> that Beam-based and non-Beam-based versions of Flink will use the
>>>>>>> same
>>>>>>> Kafka connector implementation ?
>>>>>>>
>>>>>>>
>>>>>>> > The native Flink sources as shown in the example below, not the
>>>>>>> Beam
>>>>>>> KafkaIO or other Beam sources.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>>> reading
>>>>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>> particular:
>>>>>>>
>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>> (custom
>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>> translations
>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>> option, ...)
>>>>>>>
>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>> recommended
>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>> reusable
>>>>>>> custom module that pipeline authors can import)? Also, the example
>>>>>>> does not
>>>>>>> have the configuration part covered yet..
>>>>>>>
>>>>>>>
>>>>>>> >> The only standard unbounded source API offered by Python SDK is
>>>>>>> the
>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying to
>>>>>>> add a
>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>> comparing
>>>>>>> different Kafka Python client libraries. Will share more information
>>>>>>> on
>>>>>>> this soon.
>>>>>>>
>>>>>>> >> I understand this might not be possible in all cases and we might
>>>>>>> want
>>>>>>> to consider adding native source/sink implementations. But this
>>>>>>> will
>>>>>>> result in the implementation being runner-specific (each runner will
>>>>>>> have
>>>>>>> to have its own source/sink implementation). So I think we should
>>>>>>> try to
>>>>>>> add connector implementations to Beam using the standard API whenever
>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>> transforms
>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>> Python for
>>>>>>> example) but we are not there yet and we might still want to
>>>>>>> implement a
>>>>>>> connector for a given SDK if there's good client library support.
>>>>>>>
>>>>>>>
>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>> written in
>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>> important to be
>>>>>>> able to use at least the Java Beam connectors with Python SDK (and
>>>>>>> any
>>>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>>>> should be
>>>>>>> possible to offer this to users in a way that is largely
>>>>>>> transparent. It
>>>>>>> takes significant time and effort to mature connectors and I'm not
>>>>>>> sure it
>>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>>> languages.
>>>>>>> Or, to put it in another way, it is likely that instead of one over
>>>>>>> time
>>>>>>> rock solid connector per external system there will be multiple less
>>>>>>> mature
>>>>>>> implementations. That's also the reason we internally want to use
>>>>>>> the Flink
>>>>>>> native connectors - we know what they can and cannot do and want to
>>>>>>> leverage the existing investment.
>>>>>>>
>>>>>>> There are two related issues here: how to specify transforms (such as
>>>>>>> sources) in a language-independent manner, and how specific runners can
>>>>>>> recognize and run them. For this we use URNs: the composite
>>>>>>> ReadFromKafka PTransform (that consists of an Impulse +
>>>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully
>>>>>>> specifies this read. (The KafkaDoFn could similarly have a URN and
>>>>>>> payload.) A runner that understands these URNs is free to make any
>>>>>>> (semantically-equivalent) substitutions it wants for this transform.
>>>>>>>
>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>> DoFn that
>>>>>>> fails loudly if it's actually called in the short term rather than a
>>>>>>> full
>>>>>>> Python implementation. Eventually, we would like to be able to call
>>>>>>> out to
>>>>>>> another SDK to expand full transforms (e.g. more complicated ones
>>>>>>> like
>>>>>>> BigQueryIO).
>>>>>>>
>>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>>> considers the message value and uses the byte coder that both sides
>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>> metadata to
>>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO),
>>>>>>> then a
>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>> protobuf, Avro
>>>>>>> etc, but it would also need to be registered.
>>>>>>>
>>>>>>>
>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>> Python SDK.
>>>>>>>
>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or rather
>>>>>>> any
>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>> agnostic
>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>
>>>>>>> Yes, Coders are also specified by URN (+components and/or payload),
>>>>>>> and
>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>
>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>> This is not a closed set.
>>>>>>>
>>>>>>> - Robert
>>>>>>>
>>>>>>
