On Fri, Jan 25, 2019 at 12:18 AM Reuven Lax <re...@google.com> wrote:
>
> On Thu, Jan 24, 2019 at 2:38 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Thu, Jan 24, 2019 at 6:43 PM Reuven Lax <re...@google.com> wrote:
>> >
>> > Keep in mind that these user-supplied lambdas are commonly used in our 
>> > IOs. One common usage is in Sink IOs, to allow dynamic destinations, e.g. 
>> > in BigQueryIO.Write, a user-supplied lambda determines what table a record 
>> > should be written to.
>>
>> This can probably be pre-computed upstream (as part of the wrapping
>> composite that does take a language-native lambda) and placed in a
>> standard format (e.g. a tuple or other schema) to be extracted by the
>> "core" sink.
>
> I'm not quite sure what you mean. How will you express a lambda as a tuple? 
> Or are you suggesting that we preapply all the lambdas and pass the result 
> down?

Exactly.

> That might work, but would be _far_ more expensive.

Calling back to the SDK on each application would likely be (a
different kind of) expensive.

> The result of the lambda is sometimes much larger than the input (e.g. the 
> result could be a fully-qualified output location string), so these IOs try 
> to delay application as much as possible; as a result, the actual 
> application is often deep inside the graph.

Batching such RPCs gets messy (though perhaps we'll have to go there).
A hybrid approach may sometimes be possible: compute the truly dynamic
part eagerly, and delay some "boring" (known URN) application, such as
prepending a fixed prefix. Some applications may lend
themselves to interleaving (e.g. so the large lambda output is never
shuffled, but still crosses the data plane).
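
To make the hybrid idea concrete, here is a rough sketch in plain Python
(no Beam APIs involved). Everything in it is made up for illustration:
the URN string, the KNOWN_OPS registry, and both function names. The
user's lambda runs eagerly in the wrapper and yields only a short key;
the long destination string is built late, inside the "core" sink, by an
operation both sides know by URN.

```python
from typing import Any, Callable, Dict, Iterable, List, Tuple

# Hypothetical URN for a "boring" operation that both SDKs understand.
PREFIX_URN = "example:transform:prefix:v1"

# Operations the "core" sink can apply without calling back into the
# user's SDK. Keyed by URN; the payload is the operation's configuration.
KNOWN_OPS: Dict[str, Callable[[str, str], str]] = {
    PREFIX_URN: lambda payload, value: payload + value,
}


def wrap_with_destination(
    records: Iterable[Any], destination_fn: Callable[[Any], str]
) -> List[Tuple[str, Any]]:
    """Runs in the user's SDK: eagerly apply the language-native lambda."""
    return [(destination_fn(record), record) for record in records]


def core_sink(
    keyed_records: Iterable[Tuple[str, Any]], urn: str, payload: str
) -> Dict[str, List[Any]]:
    """Runs in the sink's SDK: expand the short key as late as possible."""
    expand = KNOWN_OPS[urn]
    tables: Dict[str, List[Any]] = {}
    for short_key, record in keyed_records:
        tables.setdefault(expand(payload, short_key), []).append(record)
    return tables


records = [{"user": "a", "country": "de"}, {"user": "b", "country": "us"}]
keyed = wrap_with_destination(records, lambda r: r["country"])
result = core_sink(keyed, PREFIX_URN, "project:dataset.events_")
```

Only the short (key, record) tuples would need to travel between the two
sides here; whether that is actually cheaper depends on how large the
truly dynamic part is.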

Worst case there are features that simply wouldn't be available, or at
least not cheaply, until an SDK-native source is written, but it could
still be a huge win for a lot of use cases.

As I said, we just don't have any good answers for this bit yet :).
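
On the shim question quoted further down, a "nicer shim" might look
roughly like the sketch below. This is not the API from the PR: the
ExternalTransform class here is a plain stand-in holding the three
values, and read_from_kafka, the URN, the JSON payload encoding, and the
default address are all hypothetical.

```python
from dataclasses import dataclass
import json


@dataclass(frozen=True)
class ExternalTransform:
    """Stand-in for the core transform: just URN, payload, and address."""
    urn: str
    payload: bytes
    service_address: str


def read_from_kafka(topic, bootstrap_servers, service_address="localhost:8097"):
    """Hypothetical shim: fills in URN and payload from friendly parameters."""
    # Assumption: the payload is JSON; the real encoding is undecided.
    payload = json.dumps(
        {"topic": topic, "bootstrap_servers": bootstrap_servers},
        sort_keys=True,
    ).encode("utf-8")
    return ExternalTransform(
        urn="example:external:kafka_read:v1",
        payload=payload,
        service_address=service_address,
    )


transform = read_from_kafka("events", "broker:9092")
```

The user only ever sees topic and bootstrap_servers; the URN, payload,
and service details stay inside the shim.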

>>
>> > Given that IOs are one of the big selling points of cross-language 
>> > support, we should think about how we can support this functionality.
>>
>> Yes. There are user-supplied lambdas that can't be as easily pre- or
>> post-applied, and though we had some brainstorming sessions (~ a year
>> ago) we're far from a (good) answer to that.
>>
>> > On Thu, Jan 24, 2019 at 8:34 AM Robert Bradshaw <rober...@google.com> 
>> > wrote:
>> >>
>> >> On Thu, Jan 24, 2019 at 5:08 PM Thomas Weise <t...@apache.org> wrote:
>> >> >
>> >> > Exciting to see the cross-language train gathering steam :)
>> >> >
>> >> > It may be useful to flesh out the user facing aspects a bit more before 
>> >> > going too deep on the service / expansion side or maybe that was done 
>> >> > elsewhere?
>> >>
>> >> It's been discussed, but no resolution yet.
>> >>
>> >> > A few examples (of varying complexity) of what the shim/proxy transforms 
>> >> > would look like in the other SDKs. Perhaps Java KafkaIO in Python and 
>> >> > Go would be a good candidate?
>> >>
>> >> The core implementation would, almost by definition, be
>> >>
>> >>     input.apply(ExternalTransform(URN, payload, service_address)).
>> >>
>> >> Nicer shims would just be composite transforms that call this, filling
>> >> in the URNs, payloads, and possibly service details from more
>> >> user-friendly parameters.
>> >>
>> >> > One problem we discovered with custom Flink native transforms for 
>> >> > Python was handling of lambdas / functions. An example could be a user 
>> >> > defined watermark timestamp extractor that the user should be able to 
>> >> > supply in Python and the JVM cannot handle.
>> >>
>> >> Yes, this has never been resolved satisfactorily. For now, if UDFs can
>> >> be reified in terms of a commonly-understood URN + payload, it'll
>> >> work. A transform could provide a wide range of "useful" URNs for its
>> >> internal callbacks, but more than that would require significant design if
>> >> it can't be pre- or post-fixed.
>> >>
>> >> > On Wed, Jan 23, 2019 at 7:04 PM Chamikara Jayalath 
>> >> > <chamik...@google.com> wrote:
>> >> >>
>> >> >>
>> >> >>
>> >> >> On Wed, Jan 23, 2019 at 1:03 PM Robert Bradshaw <rober...@google.com> 
>> >> >> wrote:
>> >> >>>
>> >> >>> On Wed, Jan 23, 2019 at 6:38 PM Maximilian Michels <m...@apache.org> 
>> >> >>> wrote:
>> >> >>> >
>> >> >>> > Thank you for starting on the cross-language feature Robert!
>> >> >>> >
>> >> >>> > Just to recap: Each SDK runs an ExpansionService which can be 
>> >> >>> > contacted during
>> >> >>> > pipeline translation to expand transforms that are unknown to the 
>> >> >>> > SDK. The
>> >> >>> > service returns the Proto definitions to the querying process.
>> >> >>>
>> >> >>> Yep. Technically it doesn't have to be the SDK, or even if it is there
>> >> >>> may be a variety of services (e.g. one offering SQL, one offering
>> >> >>> different IOs).
>> >> >>>
>> >> >>> > There will be multiple environments such that during execution 
>> >> >>> > cross-language
>> >> >>> > pipelines select the appropriate environment for a transform.
>> >> >>>
>> >> >>> Exactly. And fuses only those steps with compatible environments 
>> >> >>> together.
>> >> >>>
>> >> >>> > It's not clear to me, should the expansion happen during pipeline 
>> >> >>> > construction
>> >> >>> > or during translation by the Runner?
>> >> >>>
>> >> >>> I think it needs to happen as part of construction because the set of
>> >> >>> outputs (and their properties) can be dynamic based on the expansion.
>> >> >>
>> >> >>
>> >> >> Also, without expansion at pipeline construction, we'll have to define 
>> >> >> all composite cross-language transforms as runner-native transforms 
>> >> >> which won't be practical?
>> >> >>
>> >> >>>
>> >> >>>
>> >> >>> > Thanks,
>> >> >>> > Max
>> >> >>> >
>> >> >>> > On 23.01.19 04:12, Robert Bradshaw wrote:
>> >> >>> > > No, this PR simply takes an endpoint address as a parameter, 
>> >> >>> > > expecting
>> >> >>> > > it to already be up and available. More convenient APIs, e.g. ones
>> >> >>> > > that spin up an endpoint and tear it down, or catalog and locate 
>> >> >>> > > code
>> >> >>> > > and services offering these endpoints, could be provided as 
>> >> >>> > > wrappers
>> >> >>> > > on top of or extensions of this.
>> >> >>> > >
>> >> >>> > > On Wed, Jan 23, 2019 at 12:19 AM Kenneth Knowles 
>> >> >>> > > <k...@apache.org> wrote:
>> >> >>> > >>
>> >> >>> > >> Nice! If I recall correctly, there was mostly concern about how 
>> >> >>> > >> to launch and manage the expansion service (Docker? 
>> >> >>> > >> Vendor-specific? Etc). Does this PR take a position on that question?
>> >> >>> > >>
>> >> >>> > >> Kenn
>> >> >>> > >>
>> >> >>> > >> On Tue, Jan 22, 2019 at 1:44 PM Chamikara Jayalath 
>> >> >>> > >> <chamik...@google.com> wrote:
>> >> >>> > >>>
>> >> >>> > >>>
>> >> >>> > >>>
>> >> >>> > >>> On Tue, Jan 22, 2019 at 11:35 AM Udi Meiri <eh...@google.com> 
>> >> >>> > >>> wrote:
>> >> >>> > >>>>
>> >> >>> > >>>> Also debuggability: collecting logs from each of these systems.
>> >> >>> > >>>
>> >> >>> > >>>
>> >> >>> > >>> Agree.
>> >> >>> > >>>
>> >> >>> > >>>>
>> >> >>> > >>>>
>> >> >>> > >>>> On Tue, Jan 22, 2019 at 10:53 AM Chamikara Jayalath 
>> >> >>> > >>>> <chamik...@google.com> wrote:
>> >> >>> > >>>>>
>> >> >>> > >>>>> Thanks Robert.
>> >> >>> > >>>>>
>> >> >>> > >>>>> On Tue, Jan 22, 2019 at 4:39 AM Robert Bradshaw 
>> >> >>> > >>>>> <rober...@google.com> wrote:
>> >> >>> > >>>>>>
>> >> >>> > >>>>>> Now that we have the FnAPI, I started playing around with 
>> >> >>> > >>>>>> support for
>> >> >>> > >>>>>> cross-language pipelines. This will allow things like IOs to 
>> >> >>> > >>>>>> be shared
>> >> >>> > >>>>>> across all languages, SQL to be invoked from non-Java, TFX 
>> >> >>> > >>>>>> tensorflow
>> >> >>> > >>>>>> transforms to be invoked from non-Python, etc., and I think this 
>> >> >>> > >>>>>> is the next
>> >> >>> > >>>>>> step in extending (and taking advantage of) the portability 
>> >> >>> > >>>>>> layer
>> >> >>> > >>>>>> we've developed. These are often composite transforms whose 
>> >> >>> > >>>>>> inner
>> >> >>> > >>>>>> structure depends in non-trivial ways on their configuration.
>> >> >>> > >>>>>
>> >> >>> > >>>>>
>> >> >>> > >>>>> Some additional benefits of cross-language transforms are 
>> >> >>> > >>>>> given below.
>> >> >>> > >>>>>
>> >> >>> > >>>>> (1) The current large collection of Java IO connectors will 
>> >> >>> > >>>>> become available to other languages.
>> >> >>> > >>>>> (2) Current Java and Python transforms will be available for 
>> >> >>> > >>>>> Go and any other future SDKs.
>> >> >>> > >>>>> (3) New transform authors will be able to pick their language 
>> >> >>> > >>>>> of choice and make their transform available to all Beam 
>> >> >>> > >>>>> SDKs. For example, this can be the language the transform 
>> >> >>> > >>>>> author is most familiar with or the only language for which a 
>> >> >>> > >>>>> client library is available for connecting to an external 
>> >> >>> > >>>>> data store.
>> >> >>> > >>>>>
>> >> >>> > >>>>>>
>> >> >>> > >>>>>> I created a PR [1] that basically follows the "expand via an 
>> >> >>> > >>>>>> external
>> >> >>> > >>>>>> process" over RPC alternative from the proposals we came up 
>> >> >>> > >>>>>> with when
>> >> >>> > >>>>>> we were discussing this last time [2]. There are still some 
>> >> >>> > >>>>>> unknowns,
>> >> >>> > >>>>>> e.g. how to handle artifacts supplied by an alternative SDK 
>> >> >>> > >>>>>> (they
>> >> >>> > >>>>>> currently must be provided by the environment), but I think 
>> >> >>> > >>>>>> this is a
>> >> >>> > >>>>>> good incremental step forward that will already be useful in 
>> >> >>> > >>>>>> a large
>> >> >>> > >>>>>> number of cases. It would be good to validate the general 
>> >> >>> > >>>>>> direction
>> >> >>> > >>>>>> and I would be interested in any feedback others may have on 
>> >> >>> > >>>>>> it.
>> >> >>> > >>>>>
>> >> >>> > >>>>>
>> >> >>> > >>>>> I think there are multiple semi-dependent problems we have to 
>> >> >>> > >>>>> tackle to reach the final goal of supporting fully-fledged 
>> >> >>> > >>>>> cross-language transforms in Beam. I agree with taking an 
>> >> >>> > >>>>> incremental approach here with the overall vision in mind. Some 
>> >> >>> > >>>>> other problems we have to tackle include the following.
>> >> >>> > >>>>>
>> >> >>> > >>>>> * Defining a user API that will allow pipelines defined in an 
>> >> >>> > >>>>> SDK X to use transforms defined in SDK Y.
>> >> >>> > >>>>> * Updating various runners to use URN/payload based environment 
>> >> >>> > >>>>> definition [1]
>> >> >>> > >>>>> * Updating various runners to support starting containers for 
>> >> >>> > >>>>> multiple environments/languages for the same pipeline and 
>> >> >>> > >>>>> supporting executing pipeline steps in containers started for 
>> >> >>> > >>>>> multiple environments.
>> >> >>> > >>>
>> >> >>> > >>>
>> >> >>> > >>> I've been working with +Heejong Lee to add some of the missing 
>> >> >>> > >>> pieces mentioned above.
>> >> >>> > >>>
>> >> >>> > >>> We created the following doc that captures some of the ongoing work 
>> >> >>> > >>> related to cross-language transforms and which will hopefully 
>> >> >>> > >>> serve as a knowledge base for anybody who wishes to quickly learn 
>> >> >>> > >>> the context related to this.
>> >> >>> > >>> Feel free to refer to this and/or add to this.
>> >> >>> > >>>
>> >> >>> > >>> https://docs.google.com/document/d/1H3yCyVFI9xYs1jsiF1GfrDtARgWGnLDEMwG5aQIx2AU/edit?usp=sharing
>> >> >>> > >>>
>> >> >>> > >>>
>> >> >>> > >>>>>
>> >> >>> > >>>>>
>> >> >>> > >>>>> Thanks,
>> >> >>> > >>>>> Cham
>> >> >>> > >>>>>
>> >> >>> > >>>>> [1] 
>> >> >>> > >>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/proto/beam_runner_api.proto#L952
>> >> >>> > >>>>>
>> >> >>> > >>>>>
>> >> >>> > >>>>>
>> >> >>> > >>>>>
>> >> >>> > >>>>>
>> >> >>> > >>>>>
>> >> >>> > >>>>>
>> >> >>> > >>>>>>
>> >> >>> > >>>>>>
>> >> >>> > >>>>>> - Robert
>> >> >>> > >>>>>>
>> >> >>> > >>>>>> [1] https://github.com/apache/beam/pull/7316
>> >> >>> > >>>>>> [2] https://s.apache.org/beam-mixed-language-pipelines
