I believe that most (all?) of these cases of executing a lambda could be avoided if we passed along structured records like: { table_name: ..., row: { ... } }
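For example (a rough Python sketch; the field names and the routing rule below are made up for illustration, not an existing Beam API), each element would carry its destination as plain data, so a sink in any SDK could route on a field instead of calling back into a user-supplied function:

import apache_beam as beam

def attach_destination(event):
  # Hypothetical routing rule: the destination table travels with the
  # row itself, so no user lambda has to execute inside the (Java) IO.
  return {
      'table_name': 'project:dataset.events_%s' % event['type'],
      'row': {'user': event['user'], 'value': event['value']},
  }

with beam.Pipeline() as p:
  _ = (p
       | beam.Create([{'user': 'alice', 'type': 'click', 'value': 1}])
       | beam.Map(attach_destination)
       # A sink that understood this record shape could read
       # element['table_name'] directly, in any language.
       | beam.Map(print))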
On Mon, Apr 30, 2018 at 10:24 AM Chamikara Jayalath <chamik...@google.com> wrote:
>
> On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <k...@google.com> wrote:
>
>> I agree with Cham's motivations as far as "we need it now" and getting
>> Python SDF up and running and exercised on a real connector.
>>
>> But I do find the current API of BigQueryIO to be a poor example. That
>> particular functionality on BigQueryIO seems extraneous and goes against
>> our own style guide [1]. The recommended way to write it would be for
>> BigQueryIO to output a natural concrete type (like TableRow) and allow
>> the following step to do conversions. This is a broader programming best
>> practice - unless there is a compelling reason, you should just return
>> the value rather than accept a higher-order function to operate on the
>> value. Is there a compelling reason in this case? I just dug through the
>> code and see that it bottoms out in AvroSource, where it does not seem
>> to add functionality.
>
> I think what Reuven was referring to was functionality of the BQ sink,
> not the source. Even if we set the destination in an additional step,
> this will still have to cross the SDK harness boundary just to set the
> property (invoking the BQ destination lambda in this case), won't it?
> Java will be able to fuse steps in this case, but Python/Go will involve
> "some work in the Java SDK harness" + "invoking the Python/Go harness to
> invoke the BQ destination lambda" + "invoking the Java harness to
> continue processing", including serialization/deserialization in between.
> This could still result in a significant amount of overhead for SDKs
> other than the one where the IO is implemented.
>
>> Considering cross-language pipelines as a primary use case for all
>> connectors, perhaps we should audit them and bring them into alignment
>> now, deprecating paths that use higher-order functions. We can still
>> consider host-language convenience composites.
>>
>> For an unbounded source like KafkaIO the compelling reason is the
>> timestamp-extracting function, needed to maintain a watermark. Notably,
>> PubsubIO does not accept such a function, but requires the timestamp to
>> be in a metadata field that any language can describe (versus having to
>> parse the message to pull out the timestamp).
>>
>> Kenn
>>
>> [1] https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>>
>> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Another point: cross-language IOs might add a performance penalty in
>>> many cases. For an example of this, look at BigQueryIO. The user can
>>> register a SerializableFunction that is evaluated on every record and
>>> determines which destination to write the record to. A Python user
>>> would of course want to register a Python function for this. This means
>>> that the Java IO would have to invoke Python code for each record it
>>> sees, which will likely be a big performance hit.
>>>
>>> Of course the downside of duplicating IOs is exactly as you say -
>>> multiple versions to maintain, and potentially duplicate bugs. I think
>>> the right answer will need to be decided on a case-by-case basis.
>>>
>>> Reuven
>>>
>>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <chamik...@google.com> wrote:
>>>
>>>> Hi Aljoscha,
>>>>
>>>> I tried to cover this in the doc. Once we have full support for
>>>> cross-language IO, we can decide this on a case-by-case basis.
>>>> But I don't think we should cease defining new sources/sinks for the
>>>> Beam Python SDK till we get to that point. I think there are good
>>>> reasons for adding Kafka support for Python today, and many Beam users
>>>> have requested this. Also, note that the proposed Python Kafka source
>>>> will be based on the Splittable DoFn framework, while the current Java
>>>> version is based on the UnboundedSource framework. Here are the
>>>> reasons that are currently listed in the doc:
>>>>
>>>> - Users might find it useful to have at least one unbounded source and
>>>> sink combination implemented in the Python SDK, and Kafka is the
>>>> streaming system that makes the most sense to support if we just want
>>>> to add support for only one such system in the Python SDK.
>>>>
>>>> - Not all runners might support cross-language IO. Also, some
>>>> user/runner/deployment combinations might require an unbounded
>>>> source/sink implemented in the Python SDK.
>>>>
>>>> - We recently added Splittable DoFn support to the Python SDK. It will
>>>> be good to have at least one production-quality Splittable DoFn that
>>>> will serve as a good example for any users who wish to implement new
>>>> Splittable DoFn implementations on top of the Beam Python SDK.
>>>>
>>>> - The cross-language transform feature is currently in the initial
>>>> discussion phase, and it could be some time before we can offer the
>>>> existing Java implementation of Kafka to Python SDK users.
>>>>
>>>> - Cross-language IO might take even longer to reach the point where
>>>> it's fully equivalent in expressive power to a transform written in
>>>> the host language - e.g. supporting host-language lambdas as part of
>>>> the transform configuration is likely to take a lot longer than
>>>> "first-order" cross-language IO. KafkaIO in Java uses lambdas as part
>>>> of transform configuration, e.g. timestamp functions.
>>>>
>>>> Thanks,
>>>> Cham
>>>>
>>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>
>>>>> Is this what we want to do in the long run, i.e. implement copies of
>>>>> connectors for different SDKs? I thought the plan was to enable using
>>>>> connectors written in different languages, i.e. use the Java Kafka
>>>>> I/O from Python. This way we wouldn't duplicate bugs for three
>>>>> different languages (Java, Python, and Go for now).
>>>>>
>>>>> Best,
>>>>> Aljoscha
>>>>>
>>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>
>>>>> Thanks Cham, this is great! I left just a couple of comments on the
>>>>> doc.
>>>>>
>>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I'm looking into adding a Kafka connector to the Beam Python SDK. I
>>>>>> think this will benefit many Python SDK users and will serve as a
>>>>>> good example for the recently added Splittable DoFn API (Fn API
>>>>>> support, which will allow all runners to use Python Splittable DoFn,
>>>>>> is in active development). I created a document [1] that makes the
>>>>>> case for adding this connector and compares the performance of
>>>>>> available Python Kafka client libraries. I also created a POC [2]
>>>>>> that illustrates the API and how the Python SDF API can be used to
>>>>>> implement a Kafka source. I would greatly appreciate any feedback on
>>>>>> this.
>>>>>> [1] https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>>> [2] https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
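To give a concrete picture of the proposal at the bottom of this thread, here is a rough, hypothetical sketch of an offset-based Splittable DoFn read in Python. It is not taken from the POC [2]; the element tuple shape and the fetch_record stub are invented for illustration, and a real unbounded Kafka source would additionally need watermark handling:

import apache_beam as beam
from apache_beam.io.restriction_trackers import (
    OffsetRange, OffsetRestrictionTracker)
from apache_beam.transforms.core import RestrictionProvider

def fetch_record(topic, partition, offset):
  # Stand-in for a real Kafka client fetch (e.g. via kafka-python).
  return (topic, partition, offset)

class OffsetRangeProvider(RestrictionProvider):
  # Each element describes one partition to read:
  # (topic, partition, start_offset, end_offset) - a made-up shape.
  def initial_restriction(self, element):
    _, _, start, end = element
    return OffsetRange(start, end)

  def create_tracker(self, restriction):
    return OffsetRestrictionTracker(restriction)

  def restriction_size(self, element, restriction):
    return restriction.size()

class ReadKafkaPartition(beam.DoFn):
  def process(self, element,
              tracker=beam.DoFn.RestrictionParam(OffsetRangeProvider())):
    topic, partition, _, _ = element
    offset = tracker.current_restriction().start
    # The runner may split the restriction at any time; try_claim tells
    # us whether we still own this offset.
    while tracker.try_claim(offset):
      yield fetch_record(topic, partition, offset)
      offset += 1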
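Similarly, to make Kenn's PubsubIO point concrete: the timestamp source there is configured as a plain string naming a message attribute, which any SDK can express as data, rather than as a host-language function. A minimal sketch (the topic and attribute names are made up, and a real run would also need streaming mode):

import apache_beam as beam

with beam.Pipeline() as p:
  _ = (p
       # 'event_ts' is just an attribute name, so this configuration
       # crosses language boundaries as data, not as code. Contrast
       # with Java KafkaIO, where a timestamp-extracting function would
       # have to call back into the user's language per record.
       | beam.io.ReadFromPubSub(
           topic='projects/my-project/topics/my-topic',
           timestamp_attribute='event_ts')
       | beam.Map(print))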