Hi Thomas,

Seems like we are working on (partially) similar things :).

On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]> wrote:

> I'm working on a mini POC to enable Kafka as custom streaming source for a
> Python pipeline executing on the (in-progress) portable Flink runner.
>
> We eventually want to use the same native Flink connectors for sources and
> sinks that we also use in other Flink jobs.
>

Could you clarify what you mean by the same Flink connector? Do you mean
that Beam-based and non-Beam-based Flink jobs will use the same Kafka
connector implementation?


> I got a simple example to work with the FlinkKafkaConsumer010 reading from
> Kafka and a Python lambda logging the value. The code is here:
>
>
> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>
>

> I'm looking for feedback/opinions on the following items in particular:
>
> * Enabling custom translation on the Flink portable runner (custom
> translator could be loaded with ServiceLoader, additional translations
> could also be specified as job server configuration, pipeline option, ...)
>
> * For the Python side, is what's shown in the commit the recommended way
> to define a custom transform (it would eventually live in a reusable custom
> module that pipeline authors can import)? Also, the example does not have
> the configuration part covered yet..
>

The only standard unbounded source API offered by the Python SDK is the
Splittable DoFn (SDF) API. This is the part I'm working on: I'm adding a
Kafka connector for the Beam Python SDK using the SDF API. The JIRA is
https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
different Kafka Python client libraries and will share more information
on this soon.
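
To give a rough idea of the shape this could take, here is a minimal
sketch of an SDF-based Kafka read in the Python SDK. This is not the
BEAM-3788 implementation; consumer_for()/poll() and the offset handling
are placeholders for whichever Kafka Python client library we end up
choosing:

import apache_beam as beam
from apache_beam.io.restriction_trackers import (
    OffsetRange, OffsetRestrictionTracker)
from apache_beam.transforms.core import RestrictionProvider


class _KafkaOffsetRestrictionProvider(RestrictionProvider):
  """Restriction = a range of offsets within one topic partition."""

  def initial_restriction(self, partition):
    # Placeholder: a real implementation would look up the partition's
    # start/end offsets from Kafka.
    return OffsetRange(0, 2 ** 63 - 1)

  def create_tracker(self, restriction):
    return OffsetRestrictionTracker(restriction)

  def restriction_size(self, partition, restriction):
    return restriction.size()


class _ReadKafkaPartitionFn(beam.DoFn):
  """SDF that claims offsets in a partition and emits (key, value) pairs."""

  def process(
      self,
      partition,
      tracker=beam.DoFn.RestrictionParam(_KafkaOffsetRestrictionProvider())):
    # consumer_for() and poll() stand in for the (yet to be chosen) Kafka
    # Python client library; record.key/value/offset are its fields.
    consumer = consumer_for(partition)
    for record in consumer.poll():
      if not tracker.try_claim(record.offset):
        return
      yield record.key, record.value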

I understand this might not be possible in all cases and we might want to
consider adding native source/sink implementations. But that would make
the implementation runner-specific (each runner would have to have its
own source/sink implementation), so I think we should try to add
connector implementations to Beam using the standard API whenever
possible. We also have plans to support cross-SDK transforms in the
future (so that, for example, we can utilize the Java implementation from
Python), but we are not there yet, and we might still want to implement a
connector for a given SDK if there's good client library support.


>
> * Cross-language coders: In this example the Kafka source only considers
> the message value and uses the byte coder that both sides understand. If I
> wanted to pass on the key and possibly other metadata to the Python
> transform (similar to KafkaRecord from Java KafkaIO), then a specific coder
> is needed. Such a coder could be written using protobuf, Avro, etc., but it
> would also need to be registered.
>

I think this requirement goes away if we implement the Kafka connector in
the Python SDK.
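
For example, if the read itself happens in a Python SDF, the record
handed to downstream transforms can simply be a plain Python object, and
within a single-SDK pipeline the SDK's default fallback coder can encode
it without any extra registration. A hypothetical Python counterpart of
Java's KafkaRecord could be as simple as:

import collections

# Hypothetical equivalent of Java KafkaIO's KafkaRecord. Within a pure
# Python pipeline the SDK's default fallback (pickle-based) coder can
# encode it, so no cross-language coder registration is needed.
KafkaRecord = collections.namedtuple(
    'KafkaRecord',
    ['key', 'value', 'topic', 'partition', 'offset', 'timestamp'])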


> Thanks,
> Thomas
>
>
Thanks,
Cham
