I agree with Cham's motivations as far as "we need it now" and getting Python SDF up and running and exercised on a real connector.
But I do find the current API of BigQueryIO to be a poor example. That particular piece of BigQueryIO functionality seems extraneous and goes against our own style guide [1]. The recommended way to write it would be for BigQueryIO to output a natural concrete type (like TableRow) and allow the following step to do conversions. This is a broader programming best practice: unless there is a compelling reason, you should just return the value rather than accept a higher-order function to operate on the value. Is there a compelling reason in this case? I dug through the code and see that it bottoms out in AvroSource, where it does not seem to add functionality.

Considering cross-language pipelines as a primary use case for all connectors, perhaps we should audit them and bring them into alignment now, deprecating the paths that use higher-order functions. We can still consider host-language convenience composites. For an unbounded source like KafkaIO, the compelling reason is the timestamp-extracting function needed to maintain a watermark. Notably, PubsubIO does not accept such a function; it requires the timestamp to be in a metadata field that any language can describe (versus having to parse the message to pull out the timestamp).

Kenn

[1] https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections

On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:

> Another point: cross-language IOs might add a performance penalty in many
> cases. For an example of this, look at BigQueryIO. The user can register a
> SerializableFunction that is evaluated on every record and determines
> which destination to write the record to. A Python user would of course want
> to register a Python function for this. This means that the Java IO would
> have to invoke Python code for each record it sees, which will likely be a
> big performance hit.
>
> Of course the downside of duplicating IOs is exactly as you say: multiple
> versions to maintain, and potentially duplicate bugs. I think the right
> answer will need to be decided on a case-by-case basis.
>
> Reuven
>
> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <chamik...@google.com> wrote:
>
>> Hi Aljoscha,
>>
>> I tried to cover this in the doc. Once we have full support for
>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>> think we should cease defining new sources/sinks for the Beam Python SDK
>> until we get to that point. I think there are good reasons for adding Kafka
>> support for Python today, and many Beam users have requested this. Also,
>> note that the proposed Python Kafka source will be based on the Splittable
>> DoFn framework, while the current Java version is based on the
>> UnboundedSource framework. Here are the reasons currently listed in the doc:
>>
>> - Users might find it useful to have at least one unbounded source and
>>   sink combination implemented in the Python SDK, and Kafka is the
>>   streaming system that makes the most sense to support if we want to add
>>   support for only one such system in the Python SDK.
>>
>> - Not all runners might support cross-language IO. Also, some
>>   user/runner/deployment combinations might require an unbounded
>>   source/sink implemented in the Python SDK.
>>
>> - We recently added Splittable DoFn support to the Python SDK. It will be
>>   good to have at least one production-quality Splittable DoFn that will
>>   serve as a good example for any users who wish to implement new
>>   Splittable DoFns on top of the Beam Python SDK.
>>
>> - The cross-language transform feature is currently in the initial
>>   discussion phase, and it could be some time before we can offer the
>>   existing Java implementation of Kafka to Python SDK users.
>>
>> - Cross-language IO might take even longer to reach the point where it's
>>   fully equivalent in expressive power to a transform written in the host
>>   language; e.g. supporting host-language lambdas as part of the transform
>>   configuration is likely to take a lot longer than "first-order"
>>   cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>   configuration, e.g. timestamp functions.
>>
>> Thanks,
>> Cham
>>
>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>>
>>> Is this what we want to do in the long run, i.e. implement copies of
>>> connectors for different SDKs? I thought the plan was to enable using
>>> connectors written in different languages, i.e. use the Java Kafka I/O
>>> from Python. That way we wouldn't duplicate bugs across three different
>>> languages (Java, Python, and Go for now).
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>> Thanks Cham, this is great! I left just a couple of comments on the doc.
>>>
>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>
>>>> Hi All,
>>>>
>>>> I'm looking into adding a Kafka connector to the Beam Python SDK. I
>>>> think this will benefit many Python SDK users and will serve as a good
>>>> example for the recently added Splittable DoFn API (Fn API support, which
>>>> will allow all runners to use Python Splittable DoFn, is in active
>>>> development). I created a document [1] that makes the case for adding
>>>> this connector and compares the performance of available Python Kafka
>>>> client libraries. I also created a POC [2] that illustrates the API and
>>>> how the Python SDF API can be used to implement a Kafka source. I would
>>>> greatly appreciate any feedback on this.
>>>>
>>>> [1] https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>> [2] https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>
>>>> Thanks,
>>>> Cham
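
[Editor's note: the style-guide point Kenn raises at the top of the thread can be sketched in a few lines. This is a hypothetical, language-agnostic illustration, not actual Beam API: `read_with_parse_fn`, `read`, and the `TableRow` stand-in are invented names. The contrast is between an IO parameterized by a higher-order function (hard to express across languages) and an IO that emits a concrete type, leaving conversion to an ordinary downstream step.]

```python
from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, List


@dataclass
class TableRow:
    """A concrete, language-neutral record type (stand-in for BigQuery's TableRow)."""
    fields: dict


# Style the guide discourages: the read is parameterized by a higher-order
# function, so a cross-language caller would have to ship a callback into
# the connector's host language.
def read_with_parse_fn(raw_rows: Iterable[dict],
                       parse_fn: Callable[[TableRow], object]) -> List[object]:
    return [parse_fn(TableRow(fields=r)) for r in raw_rows]


# Recommended style: the read returns the concrete type; any conversion is
# an ordinary downstream map step in whatever language the pipeline uses.
def read(raw_rows: Iterable[dict]) -> Iterator[TableRow]:
    for r in raw_rows:
        yield TableRow(fields=r)


raw = [{"user": "a", "clicks": 3}, {"user": "b", "clicks": 5}]

# Both produce the same result, but the second keeps the IO's output type
# concrete and pushes conversion into a separate, composable step.
parsed_inline = read_with_parse_fn(raw, lambda row: row.fields["clicks"])
parsed_downstream = [row.fields["clicks"] for row in read(raw)]

print(parsed_inline)      # [3, 5]
print(parsed_downstream)  # [3, 5]
```

The same shape explains the PubsubIO contrast Kenn notes: a timestamp stored in a metadata field is readable from any SDK, whereas a timestamp-extracting lambda is bound to the host language.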