Follow-up for users looking to run portable pipelines on Flink:

After prototyping the generate-jar-file approach for internal deployment
and some related discussion, the conclusion was that it is too limiting.
The sticky point is that the jar file would need to be generated at
container build time. That does not allow us to execute any logic in the
Python driver program that depends on the deploy environment, such as
retrieval of environment variables for configuration/credentials, setting a
submission timestamp for stream positioning etc.

What worked well was that no job server was required to submit the Flink
job and the jar file could be used with the existing Flink tooling; there
was no need to change the FlinkK8sOperator
<https://github.com/lyft/flinkk8soperator> at all.

I then looked for a way to eliminate the build time translation and execute
the Python driver program when the job is submitted, but still as a Flink
entry point w/o extra job server deployment and client side dependencies.
How can that work?

https://issues.apache.org/jira/browse/BEAM-8471

The main point was that there should be no requirement to install things on
the client. FlinkK8sOperator is talking to the Flink REST API, w/o Python
or Java. The Python dependencies need to be present on the Flink job
manager host at the time the job is started through the REST API. That was
something we had already solved for our container image build, and from
conversation with few other folks this was their preferred container build
approach also.

In the future we may seek the ability to separate Flink and SDK/application
bits into different images. For the SDK worker, this is intended via the
external environment and sidecar container. For the client driver program,
a similar approach could be implemented. Through an "external client
environment", instead of a local process execution.

The new Flink runner can be used as entry point for the REST API, the Flink
CLI or standalone, especially for Flink centric automation. Of course
portable pipelines can also be directly submitted through the SDK language
client, via job server or other tooling, like the Python Flink client that
Robert contributed recently.

Thanks,
Thomas


On Thu, Aug 22, 2019 at 12:58 PM Kyle Weaver <[email protected]> wrote:

> Following up on discussion in this morning's OSS runners meeting, I have
> uploaded a draft PR for the full implementation (job creation + execution):
> https://github.com/apache/beam/pull/9408
>
> Kyle Weaver | Software Engineer | github.com/ibzib | [email protected]
>
>
> On Tue, Aug 20, 2019 at 1:24 PM Robert Bradshaw <[email protected]>
> wrote:
>
>> The point of expansion services is to run at pipeline construction
>> time so that the caller can build on top of the outputs. E.g. we're
>> hoping to expose Beam's SQL transforms to other languages via an
>> expansion service and *not* duplicate the logic of parsing the SQL
>> statements to determine the type(s) of the outputs. Even for simpler
>> IOs, we would like to take advantage of schema information (e.g.
>> looked up at construction time) to produce results and validate (or
>> even inform) subsequent construction.
>>
>> I think we're also making a mistake in talking about "the" expansion
>> service here, as if there was only one well defined service that all
>> pipenes used. If we go the route of deferring some expansion to the
>> runner, we need a way of naming expansion services. It seems like this
>> proposal is simply isomorphic to defining new primitive transforms
>> which some (all?) runners are just expected to understand.
>>
>> On Tue, Aug 20, 2019 at 10:11 AM Thomas Weise <[email protected]> wrote:
>> >
>> >
>> >
>> > On Tue, Aug 20, 2019 at 8:56 AM Lukasz Cwik <[email protected]> wrote:
>> >>
>> >>
>> >>
>> >> On Mon, Aug 19, 2019 at 5:52 PM Ahmet Altay <[email protected]> wrote:
>> >>>
>> >>>
>> >>>
>> >>> On Sun, Aug 18, 2019 at 12:34 PM Thomas Weise <[email protected]> wrote:
>> >>>>
>> >>>> There is a PR open for this:
>> https://github.com/apache/beam/pull/9331
>> >>>>
>> >>>> (it wasn't tagged with the JIRA and therefore not linked)
>> >>>>
>> >>>> I think it is worthwhile to explore how we could further detangle
>> the client side Python and Java dependencies.
>> >>>>
>> >>>> The expansion service is one more dependency to consider in a build
>> environment. Is it really necessary to expand external transforms prior to
>> submission to the job service?
>> >>>
>> >>>
>> >>> +1, this will make it easier to use external transforms from the
>> already familiar client environments.
>> >>>
>> >>
>> >>
>> >> The intent is to make it so that you CAN (not MUST) run an expansion
>> service separate from a Runner. Creating a single endpoint that hosts both
>> the Job and Expansion service is something that gRPC does very easily since
>> you can host multiple service definitions on a single port.
>> >
>> >
>> > Yes, that's fine. The point here is when the expansion occurs. I
>> believe the runner can also invoke the expansion service, thereby
>> eliminating the expansion service interaction from the client side.
>> >
>> >
>> >>
>> >>
>> >>>>
>> >>>>
>> >>>> Can we come up with a partially constructed proto that can be
>> produced by just running the Python entry point? Note this would also
>> require pushing the pipeline options parsing into the job service.
>> >>>
>> >>>
>> >>> Why would this require pushing the pipeline options parsing to the
>> job service. Assuming that python will have enough idea about the external
>> transform what options it will need. The necessary bit could be converted
>> to arguments and be part of that partially constructed proto.
>> >>>
>> >>>>
>> >>>>
>> >>>> On Sun, Aug 18, 2019 at 12:01 PM enrico canzonieri <
>> [email protected]> wrote:
>> >>>>>
>> >>>>> I found the tracking ticket at BEAM-7966
>> >>>>>
>> >>>>> On Sun, Aug 18, 2019 at 11:59 AM enrico canzonieri <
>> [email protected]> wrote:
>> >>>>>>
>> >>>>>> Is this alternative still being considered? Creating a portable
>> jar sounds like a good solution to re-use the existing runner specific
>> deployment mechanism (e.g. Flink k8s operator) and in general simplify the
>> deployment story.
>> >>>>>>
>> >>>>>> On Fri, Aug 9, 2019 at 12:46 AM Robert Bradshaw <
>> [email protected]> wrote:
>> >>>>>>>
>> >>>>>>> The expansion service is a separate service. (The flink jar
>> happens to
>> >>>>>>> bring both up.) However, there is negotiation to receive/validate
>> the
>> >>>>>>> pipeline options.
>> >>>>>>>
>> >>>>>>> On Fri, Aug 9, 2019 at 1:54 AM Thomas Weise <[email protected]>
>> wrote:
>> >>>>>>> >
>> >>>>>>> > We would also need to consider cross-language pipelines that
>> (currently) assume the interaction with an expansion service at
>> construction time.
>> >>>>>>> >
>> >>>>>>> > On Thu, Aug 8, 2019, 4:38 PM Kyle Weaver <[email protected]>
>> wrote:
>> >>>>>>> >>
>> >>>>>>> >> > It might also be useful to have the option to just output
>> the proto and artifacts, as alternative to the jar file.
>> >>>>>>> >>
>> >>>>>>> >> Sure, that wouldn't be too big a change if we were to decide
>> to go the SDK route.
>> >>>>>>> >>
>> >>>>>>> >> > For the Flink entry point we would need to allow for the job
>> server to be used as a library.
>> >>>>>>> >>
>> >>>>>>> >> We don't need the whole job server, we only need to add a main
>> method to FlinkPipelineRunner [1] as the entry point, which would basically
>> just do the setup described in the doc then call FlinkPipelineRunner::run.
>> >>>>>>> >>
>> >>>>>>> >> [1]
>> https://github.com/apache/beam/blob/master/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java#L53
>> >>>>>>> >>
>> >>>>>>> >> Kyle Weaver | Software Engineer | github.com/ibzib |
>> [email protected]
>> >>>>>>> >>
>> >>>>>>> >>
>> >>>>>>> >> On Thu, Aug 8, 2019 at 4:21 PM Thomas Weise <[email protected]>
>> wrote:
>> >>>>>>> >>>
>> >>>>>>> >>> Hi Kyle,
>> >>>>>>> >>>
>> >>>>>>> >>> It might also be useful to have the option to just output the
>> proto and artifacts, as alternative to the jar file.
>> >>>>>>> >>>
>> >>>>>>> >>> For the Flink entry point we would need to allow for the job
>> server to be used as a library. It would probably not be too hard to have
>> the Flink job constructed via the context execution environment, which
>> would require no changes on the Flink side.
>> >>>>>>> >>>
>> >>>>>>> >>> Thanks,
>> >>>>>>> >>> Thomas
>> >>>>>>> >>>
>> >>>>>>> >>>
>> >>>>>>> >>> On Thu, Aug 8, 2019 at 9:52 AM Kyle Weaver <
>> [email protected]> wrote:
>> >>>>>>> >>>>
>> >>>>>>> >>>> Re Javaless/serverless solution:
>> >>>>>>> >>>> I take it this would probably mean that we would construct
>> the jar directly from the SDK. There are advantages to this: full
>> separation of Python and Java environments, no need for a job server, and
>> likely a simpler implementation, since we'd no longer have to work within
>> the constraints of the existing job server infrastructure. The only
>> downside I can think of is the additional cost of implementing/maintaining
>> jar creation code in each SDK, but that cost may be acceptable if it's
>> simple enough.
>> >>>>>>> >>>>
>> >>>>>>> >>>> Kyle Weaver | Software Engineer | github.com/ibzib |
>> [email protected]
>> >>>>>>> >>>>
>> >>>>>>> >>>>
>> >>>>>>> >>>> On Thu, Aug 8, 2019 at 9:31 AM Thomas Weise <[email protected]>
>> wrote:
>> >>>>>>> >>>>>
>> >>>>>>> >>>>>
>> >>>>>>> >>>>>
>> >>>>>>> >>>>> On Thu, Aug 8, 2019 at 8:29 AM Robert Bradshaw <
>> [email protected]> wrote:
>> >>>>>>> >>>>>>
>> >>>>>>> >>>>>> > Before assembling the jar, the job server runs to create
>> the ingredients. That requires the (matching) Java environment on the
>> Python developers machine.
>> >>>>>>> >>>>>>
>> >>>>>>> >>>>>> We can run the job server and have it create the jar (and
>> if we keep
>> >>>>>>> >>>>>> the job server running we can use it to interact with the
>> running
>> >>>>>>> >>>>>> job). However, if the jar layout is simple enough, there's
>> no need to
>> >>>>>>> >>>>>> even build it from Java.
>> >>>>>>> >>>>>>
>> >>>>>>> >>>>>> Taken to the extreme, this is a one-shot, jar-based
>> JobService API. We
>> >>>>>>> >>>>>> choose a standard layout of where to put the pipeline
>> description and
>> >>>>>>> >>>>>> artifacts, and can "augment" an existing jar (that has a
>> >>>>>>> >>>>>> runner-specific main class whose entry point knows how to
>> read this
>> >>>>>>> >>>>>> data to kick off a pipeline as if it were a users driver
>> code) into
>> >>>>>>> >>>>>> one that has a portable pipeline packaged into it for
>> submission to a
>> >>>>>>> >>>>>> cluster.
>> >>>>>>> >>>>>
>> >>>>>>> >>>>>
>> >>>>>>> >>>>> It would be nice if the Python developer doesn't have to
>> run anything Java at all.
>> >>>>>>> >>>>>
>> >>>>>>> >>>>> As we just discussed offline, this could be accomplished
>> by  including the proto that is produced by the SDK into the pre-existing
>> jar.
>> >>>>>>> >>>>>
>> >>>>>>> >>>>> And if the jar has an entry point that creates the Flink
>> job in the prescribed manner [1], it can be directly submitted to the Flink
>> REST API. That would allow for Java free client.
>> >>>>>>> >>>>>
>> >>>>>>> >>>>> [1]
>> https://lists.apache.org/thread.html/6db869c53816f4e2917949a7c6992c2b90856d7d639d7f2e1cd13768@%3Cdev.flink.apache.org%3E
>> >>>>>>> >>>>>
>>
>

Reply via email to