On Tue, Nov 5, 2019 at 10:32 AM Hai Lu <lhai...@gmail.com> wrote:
>
> Starting the expansion service in the job server is helpful. But having to 
> expose the port number and to include the address in the 
> beam.ExternalTransform is still a hassle. Giving a hard-coded port number 
> might be the only solution right now but it's not a very clean solution in 
> our case.

Yeah, it's pretty ugly.

> @Robert Bradshaw, yes, one cannot construct the "whole" pipeline first and 
> pass it to the runner, but can't we easily combine the job server and 
> expansion service?

Could you clarify what you mean (if this is more than just starting up
both with the same binary)?

> Also it seems like right now each beam.ExternalTransform is specifying an 
> expansion service address, does it mean we expect multiple expansion 
> services? Is there such use case?

Yes. Unless we build a single expansion service that has every
external transform one may want to use, we'll have multiple expansion
services. And people may have libraries that are not part of Beam. And
it'll be especially common if we have multiple languages, e.g. a Go
pipeline that calls some Python TFX and some Java IO.

In the short term, for Beam Java expansion services (Java being the
most mature SDK, so it has the most to borrow from), I created
JavaJarExpansionService [1], which takes as an argument the path (or
URL) to any Jar whose main class (which might be worth parameterizing)
starts up an expansion service at the specified port. This way all
that must be specified is a pre-built expansion service jar, and it
will choose a port, start it up, communicate with it, and shut it down
in the background. For transforms that are part of Beam,
BeamJarExpansionService [2] makes this even easier: you specify a
Gradle target and it will either use the jar built locally (in a dev
environment) or fetch it as a release artifact from Maven.
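
To make the "choose a port, start it up, shut it down" lifecycle
concrete, here is a rough standard-library sketch of the idea. It is
not the code in external.py; the names pick_free_port, wait_for_port,
managed_service and command_for_port are made up for illustration, and
it assumes the service takes its port as a command-line argument.

```python
import contextlib
import socket
import subprocess
import sys
import time


def pick_free_port():
    """Ask the OS for a currently unused TCP port."""
    with contextlib.closing(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(("localhost", 0))
        return s.getsockname()[1]


def wait_for_port(port, timeout=30.0):
    """Poll until something accepts connections on localhost:port."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with socket.create_connection(("localhost", port), timeout=1.0):
                return
        except OSError:
            time.sleep(0.1)
    raise TimeoutError("service did not come up on port %d" % port)


@contextlib.contextmanager
def managed_service(command_for_port):
    """Start a service on a free port, yield its address, then stop it.

    command_for_port is a hypothetical callable that maps a port number
    to the argv list launching the service on that port.
    """
    port = pick_free_port()
    process = subprocess.Popen(command_for_port(port))
    try:
        wait_for_port(port)
        yield "localhost:%d" % port
    finally:
        # Always shut the subprocess down, even if the caller raised.
        process.terminate()
        process.wait()
```

With a jar this might be used as
managed_service(lambda port: ["java", "-jar", "/path/to/service.jar", str(port)]),
where the jar path is a placeholder, and the yielded address is what
would be handed to beam.ExternalTransform. Note that the port is
released before the subprocess binds it, so there is a small race
window that a real implementation may want to handle.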

The one thing to be worked out, as mentioned in the other thread, is
attaching artifacts to environments.

[1] https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/transforms/external.py#L421
[2] https://github.com/apache/beam/blob/release-2.17.0/sdks/python/apache_beam/transforms/external.py#L452

> Thanks for everyone's response, BTW. This is very helpful! :)
>
> Thanks,
> Hai
>
> On Mon, Nov 4, 2019 at 2:23 PM Robert Bradshaw <rober...@google.com> wrote:
>>
>> To clarify, starting up the Flink Job Server by default starts up an
>> Expansion Service on the hard-coded, default port 8097.
>>
>> On Mon, Nov 4, 2019 at 2:02 PM Thomas Weise <t...@apache.org> wrote:
>> >
>> > The expansion service can be provided by the job server, as done in the 
>> > Flink runner. It needs to be available at pipeline construction time, but 
>> > there is no need to run a separate service.
>> >
>> > Thomas
>> >
>> > On Mon, Nov 4, 2019 at 12:03 PM Robert Bradshaw <rober...@google.com> 
>> > wrote:
>> >>
>> >> On Mon, Nov 4, 2019 at 11:54 AM Chamikara Jayalath <chamik...@google.com> 
>> >> wrote:
>> >> >
>> >> > On Mon, Nov 4, 2019 at 11:01 AM Hai Lu <lhai...@apache.org> wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> We're looking into leveraging the cross language pipeline feature in 
>> >> >> our Beam pipelines on Samza runner. While the feature seems to work 
>> >> >> well, the PTransform expansion as a standalone service isn't very 
>> >> >> convenient. Particularly that the Python pipeline needs to specify the 
>> >> >> address of the expansion service.
>> >> >>
>> >> >> I'm wondering why we couldn't embed the expansion service into runner 
>> >> >> itself. I understand the cross language feature wants to be runner 
>> >> >> independent, but does it make sense to at least provide the option to 
>> >> >> allow runner to use the expansion service as a library and make it 
>> >> >> transparent to the portable pipeline?
>> >> >
>> >> >
>> >> > Beam composite transforms are expanded before defining the portable job 
>> >> > definition (and before submitting the jobs to the runner). So naturally 
>> >> > this is something that has to be done in the Beam side. As an added 
>> >> > benefit, as you identified, this allows us to keep this logic runner 
>> >> > independent.
>> >> > I think there were discussions regarding automatically starting up a 
>> >> > local expansion service if one is not specified. Will this address your 
>> >> > concerns?
>> >>
>> >> Just to add to this, if you have a pipeline A -> B -> C, the expansion
>> >> of B often needs to be evaluated before C can be applied (e.g. we're
>> >> planning on exposing the SQL transforms cross language, and many
>> >> cross-language IOs can query and supply their own schemas for
>> >> downstream type checking), so one cannot construct the "whole"
>> >> pipeline, pass it to the runner, and let the runner do the expansion.
