I agree with Thomas' sentiment that cross-language IO is very important
because of how much work it takes to produce a mature connector
implementation in a language. Looking at implementations of BigQueryIO,
PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
to reimplement them entirely in Python and Go.

I'm imagining pretty much what Kenn is describing: a pipeline would specify
some transforms by URN + payload, and rely on the runner to do whatever it
takes to run this - either by expanding it into a Beam implementation of
this transform that the runner chooses to use (could be in the same
language or in a different language; either way, the runner would indeed
need to invoke the respective SDK to expand it given the parameters), or by
doing something entirely runner-specific (e.g. using the built-in Flink
Kafka connector).
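
To make the resolution order concrete, here is a rough sketch of what I have in mind. This is not Beam's actual API: the URN, the TransformSpec and Runner classes, and the string "plans" are all made up for illustration. The only point is the order of preference: runner-specific override first, then a Beam expansion, otherwise reject the job at submission.

```python
from dataclasses import dataclass
from typing import Callable, Dict

# Hypothetical URN, made up for this sketch.
KAFKA_READ_URN = "example:transform:kafka_read:v1"


@dataclass(frozen=True)
class TransformSpec:
    """A transform as the pipeline sees it: a URN plus an opaque payload."""
    urn: str
    payload: bytes


class Runner:
    """Toy runner that resolves URNs at job submission time."""

    def __init__(self, name: str) -> None:
        self.name = name
        # URN -> runner-specific translation (e.g. a native connector).
        self.native: Dict[str, Callable[[bytes], str]] = {}
        # URN -> Beam expansion obtained from some SDK (same or other language).
        self.expansions: Dict[str, Callable[[bytes], str]] = {}

    def resolve(self, spec: TransformSpec) -> str:
        # Prefer a runner-specific implementation when one is registered.
        if spec.urn in self.native:
            return self.native[spec.urn](spec.payload)
        # Otherwise fall back to a Beam expansion, if any SDK provides one.
        if spec.urn in self.expansions:
            return self.expansions[spec.urn](spec.payload)
        # Neither exists: reject the job at submission rather than at runtime.
        raise ValueError(f"{self.name} runner cannot run URN {spec.urn!r}")


spec = TransformSpec(KAFKA_READ_URN, b'{"topics": ["events"]}')

flink = Runner("flink")
flink.native[KAFKA_READ_URN] = lambda payload: "native Flink Kafka source"

other = Runner("other")
other.expansions[KAFKA_READ_URN] = lambda payload: "Impulse + SDF(KafkaDoFn)"

print(flink.resolve(spec))
print(other.resolve(spec))
```

A runner with neither entry registered raises at resolve() time, which corresponds to failing at job submission instead of at runtime.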

I don't see a reason to require that there *must* exist a Beam
implementation of this transform. Ideally, the only requirement is a runner-
and language-agnostic spec for the URN and payload; of course, the transform
is then only as portable as the set of runners that implement this URN.
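
As an illustration of what such a spec could pin down, here is a small sketch. Everything in it is an assumption for the sake of the example: the URN, the field names, and the use of JSON as the payload encoding (a real spec would more likely define a proto message). The last helper just spells out the portability point: a URN is usable on exactly the runners that declare support for it.

```python
import json
from typing import Dict, List, Set

# Hypothetical URN and payload schema, made up for this sketch.
KAFKA_READ_URN = "example:transform:kafka_read:v1"
REQUIRED_FIELDS = {"bootstrap_servers": str, "topics": list}


def validate(config: dict) -> None:
    """Check a config against the (hypothetical) payload spec for the URN."""
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in config:
            raise ValueError(f"missing required field: {field}")
        if not isinstance(config[field], expected_type):
            raise TypeError(f"{field} must be of type {expected_type.__name__}")


def encode_payload(config: dict) -> bytes:
    """Any SDK, in any language, produces these bytes the same way."""
    validate(config)
    return json.dumps(config, sort_keys=True).encode("utf-8")


def decode_payload(payload: bytes) -> dict:
    """Any runner decodes the bytes without knowing which SDK wrote them."""
    config = json.loads(payload.decode("utf-8"))
    validate(config)
    return config


def runners_supporting(urn: str, supported: Dict[str, Set[str]]) -> List[str]:
    """A URN is only as portable as the set of runners that implement it."""
    return sorted(name for name, urns in supported.items() if urn in urns)


config = {"bootstrap_servers": "localhost:9092", "topics": ["events"]}
payload = encode_payload(config)
print(decode_payload(payload) == config)
print(runners_supporting(KAFKA_READ_URN, {"flink": {KAFKA_READ_URN}, "other": set()}))
```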

I actually really like the idea that the transform can be implemented in a
completely runner-specific way without a Beam expansion to back it up - it
would let us unblock a lot of the work before full-blown cross-language IO
is delivered, or even before SDFs work in all languages/runners.

On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <[email protected]> wrote:

> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector,
> right? I was imagining: the Python SDK submits a pipeline with a KafkaIO
> (with URN + payload), maybe with bogus contents. It is replaced with a small
> Flink subgraph, including the native Flink Kafka connector and some
> compensating transforms to match the required semantics. To me, this is preferable to
> making single-runner transform URNs, since that breaks runner portability
> by definition.
>
> Kenn
>
> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <[email protected]>
> wrote:
>
>>
>>
>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <[email protected]> wrote:
>>
>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> The premise of URN + payload is that you can establish a spec. A native
>>>> override still needs to meet the spec - it may still require some
>>>> compensating code. Worrying about weird differences between runners seems
>>>> more about worrying that an adequate spec cannot be determined.
>>>>
>>>
>>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>>> pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
>>> KafkaIO run after that on this dynamic (not known until runtime) list of
>>> topics. If the native Flink source doesn't work this way, then it doesn't
>>> share the same spec and should have a different URN.
>>>
>>
>> Agree that if they cannot share the same spec, SDF and native transforms
>> warrant different URNs. Native Kafka might be able to support a PCollection
>> of topics/partitions as an input though by utilizing underlying native
>> Flink Kafka connector as a library. On the other hand, we might decide to
>> expand SDF-based ParDos into other transforms before a runner gets a
>> chance to override, in which case this kind of replacement will not be
>> possible.
>>
>> Thanks,
>> Cham
>>
>>
>>>
>>>> Runners will already invoke the SDF differently, so users treating
>>>> every detail of some implementation as the spec are doomed.
>>>>
>>>> Kenn
>>>>
>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <[email protected]> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>>> [email protected]> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>>>>>> > that fails loudly if it's actually called in the short term rather
>>>>>>> > than a full Python implementation.
>>>>>>>
>>>>>>> For configurable runner-native IO, for now, I think it is reasonable
>>>>>>> to use a URN + special data payload directly without a KafkaDoFn --
>>>>>>> assuming it's a portable pipeline. That's what we do in Go for
>>>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink
>>>>>>> as well. I agree that a non-native alternative implementation is
>>>>>>> desirable, but if one is not present we should IMO rather fail at job
>>>>>>> submission instead of at runtime. I could imagine connectors intrinsic
>>>>>>> to an execution engine where non-native implementations are not
>>>>>>> possible.
>>>>>>>
>>>>>>
>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>>> similarly to any other SDF by default (initial splitting, GBK, and a
>>>>>> map-task equivalent, for example), but a runner (Flink in this case) will
>>>>>> be free to override it with a runner-native implementation if desired. I
>>>>>> assume the runner will have a chance to perform this override before the
>>>>>> SDF expansion (which is not fully designed yet). Providing separate
>>>>>> source/sink transforms for Flink-native Kafka will be an option as well,
>>>>>> but that will be less desirable from a Python user API perspective.
>>>>>>
>>>>>
>>>>> Are we sure that the internal SDF will provide the same functionality
>>>>> as the native one? What if the Kafka SDF is in the middle of a pipeline -
>>>>> can Flink support that? Having a separate transform for the Flink native
>>>>> source might be a better user experience than having one that changes its
>>>>> behavior in strange ways depending on the runner.
>>>>>
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>> > Hi Cham,
>>>>>>>>
>>>>>>>> > Thanks for the feedback!
>>>>>>>>
>>>>>>>> > I should have probably clarified that my POC and questions aren't
>>>>>>>> > specific to Kafka as source, but pretty much any other source/sink
>>>>>>>> > that we internally use as well. We have existing Flink pipelines that
>>>>>>>> > are written in Java and we want to use the same connectors with the
>>>>>>>> > Python SDK on top of the already operationalized Flink stack.
>>>>>>>> > Therefore, portability isn't a concern as much as the ability to
>>>>>>>> > integrate is.
>>>>>>>>
>>>>>>>
>>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>>> implementations of established sources/sinks can be desirable in some
>>>>>> cases.
>>>>>>
>>>>>>
>>>>>>>> > -->
>>>>>>>>
>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>> > <[email protected]> wrote:
>>>>>>>>
>>>>>>>> >> Hi Thomas,
>>>>>>>>
>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>
>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]>
>>>>>>>> >> wrote:
>>>>>>>>
>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming source
>>>>>>>> >>> for a Python pipeline executing on the (in-progress) portable Flink
>>>>>>>> >>> runner.
>>>>>>>>
>>>>>>>> >>> We eventually want to use the same native Flink connectors for
>>>>>>>> >>> sources and sinks that we also use in other Flink jobs.
>>>>>>>>
>>>>>>>>
>>>>>>>> >> Could you clarify what you mean by same Flink connector? Do you mean
>>>>>>>> >> that Beam-based and non-Beam-based versions of Flink will use the
>>>>>>>> >> same Kafka connector implementation?
>>>>>>>>
>>>>>>>>
>>>>>>>> > The native Flink sources as shown in the example below, not the Beam
>>>>>>>> > KafkaIO or other Beam sources.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>>>> >>> reading from Kafka and a Python lambda logging the value. The code
>>>>>>>> >>> is here:
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>>> >>> particular:
>>>>>>>>
>>>>>>>> >>> * Enabling custom translation on the Flink portable runner (custom
>>>>>>>> >>> translator could be loaded with ServiceLoader, additional
>>>>>>>> >>> translations could also be specified as job server configuration,
>>>>>>>> >>> pipeline option, ...)
>>>>>>>>
>>>>>>>> >>> * For the Python side, is what's shown in the commit the recommended
>>>>>>>> >>> way to define a custom transform (it would eventually live in a
>>>>>>>> >>> reusable custom module that pipeline authors can import)? Also, the
>>>>>>>> >>> example does not have the configuration part covered yet.
>>>>>>>>
>>>>>>>>
>>>>>>>> >> The only standard unbounded source API offered by Python SDK is the
>>>>>>>> >> Splittable DoFn API. This is the part I'm working on. I'm trying to
>>>>>>>> >> add a Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>> >> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>>> >> comparing different Kafka Python client libraries. Will share more
>>>>>>>> >> information on this soon.
>>>>>>>>
>>>>>>>> >> I understand this might not be possible in all cases and we might
>>>>>>>> >> want to consider adding native source/sink implementations. But this
>>>>>>>> >> will result in the implementation being runner-specific (each runner
>>>>>>>> >> will have to have its own source/sink implementation). So I think we
>>>>>>>> >> should try to add connector implementations to Beam using the
>>>>>>>> >> standard API whenever possible. We also have plans to implement
>>>>>>>> >> support for cross-SDK transforms in the future (so that we can
>>>>>>>> >> utilize a Java implementation from Python, for example) but we are
>>>>>>>> >> not there yet and we might still want to implement a connector for a
>>>>>>>> >> given SDK if there's good client library support.
>>>>>>>>
>>>>>>>>
>>>>>>>> > It is great that the Python SDK will have connectors that are written
>>>>>>>> > in Python in the future, but I think it is equally if not more
>>>>>>>> > important to be able to use at least the Java Beam connectors with
>>>>>>>> > Python SDK (and any other non-Java SDK). Especially in a fully managed
>>>>>>>> > environment it should be possible to offer this to users in a way
>>>>>>>> > that is largely transparent. It takes significant time and effort to
>>>>>>>> > mature connectors and I'm not sure it is realistic to repeat that for
>>>>>>>> > all external systems in multiple languages. Or, to put it another
>>>>>>>> > way, it is likely that instead of one connector per external system
>>>>>>>> > that becomes rock solid over time, there will be multiple less mature
>>>>>>>> > implementations. That's also the reason we internally want to use the
>>>>>>>> > Flink native connectors - we know what they can and cannot do and
>>>>>>>> > want to leverage the existing investment.
>>>>>>>>
>>>>>>>> There are two related issues here: how to specify transforms (such as
>>>>>>>> sources) in a language-independent manner, and how specific runners can
>>>>>>>> recognize and run them. For this we use URNs: the composite
>>>>>>>> ReadFromKafka PTransform (that consists of an Impulse + SDF(KafkaDoFn))
>>>>>>>> encodes to a URN with an attached payload that fully specifies this
>>>>>>>> read. (The KafkaDoFn could similarly have a URN and payload.) A runner
>>>>>>>> that understands these URNs is free to make any
>>>>>>>> (semantically-equivalent) substitutions it wants for this transform.
>>>>>>>>
>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>>>>>>> that fails loudly if it's actually called in the short term rather than
>>>>>>>> a full Python implementation. Eventually, we would like to be able to
>>>>>>>> call out to another SDK to expand full transforms (e.g. more
>>>>>>>> complicated ones like BigQueryIO).
>>>>>>>>
>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>>>> >>> considers the message value and uses the byte coder that both sides
>>>>>>>> >>> understand. If I wanted to pass on the key and possibly other
>>>>>>>> >>> metadata to the Python transform (similar to KafkaRecord from Java
>>>>>>>> >>> KafkaIO), then a specific coder is needed. Such a coder could be
>>>>>>>> >>> written using protobuf, Avro etc., but it would also need to be
>>>>>>>> >>> registered.
>>>>>>>>
>>>>>>>>
>>>>>>>> >> I think this requirement goes away if we implement Kafka in Python
>>>>>>>> >> SDK.
>>>>>>>>
>>>>>>>> > Wouldn't this be needed for any cross-language pipeline? Or rather
>>>>>>>> > any that isn't only using PCollection<byte[]>? Is there a
>>>>>>>> > language-agnostic encoding for KV<?,?>, for example?
>>>>>>>>
>>>>>>>> Yes, Coders are also specified by URN (+components and/or payload), and
>>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>>
>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>> This is not a closed set.
>>>>>>>>
>>>>>>>> - Robert
>>>>>>>>
>>>>>>>
