On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <[email protected]> wrote:
>
> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <[email protected]> wrote:
>
>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn that fails loudly if it's actually called in the short term rather than a full Python implementation.
>>
>> For configurable runner-native IO, for now, I think it is reasonable to use a URN + special data payload directly without a KafkaDoFn -- assuming it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow, and something similar would work for Kafka-on-Flink as well. I agree that a non-native alternative implementation is desirable, but if one is not present we should IMO rather fail at job submission instead of at runtime. I could imagine connectors intrinsic to an execution engine where non-native implementations are not possible.
>>
>
> I think, in this case, KafkaDoFn can be an SDF that would expand similarly to any other SDF by default (initial splitting, GBK, and a map-task equivalent, for example), but a runner (Flink in this case) would be free to override it with a runner-native implementation if desired. I assume the runner will have a chance to perform this override before the SDF expansion (which is not fully designed yet). Providing separate source/sink transforms for Flink-native Kafka would be an option as well, but that would be less desirable from a Python user API perspective.
>

Are we sure that the internal SDF will provide the same functionality as the native one? What if the Kafka SDF is in the middle of a pipeline - can Flink support that? Having a separate transform for the Flink native source might be a better user experience than having one that changes its behavior in strange ways depending on the runner.
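To make the question concrete, here is a rough sketch of how I read the SDF-with-override idea from the Python side. Everything here is hypothetical (the names, the configuration shape, and the assumption that Impulse can be used this way in the Python SDK), not an existing API:

import apache_beam as beam


class _KafkaReadDoFn(beam.DoFn):
    """Hypothetical placeholder for an SDF-based Kafka read.

    Until a real Python implementation exists, it just fails loudly; a
    runner that recognizes the enclosing transform would swap in its
    native Kafka source before (or instead of) the SDF expansion.
    """

    def __init__(self, consumer_config, topics):
        self._consumer_config = consumer_config
        self._topics = topics

    def process(self, element):
        raise NotImplementedError(
            'ReadFromKafka has no portable implementation yet and must be '
            'replaced by a runner-native Kafka source.')


class ReadFromKafka(beam.PTransform):
    """Hypothetical composite that a runner could recognize and override."""

    def __init__(self, consumer_config, topics):
        super(ReadFromKafka, self).__init__()
        self._consumer_config = consumer_config
        self._topics = topics

    def expand(self, pbegin):
        return (pbegin
                | beam.Impulse()
                | beam.ParDo(_KafkaReadDoFn(self._consumer_config,
                                            self._topics)))

A pipeline author would then just write p | ReadFromKafka(config, topics), and whether that runs as a portable SDF or as the Flink-native consumer would be entirely up to the runner - which is exactly where I worry the behavior could diverge.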
>>
>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <[email protected]> wrote:
>>
>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <[email protected]> wrote:
>>>
>>> > Hi Cham,
>>>
>>> > Thanks for the feedback!
>>>
>>> > I should have probably clarified that my POC and questions aren't specific to Kafka as source, but pretty much any other source/sink that we internally use as well. We have existing Flink pipelines that are written in Java and we want to use the same connectors with the Python SDK on top of the already operationalized Flink stack. Therefore, portability isn't a concern as much as the ability to integrate is.
>>>
>
> Thanks for the clarification. Agree that providing runner-native implementations of established sources/sinks can be desirable in some cases.
>
>>> > -->
>>>
>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath <[email protected]> wrote:
>>>
>>> >> Hi Thomas,
>>>
>>> >> Seems like we are working on (partially) similar things :).
>>>
>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <[email protected]> wrote:
>>>
>>> >>> I'm working on a mini POC to enable Kafka as a custom streaming source for a Python pipeline executing on the (in-progress) portable Flink runner.
>>>
>>> >>> We eventually want to use the same native Flink connectors for sources and sinks that we also use in other Flink jobs.
>>>
>>> >> Could you clarify what you mean by the same Flink connector? Do you mean that Beam-based and non-Beam-based versions of Flink will use the same Kafka connector implementation?
>>>
>>> > The native Flink sources as shown in the example below, not the Beam KafkaIO or other Beam sources.
>>>
>>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading from Kafka and a Python lambda logging the value. The code is here:
>>>
>>> >>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>
>>> >>> I'm looking for feedback/opinions on the following items in particular:
>>>
>>> >>> * Enabling custom translation on the Flink portable runner (a custom translator could be loaded with ServiceLoader; additional translations could also be specified as job server configuration, pipeline option, ...)
>>>
>>> >>> * For the Python side, is what's shown in the commit the recommended way to define a custom transform (it would eventually live in a reusable custom module that pipeline authors can import)? Also, the example does not have the configuration part covered yet.
>>>
>>> >> The only standard unbounded source API offered by the Python SDK is the Splittable DoFn API. This is the part I'm working on. I'm trying to add a Kafka connector for the Beam Python SDK using the SDF API. The JIRA is https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing different Kafka Python client libraries. Will share more information on this soon.
>>>
>>> >> I understand this might not be possible in all cases and we might want to consider adding native source/sink implementations. But this will result in the implementation being runner-specific (each runner will have to have its own source/sink implementation). So I think we should try to add connector implementations to Beam using the standard API whenever possible. We also have plans to implement support for cross-SDK transforms in the future (so that we can utilize a Java implementation from Python, for example), but we are not there yet and we might still want to implement a connector for a given SDK if there's good client library support.
>>>
>>> > It is great that the Python SDK will have connectors that are written in Python in the future, but I think it is equally if not more important to be able to use at least the Java Beam connectors with the Python SDK (and any other non-Java SDK). Especially in a fully managed environment it should be possible to offer this to users in a way that is largely transparent. It takes significant time and effort to mature connectors, and I'm not sure it is realistic to repeat that for all external systems in multiple languages. Or, to put it another way, instead of one connector per external system becoming rock solid over time, there will likely be multiple less mature implementations. That's also the reason we internally want to use the Flink native connectors - we know what they can and cannot do and want to leverage the existing investment.
>>>
>>> There are two related issues here: how to specify transforms (such as sources) in a language-independent manner, and how specific runners can recognize and run them. URNs solve both: the composite ReadFromKafka PTransform (consisting of an Impulse + SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully specifies this read. (The KafkaDoFn could similarly have a URN and payload.) A runner that understands these URNs is free to make any (semantically-equivalent) substitutions it wants for this transform.
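To illustrate the URN + payload part (going from memory of the portability protos, so treat the module path and especially the URN itself as assumptions rather than settled identifiers), the spec that would end up in the pipeline proto for such a composite could be built roughly like this:

import json

from apache_beam.portability.api import beam_runner_api_pb2

# Hypothetical URN; the real identifier would have to be agreed on and documented.
KAFKA_READ_URN = 'beam:transform:kafka_read:v1'


def kafka_read_spec(consumer_config, topics):
    # The payload fully specifies the read, so a runner that knows the URN
    # (e.g. the Flink job server) can ignore the default Impulse + SDF
    # expansion and reconstruct a native Kafka source from this alone.
    payload = json.dumps({
        'consumer_config': consumer_config,
        'topics': topics,
    }).encode('utf-8')
    return beam_runner_api_pb2.FunctionSpec(urn=KAFKA_READ_URN, payload=payload)

Whether the payload is JSON, a proto, or something else matters less than agreeing on it across SDKs and runners, since the runner has to be able to reconstruct the read from the payload alone.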
>>>
>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn that fails loudly if it's actually called in the short term rather than a full Python implementation. Eventually, we would like to be able to call out to another SDK to expand full transforms (e.g. more complicated ones like BigQueryIO).
>>>
>>> >>> * Cross-language coders: In this example the Kafka source only considers the message value and uses the byte coder that both sides understand. If I wanted to pass on the key and possibly other metadata to the Python transform (similar to KafkaRecord from the Java KafkaIO), then a specific coder is needed. Such a coder could be written using protobuf, Avro, etc., but it would also need to be registered.
>>>
>>> >> I think this requirement goes away if we implement Kafka in the Python SDK.
>>>
>>> > Wouldn't this be needed for any cross-language pipeline? Or rather any that isn't only using PCollection<byte[]>? Is there a language-agnostic encoding for KV<?,?>, for example?
>>>
>>> Yes, Coders are also specified by URN (+ components and/or payload), and there are a couple of standard ones, including KV. See
>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>> This is not a closed set.
>>>
>>> - Robert
>>
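On the KV question: for plain key/value bytes, the standard KV and bytes coder URNs from that list should already be enough. For richer metadata (offsets, timestamps, headers) something would have to be written and registered on both sides. A minimal sketch of the Python half - the record type, byte layout, and names are all made up for illustration, and the same encoding would have to exist under an agreed URN in the Java SDK to be truly cross-language:

import struct

from apache_beam import coders


class KafkaRecord(object):
    """Hypothetical mirror of Java KafkaIO's KafkaRecord (key/value only)."""

    def __init__(self, key, value):
        self.key = key      # bytes or None
        self.value = value  # bytes


class KafkaRecordCoder(coders.Coder):
    """Length-prefixed key/value encoding; hypothetical, not a Beam standard coder."""

    def encode(self, record):
        key = record.key or b''
        return (struct.pack('>i', len(key)) + key +
                struct.pack('>i', len(record.value)) + record.value)

    def decode(self, encoded):
        key_len = struct.unpack('>i', encoded[:4])[0]
        key = encoded[4:4 + key_len]
        value = encoded[8 + key_len:]
        return KafkaRecord(key if key_len else None, value)

    def is_deterministic(self):
        return True

If all we pass on is key/value bytes, though, a KV-of-bytes coder that both sides already understand seems preferable to inventing a new encoding.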
