On Thu, Apr 26, 2018 at 8:38 PM Chamikara Jayalath <[email protected]>
wrote:

>
>
> On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <[email protected]>
> wrote:
>
>> I agree with Thomas' sentiment that cross-language IO is very important
>> because of how much work it takes to produce a mature connector
>> implementation in a language. Looking at implementations of BigQueryIO,
>> PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
>> to reimplement them entirely in Python and Go.
>>
>> I'm imagining pretty much what Kenn is describing: a pipeline would
>> specify some transforms by URN + payload, and rely on the runner to do
>> whatever it takes to run this - either by expanding it into a Beam
>> implementation of this transform that the runner chooses to use (could be
>> in the same language or in a different language; either way, the runner
>> would indeed need to invoke the respective SDK to expand it given the
>> parameters), or by doing something entirely runner-specific (e.g. using the
>> built-in Flink Kafka connector).
>>
>> I don't see a reason to require that there *must* exist a Beam
>> implementation of this transform. There only, ideally, must be a runner-
>> and language-agnostic spec for the URN and payload; of course, then the
>> transform is only as portable as the set of runners that implement this URN.
>>
>
> For a transform in general it's true that we don't need a Beam
> implementation, but more specifically for IOs I think there are many
> benefits to having the implementation in Beam. For example,
>
>    - IO connector will offer same behavior and feature set across various
>    runners/SDKs.
>    - Beam community will be able to view/modify/improve the IO connector.
>    - existing IO connectors will serve as examples for users who wish to
>    develop new IO connectors
>
>
>
   - More runners will be able to execute the user's pipeline.

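To make the URN + payload idea from this thread a bit more concrete, here is a
minimal sketch in plain Python (no Beam imports). It is not how any runner is
actually implemented. The URN string, the SketchRunner class, and the string
"plans" it returns are all made up for illustration. It only shows the
resolution order discussed here: prefer a runner-native override, fall back to
a Beam expansion if one exists, and otherwise fail at submission time.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass(frozen=True)
class TransformSpec:
    """A transform identified only by URN + payload (hypothetical shape)."""
    urn: str
    payload: bytes


# Hypothetical URN, not a real Beam URN.
KAFKA_READ_URN = "example:transform:kafka_read:v1"

# A translator turns a payload into some runner-side plan (a string here).
Translator = Callable[[bytes], str]


class SketchRunner:
    """Toy runner that resolves a URN to an implementation at submission."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._native: Dict[str, Translator] = {}

    def register_native(self, urn: str, translator: Translator) -> None:
        self._native[urn] = translator

    def translate(self, spec: TransformSpec,
                  sdk_expansion: Optional[Translator] = None) -> str:
        native = self._native.get(spec.urn)
        if native is not None:
            # Runner-specific override (e.g. a built-in connector).
            return native(spec.payload)
        if sdk_expansion is not None:
            # Portable fallback: a Beam implementation of the transform.
            return sdk_expansion(spec.payload)
        # Neither exists: reject the job before it starts running.
        raise ValueError(f"{self.name}: no implementation for URN {spec.urn!r}")


def beam_expansion(payload: bytes) -> str:
    return "impulse+sdf[" + payload.decode() + "]"


spec = TransformSpec(KAFKA_READ_URN, b"topic=events")

flink = SketchRunner("flink")
flink.register_native(
    KAFKA_READ_URN, lambda p: "native-kafka-source[" + p.decode() + "]")
other = SketchRunner("other")

print(flink.translate(spec, beam_expansion))
print(other.translate(spec, beam_expansion))
```

With a Beam expansion available, the same pipeline runs on both toy runners.
Without it, only the runner that registered the URN accepts the job, which is
the portability trade-off in the last bullet.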

>> I actually really like the idea that the transform can be implemented in
>> a completely runner-specific way without a Beam expansion to back it up -
>> it would let us unblock a lot of the work earlier than full-blown
>> cross-language IO is delivered or even than SDFs work in all
>> languages/runners.
>>
>
> If there are existing established connectors (for example, Kafka for Flink
> in this case) I agree. But for anybody developing a new IO connector, I
> think we should encourage developing that in Beam (in some SDK) given that
> the connector will be available to all runners (and to all SDKs once we
> have cross-language transforms).
>
> Thanks,
> Cham
>
>
>>
>> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <[email protected]> wrote:
>>
>>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector,
>>> right? I was imagining: Python SDK submits pipeline with a KafkaIO (with
>>> URN + payload), maybe with bogus contents. It is replaced with a small Flink
>>> subgraph, including the native Flink Kafka connector and some compensating
>>> transforms to match the required semantics. To me, this is preferable to
>>> making single-runner transform URNs, since that breaks runner portability
>>> by definition.
>>>
>>> Kenn
>>>
>>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <[email protected]>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> The premise of URN + payload is that you can establish a spec. A
>>>>>> native override still needs to meet the spec - it may still require some
>>>>>> compensating code. Worrying about weird differences between runners seems
>>>>>> more about worrying that an adequate spec cannot be determined.
>>>>>>
>>>>>
>>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>>>>> pipeline. E.g. we could have TextIO producing a list of topics, and the
>>>>> SDF KafkaIO run after that on this dynamic (not known until runtime) list
>>>>> of topics. If the native Flink source doesn't work this way, then it
>>>>> doesn't share the same spec and should have a different URN.
>>>>>
>>>>
>>>> Agree that if they cannot share the same spec, SDF and native
>>>> transforms warrant different URNs. Native Kafka might be able to support a
>>>> PCollection of topics/partitions as an input, though, by utilizing the
>>>> underlying native Flink Kafka connector as a library. On the other hand, we
>>>> might decide to expand SDF-based ParDos into other transforms before a
>>>> runner gets a chance to override, in which case this kind of replacement
>>>> will not be possible.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>>
>>>>>
>>>>>> Runners will already invoke the SDF differently, so users treating
>>>>>> every detail of some implementation as the spec are doomed.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <[email protected]> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>>>> > DoFn that fails loudly if it's actually called in the short term
>>>>>>>>> > rather than a full Python implementation.
>>>>>>>>>
>>>>>>>>> For configurable runner-native IO, for now, I think it is
>>>>>>>>> reasonable to use a URN + special data payload directly without a
>>>>>>>>> KafkaDoFn -- assuming it's a portable pipeline. That's what we do in Go
>>>>>>>>> for PubSub-on-Dataflow and something similar would work for
>>>>>>>>> Kafka-on-Flink as well. I agree that a non-native alternative
>>>>>>>>> implementation is desirable, but if one is not present we should IMO
>>>>>>>>> rather fail at job submission instead of at runtime. I could imagine
>>>>>>>>> connectors intrinsic to an execution engine where non-native
>>>>>>>>> implementations are not possible.
>>>>>>>>>
>>>>>>>>
>>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>>>>> similar to any other SDF by default (initial splitting, GBK, and a
>>>>>>>> map-task equivalent, for example) but a runner (Flink in this case) will
>>>>>>>> be free to override it with a runner-native implementation if desired. I
>>>>>>>> assume the runner will have a chance to perform this override before the
>>>>>>>> SDF expansion (which is not fully designed yet). Providing separate
>>>>>>>> source/sink transforms for Flink native Kafka will be an option as well,
>>>>>>>> but that will be less desirable from a Python user API perspective.
>>>>>>>>
>>>>>>>
>>>>>>> Are we sure that the internal SDF will provide the same
>>>>>>> functionality as the native one? What if the Kafka SDF is in the middle
>>>>>>> of a pipeline - can Flink support that? Having a separate transform for
>>>>>>> the Flink native source might be a better user experience than having
>>>>>>> one that changes its behavior in strange ways depending on the runner.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> > Hi Cham,
>>>>>>>>>>
>>>>>>>>>> > Thanks for the feedback!
>>>>>>>>>>
>>>>>>>>>> > I should have probably clarified that my POC and questions
>>>>>>>>>> aren't
>>>>>>>>>> specific to Kafka as source, but pretty much any other
>>>>>>>>>> source/sink that we
>>>>>>>>>> internally use as well. We have existing Flink pipelines that are
>>>>>>>>>> written
>>>>>>>>>> in Java and we want to use the same connectors with the Python
>>>>>>>>>> SDK on top
>>>>>>>>>> of the already operationalized Flink stack. Therefore,
>>>>>>>>>> portability isn't a
>>>>>>>>>> concern as much as the ability to integrate is.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>>>>> implementations of established sources/sinks can be desirable in some
>>>>>>>> cases.
>>>>>>>>
>>>>>>>>
>>>>>>>>>> > -->
>>>>>>>>>>
>>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>>>> > <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> >> Hi Thomas,
>>>>>>>>>>
>>>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>>>
>>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming
>>>>>>>>>> source
>>>>>>>>>> for a Python pipeline executing on the (in-progress) portable
>>>>>>>>>> Flink runner.
>>>>>>>>>>
>>>>>>>>>> >>> We eventually want to use the same native Flink connectors
>>>>>>>>>> for sources
>>>>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >> Could you clarify what you mean by same Flink connector? Do you
>>>>>>>>>> >> mean that Beam-based and non-Beam-based versions of Flink will use
>>>>>>>>>> >> the same Kafka connector implementation?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> > The native Flink sources as shown in the example below, not the
>>>>>>>>>> Beam
>>>>>>>>>> KafkaIO or other Beam sources.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>>>>>> reading
>>>>>>>>>> from Kafka and a Python lambda logging the value. The code is
>>>>>>>>>> here:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>>>>> particular:
>>>>>>>>>>
>>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>>>>> (custom
>>>>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>>>>> translations
>>>>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>>>>> option, ...)
>>>>>>>>>>
>>>>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>>>>> recommended
>>>>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>>>>> reusable
>>>>>>>>>> custom module that pipeline authors can import)? Also, the
>>>>>>>>>> example does not
>>>>>>>>>> have the configuration part covered yet.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >> The only standard unbounded source API offered by Python SDK
>>>>>>>>>> is the
>>>>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying
>>>>>>>>>> to add a
>>>>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>>>>> comparing
>>>>>>>>>> different Kafka Python client libraries. Will share more
>>>>>>>>>> information on
>>>>>>>>>> this soon.
>>>>>>>>>>
>>>>>>>>>> >> I understand this might not be possible in all cases and we
>>>>>>>>>> might want
>>>>>>>>>> to consider adding native source/sink implementations. But this
>>>>>>>>>> will
>>>>>>>>>> result in the implementation being runner-specific (each runner
>>>>>>>>>> will have
>>>>>>>>>> to have its own source/sink implementation). So I think we
>>>>>>>>>> should try to
>>>>>>>>>> add connector implementations to Beam using the standard API
>>>>>>>>>> whenever
>>>>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>>>>> transforms
>>>>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>>>>> Python for
>>>>>>>>>> example) but we are not there yet and we might still want to
>>>>>>>>>> implement a
>>>>>>>>>> connector for a given SDK if there's good client library support.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>>>>> written in
>>>>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>>>>> important to be
>>>>>>>>>> able to use at least the Java Beam connectors with Python SDK
>>>>>>>>>> (and any
>>>>>>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>>>>>>> should be
>>>>>>>>>> possible to offer this to users in a way that is largely
>>>>>>>>>> transparent. It
>>>>>>>>>> takes significant time and effort to mature connectors and I'm
>>>>>>>>>> not sure it
>>>>>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>>>>>> languages.
>>>>>>>>>> Or, to put it in another way, it is likely that instead of one
>>>>>>>>>> over time
>>>>>>>>>> rock solid connector per external system there will be multiple
>>>>>>>>>> less mature
>>>>>>>>>> implementations. That's also the reason we internally want to use
>>>>>>>>>> the Flink
>>>>>>>>>> native connectors - we know what they can and cannot do and want
>>>>>>>>>> to
>>>>>>>>>> leverage the existing investment.
>>>>>>>>>>
>>>>>>>>>> There are two related issues here: how to specify transforms (such as
>>>>>>>>>> sources) in a language-independent manner, and how specific runners can
>>>>>>>>>> recognize and run them. URNs solve both: the composite ReadFromKafka
>>>>>>>>>> PTransform (that consists of an Impulse + SDF(KafkaDoFn)) encodes to a
>>>>>>>>>> URN with an attached payload that fully specifies this read. (The
>>>>>>>>>> KafkaDoFn could similarly have a URN and payload.) A runner that
>>>>>>>>>> understands these URNs is free to make any (semantically-equivalent)
>>>>>>>>>> substitutions it wants for this transform.
>>>>>>>>>>
>>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>>>>>>>>> that fails loudly if it's actually called in the short term rather than
>>>>>>>>>> a full Python implementation. Eventually, we would like to be able to
>>>>>>>>>> call out to another SDK to expand full transforms (e.g. more
>>>>>>>>>> complicated ones like BigQueryIO).
>>>>>>>>>>
>>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>>>>>> considers the message value and uses the byte coder that both
>>>>>>>>>> sides
>>>>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>>>>> metadata to
>>>>>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO),
>>>>>>>>>> then a
>>>>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>>>>> protobuf, Avro
>>>>>>>>>> etc, but it would also need to be registered.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>>>>> Python SDK.
>>>>>>>>>>
>>>>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or
>>>>>>>>>> rather any
>>>>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>>>>> agnostic
>>>>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>>>>
>>>>>>>>>> Yes, Coders are also specified by URN (+components and/or
>>>>>>>>>> payload), and
>>>>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>>>>
>>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>>> This is not a closed set.
>>>>>>>>>>
>>>>>>>>>> - Robert
>>>>>>>>>>
>>>>>>>>>
