Keep in mind that these user-supplied lambdas are commonly used in our IOs. One common usage is in Sink IOs, to allow dynamic destinations. For example, in BigQueryIO.Write, a user-supplied lambda determines which table a record should be written to.
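To make the dynamic-destination pattern concrete, here is a minimal plain-Python sketch. It is not Beam or BigQueryIO code: `route_by_destination`, the sample records, and the table names are all made up for illustration. The point is only that the sink invokes an arbitrary user function once per record, which is what becomes hard when the sink itself runs in a different language's SDK.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List

Record = Dict[str, object]


def route_by_destination(
    records: Iterable[Record],
    table_fn: Callable[[Record], str],
) -> Dict[str, List[Record]]:
    """Group records by the table name returned by the user-supplied function."""
    tables: Dict[str, List[Record]] = defaultdict(list)
    for record in records:
        # The sink has no idea how the destination is computed; it just calls
        # the user's function for every record.
        tables[table_fn(record)].append(record)
    return dict(tables)


# Hypothetical records and table-naming rule, for illustration only.
events = [
    {"country": "us", "clicks": 3},
    {"country": "de", "clicks": 1},
    {"country": "us", "clicks": 7},
]
routed = route_by_destination(
    events, lambda r: f"project:dataset.events_{r['country']}"
)
```

Here `routed` maps each computed table name to the records destined for it; in a cross-language setting the lambda would have to be expressed in some form the other SDK can evaluate.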
Given that IOs are one of the big selling points of cross-language support, we should think about how we can support this functionality.

Reuven

On Thu, Jan 24, 2019 at 8:34 AM Robert Bradshaw <[email protected]> wrote:

> On Thu, Jan 24, 2019 at 5:08 PM Thomas Weise <[email protected]> wrote:
> >
> > Exciting to see the cross-language train gathering steam :)
> >
> > It may be useful to flesh out the user facing aspects a bit more before
> > going too deep on the service / expansion side or maybe that was done
> > elsewhere?
>
> It's been discussed, but no resolution yet.
>
> > A few examples (of varying complexity) of what the shim/proxy transforms
> > would look like in the other SDKs. Perhaps Java KafkaIO in Python and Go
> > would be a good candidate?
>
> The core implementation would, almost by definition, be
>
> input.apply(ExternalTransform(URN, payload, service_address)).
>
> Nicer shims would just be composite transforms that call this, filling
> in the URNs, payloads, and possibly service details from more
> user-friendly parameters.
>
> > One problem we discovered with custom Flink native transforms for Python
> > was handling of lambdas / functions. An example could be a user defined
> > watermark timestamp extractor that the user should be able to supply in
> > Python and the JVM cannot handle.
>
> Yes, this has never been resolved satisfactorily. For now, if UDFs can
> be reified in terms of a commonly-understood URN + payload, it'll
> work. A transform could provide a wide range of "useful" URNs for its
> internal callbacks, more than that would require significant design if
> it can't be pre- or post-fixed.
>
> > On Wed, Jan 23, 2019 at 7:04 PM Chamikara Jayalath <[email protected]> wrote:
> >>
> >>
> >>
> >> On Wed, Jan 23, 2019 at 1:03 PM Robert Bradshaw <[email protected]> wrote:
> >>>
> >>> On Wed, Jan 23, 2019 at 6:38 PM Maximilian Michels <[email protected]> wrote:
> >>> >
> >>> > Thank you for starting on the cross-language feature Robert!
> >>> >
> >>> > Just to recap: Each SDK runs an ExpansionService which can be contacted during
> >>> > pipeline translation to expand transforms that are unknown to the SDK. The
> >>> > service returns the Proto definitions to the querying process.
> >>>
> >>> Yep. Technically it doesn't have to be the SDK, or even if it is there
> >>> may be a variety of services (e.g. one offering SQL, one offering
> >>> different IOs).
> >>>
> >>> > There will be multiple environments such that during execution cross-language
> >>> > pipelines select the appropriate environment for a transform.
> >>>
> >>> Exactly. And fuses only those steps with compatible environments together.
> >>>
> >>> > It's not clear to me, should the expansion happen during pipeline construction
> >>> > or during translation by the Runner?
> >>>
> >>> I think it needs to happen as part of construction because the set of
> >>> outputs (and their properties) can be dynamic based on the expansion.
> >>
> >>
> >> Also, without expansion at pipeline construction, we'll have to define all composite cross-language transforms as runner-native transforms, which won't be practical?
> >>
> >>>
> >>>
> >>> > Thanks,
> >>> > Max
> >>> >
> >>> > On 23.01.19 04:12, Robert Bradshaw wrote:
> >>> > > No, this PR simply takes an endpoint address as a parameter, expecting
> >>> > > it to already be up and available. More convenient APIs, e.g. ones
> >>> > > that spin up an endpoint and tear it down, or catalog and locate code
> >>> > > and services offering these endpoints, could be provided as wrappers
> >>> > > on top of or extensions of this.
> >>> > >
> >>> > > On Wed, Jan 23, 2019 at 12:19 AM Kenneth Knowles <[email protected]> wrote:
> >>> > >>
> >>> > >> Nice! If I recall correctly, there was mostly concern about how to launch and manage the expansion service (Docker? Vendor-specific? Etc). Does this PR take a position on that question?
> >>> > >>
> >>> > >> Kenn
> >>> > >>
> >>> > >> On Tue, Jan 22, 2019 at 1:44 PM Chamikara Jayalath <[email protected]> wrote:
> >>> > >>>
> >>> > >>>
> >>> > >>>
> >>> > >>> On Tue, Jan 22, 2019 at 11:35 AM Udi Meiri <[email protected]> wrote:
> >>> > >>>>
> >>> > >>>> Also debuggability: collecting logs from each of these systems.
> >>> > >>>
> >>> > >>>
> >>> > >>> Agree.
> >>> > >>>
> >>> > >>>>
> >>> > >>>>
> >>> > >>>> On Tue, Jan 22, 2019 at 10:53 AM Chamikara Jayalath <[email protected]> wrote:
> >>> > >>>>>
> >>> > >>>>> Thanks Robert.
> >>> > >>>>>
> >>> > >>>>> On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw <[email protected]> wrote:
> >>> > >>>>>>
> >>> > >>>>>> Now that we have the FnAPI, I started playing around with support for
> >>> > >>>>>> cross-language pipelines. This will allow things like IOs to be shared
> >>> > >>>>>> across all languages, SQL to be invoked from non-Java, TFX tensorflow
> >>> > >>>>>> transforms to be invoked from non-Python, etc. and I think is the next
> >>> > >>>>>> step in extending (and taking advantage of) the portability layer
> >>> > >>>>>> we've developed. These are often composite transforms whose inner
> >>> > >>>>>> structure depends in non-trivial ways on their configuration.
> >>> > >>>>>
> >>> > >>>>>
> >>> > >>>>> Some additional benefits of cross-language transforms are given below.
> >>> > >>>>>
> >>> > >>>>> (1) The current large collection of Java IO connectors will become available to other languages.
> >>> > >>>>> (2) Current Java and Python transforms will be available for Go and any other future SDKs.
> >>> > >>>>> (3) New transform authors will be able to pick their language of choice and make their transform available to all Beam SDKs. For example, this can be the language the transform author is most familiar with or the only language for which a client library is available for connecting to an external data store.
> >>> > >>>>>
> >>> > >>>>>>
> >>> > >>>>>> I created a PR [1] that basically follows the "expand via an external
> >>> > >>>>>> process" over RPC alternative from the proposals we came up with when
> >>> > >>>>>> we were discussing this last time [2]. There are still some unknowns,
> >>> > >>>>>> e.g. how to handle artifacts supplied by an alternative SDK (they
> >>> > >>>>>> currently must be provided by the environment), but I think this is a
> >>> > >>>>>> good incremental step forward that will already be useful in a large
> >>> > >>>>>> number of cases. It would be good to validate the general direction
> >>> > >>>>>> and I would be interested in any feedback others may have on it.
> >>> > >>>>>
> >>> > >>>>>
> >>> > >>>>> I think there are multiple semi-dependent problems we have to tackle to reach the final goal of supporting fully-fledged cross-language transforms in Beam. I agree with taking an incremental approach here with the overall vision in mind. Some other problems we have to tackle include the following.
> >>> > >>>>>
> >>> > >>>>> * Defining a user API that will allow pipelines defined in an SDK X to use transforms defined in SDK Y.
> >>> > >>>>> * Updating various runners to use URN/payload based environment definition [1]
> >>> > >>>>> * Updating various runners to support starting containers for multiple environments/languages for the same pipeline and supporting executing pipeline steps in containers started for multiple environments.
> >>> > >>>
> >>> > >>>
> >>> > >>> I've been working with +Heejong Lee to add some of the missing pieces mentioned above.
> >>> > >>>
> >>> > >>> We created the following doc that captures some of the ongoing work related to cross-language transforms and which will hopefully serve as a knowledge base for anybody who wishes to quickly learn context related to this.
> >>> > >>> Feel free to refer to this and/or add to this.
> >>> > >>>
> >>> > >>> https://docs.google.com/document/d/1H3yCyVFI9xYs1jsiF1GfrDtARgWGnLDEMwG5aQIx2AU/edit?usp=sharing
> >>> > >>>
> >>> > >>>>>
> >>> > >>>>> Thanks,
> >>> > >>>>> Cham
> >>> > >>>>>
> >>> > >>>>> [1] https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L952
> >>> > >>>>>
> >>> > >>>>>>
> >>> > >>>>>> - Robert
> >>> > >>>>>>
> >>> > >>>>>> [1] https://github.com/apache/beam/pull/7316
> >>> > >>>>>> [2] https://s.apache.org/beam-mixed-language-pipelines
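
As a rough illustration of two ideas from the thread (a friendly shim that fills in a URN, payload, and service address, and a UDF reified as a URN + payload), here is a small plain-Python sketch. Nothing in it is Beam API: the class names, the `example:` URNs, the JSON payload encoding, and the default address `localhost:8097` are all assumptions invented for this example.

```python
import json
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class FunctionSpec:
    """Hypothetical reified callback: a well-known URN plus its parameters."""
    urn: str
    payload: bytes


@dataclass(frozen=True)
class ExternalTransformSpec:
    """Hypothetical stand-in for the (URN, payload, service_address) triple."""
    urn: str
    payload: bytes
    service_address: str


def timestamp_from_field(field: str) -> FunctionSpec:
    """Describe 'read the timestamp from this field' without shipping code."""
    return FunctionSpec(
        urn="example:fn:timestamp_from_field:v1",
        payload=field.encode("utf-8"),
    )


def kafka_read_shim(
    topic: str,
    bootstrap_servers: str,
    timestamp_fn: Optional[FunctionSpec] = None,
    service_address: str = "localhost:8097",
) -> ExternalTransformSpec:
    """User-friendly wrapper that turns named parameters into URN + payload."""
    config = {"topic": topic, "bootstrap_servers": bootstrap_servers}
    if timestamp_fn is not None:
        # The callback travels as data (URN + payload), not as a Python lambda.
        config["timestamp_fn"] = {
            "urn": timestamp_fn.urn,
            "payload": timestamp_fn.payload.decode("utf-8"),
        }
    return ExternalTransformSpec(
        urn="example:external:kafka_read:v1",
        payload=json.dumps(config, sort_keys=True).encode("utf-8"),
        service_address=service_address,
    )


spec = kafka_read_shim(
    "events", "broker:9092", timestamp_fn=timestamp_from_field("event_time")
)
```

The limitation raised in the thread is visible here: only callbacks that fit a URN both sides already understand can be expressed this way, so an arbitrary user lambda would still need a separate design.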
