Hi,

Regarding the original proposal: I don’t think spawning another process inside 
the JarHandler.runJar() is the way to go here. Looking at the bigger picture, 
the proposal would get us to roughly this situation:

1. Spawn Kubernetes containers (JobManager and TaskManagers)
2. User does a REST call to JobManager.runJar() to submit the user job
3. JobManager.runJar() opens a port that waits for job submission
4. JobMananger.runJar() invokes UserProgram.main()
5. UserProgram.main() launches a process (BeamJobService) that opens a port to 
wait for a Python process to connect to it
6. UserProgram.main() launches another process (the Python code, or any 
language, really) that connects to BeamJobService to submit the Pipeline
7. BeamJobService receives the Pipeline and talks to the port open on 
JobManager (via REST service, maybe) to submit the Job
8. Job is executed
9. Where is UserProgram.main() at this point?

I think that even running UserProgram.main() in the JobManager is already too 
much. A JobManager should accept JobGraphs (or something) and execute them, 
nothing more. Running UserProgram.main() makes some things complicated or 
weird. For example, what happens when that UserProgram.main() creates a 
RemoteEnvironment and uses that? What happens when the user code calls 
execute() multiple times.

I think a good solution for the motivating use case is to

a) run BeamJobService as a separate service that talks to a running JobManager 
via REST for submitting jobs that it receives

b) Spawning a JobManager inside the BeamJobService, i.e. the BeamJobService is 
like the entry point in a per-job Kubernetes model. Something that the new 
Executor work ([1], [2]) will enable.

Any thoughts? I’m happy to jump on a call about this because these things are 
very tricky to figure out and I might be wrong.

Best,
Aljoscha

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
[2] 
https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?ts=5d88e631

> On 6. Aug 2019, at 09:51, Till Rohrmann <trohrm...@apache.org> wrote:
> 
> I think there was the idea to make the JobGraph a "public"/stable interface
> other projects can rely on at some point. If I remember correctly, then we
> wanted to define a proto buf definition for the JobGraph so that clients
> written in a different language can submit JobGraphs and we could extend
> the data structure. As far as I know, this effort hasn't been started yet
> and is still in the backlog (I think there doesn't exist a JIRA issue yet).
> 
> The problem came up when discussing additions to the JobGraph because they
> need to be backwards compatible otherwise newer version of Flink would not
> be able to recover jobs. I think so far Flink provides backwards
> compatibility between different versions of the JobGraph. However, this is
> not officially guaranteed.
> 
> Cheers,
> Till
> 
> On Tue, Aug 6, 2019 at 3:56 AM Zili Chen <wander4...@gmail.com> wrote:
> 
>> It sounds like a request to change the interface Program into
>> 
>> public interface Program {
>>  JobGraph getJobGraph(String... args);
>> }
>> 
>> Also, given that JobGraph is said as internal interface or
>> cannot be relied on, we might introduce and use a
>> representation that allows for cross version compatibility.
>> 
>> 
>> Thomas Weise <t...@apache.org> 于2019年8月6日周二 上午12:11写道:
>> 
>>> If the goal is to keep job creation and job submission separate and we
>>> agree that there should be more flexibility for the job construction,
>> then
>>> JobGraph and friends should be stable API that the user can depend on. If
>>> that's the case, the path Chesnay pointed to may become viable.
>>> 
>>> There was discussion in the past that JobGraph cannot be relied on WRT
>>> backward compatibility and I would expect that at some point we want to
>>> move to a representation that allows for cross version compatibility.
>> Beam
>>> is an example how this could be accomplished (with its pipeline proto).
>>> 
>>> So if the Beam job server was able to produce the JobGraph, is there
>>> agreement that we should provide a mechanism that allows the program
>> entry
>>> point to return the JobGraph directly (without using the
>>> ExecutionEnvironment to build it)?
>>> 
>>> 
>>> On Mon, Aug 5, 2019 at 2:10 AM Zili Chen <wander4...@gmail.com> wrote:
>>> 
>>>> Hi Thomas,
>>>> 
>>>> If REST handler calls main(), the behavior inside main() is
>>>> unpredictable.
>>>> 
>>>> Now the jar run handler extract the job graph and submit
>>>> it with the job id configured in REST request. If REST
>>>> handler calls main() we can hardly even know how much
>>>> jobs are executed.
>>>> 
>>>> A new environment, as you said,
>>>> ExtractJobGraphAndSubmitToDispatcherEnvironment can be
>>>> added to satisfy your requirement. However, it is a bit
>>>> out of Flink scope. It might be better to write your own
>>>> REST handler.
>>>> 
>>>> WebMonitorExtension is for extending REST handlers but
>>>> it seems also unable to customize...
>>>> 
>>>> Best,
>>>> tison.
>>>> 
>>>> 
>>>> Thomas Weise <t...@apache.org> 于2019年8月3日周六 上午4:09写道:
>>>> 
>>>>> Thanks for looking into this.
>>>>> 
>>>>> I see the "Jar run handler" as function that takes few parameters and
>>>>> returns a job ID. I think it would be nice if the handler doesn't
>> hard
>>>> code
>>>>> the function. Perhaps this could be accomplished by pushing the code
>>> into
>>>>> something like "ExtractJobGraphAndSubmitToDispatcherEnvironment" that
>>> the
>>>>> main method could also bypass if it has an alternative way to provide
>>> the
>>>>> jobId via a context variable?
>>>>> 
>>>>> Zili: I looked at the client API proposal and left a few comments. I
>>>> think
>>>>> it is important to improve programmatic job submission. But it also
>>> seems
>>>>> orthogonal to how the jar run handler operates (i.e. these issues
>> could
>>>> be
>>>>> addressed independently).
>>>>> 
>>>>> Chesnay: You are right that the Beam job sever could be hacked to
>>> extract
>>>>> job graph and other ingredients. This isn't desirable though because
>>>> these
>>>>> Flink internals should not be exposed downstream. But even if we went
>>>> down
>>>>> that route we would still need a way to let the jar run handler know
>> to
>>>>> just return the ID of an already submitted job vs. trying to submit
>> one
>>>>> from OptimizerPlanEnvironment.
>>>>> 
>>>>> The intended sequence would be:
>>>>> 
>>>>> REST client provides a launcher jar
>>>>> REST client "runs jar"
>>>>> REST handler calls main()
>>>>> main launches Beam job server, runs Beam pipeline construction code
>>>> against
>>>>> that job server
>>>>> job server uses RemoteEnvironment to submit real job
>>>>> main "returns job id"
>>>>> REST handler returns job id
>>>>> 
>>>>> Thomas
>>>>> 
>>>>> 
>>>>> On Wed, Jul 31, 2019 at 4:33 AM Zili Chen <wander4...@gmail.com>
>>> wrote:
>>>>> 
>>>>>> By the way, currently Dispatcher implements RestfulGateway
>>>>>> and delegate resource request to ResourceManager. If we can,
>>>>>> semantically, let WebMonitor implement RestfulGateway,
>>>>>> and delegate job request to Dispatcher, resource request to
>>>>>> ResourceManager, it seems reasonable that when WebMonitor
>>>>>> receives a JarRun request, it spawns a process and run
>>>>>> the main method of the main class of that jar.
>>>>>> 
>>>>>> Best,
>>>>>> tison.
>>>>>> 
>>>>>> 
>>>>>> Zili Chen <wander4...@gmail.com> 于2019年7月31日周三 下午7:10写道:
>>>>>> 
>>>>>>> I don't think the `Program` interface could solve the problem.
>>>>>>> 
>>>>>>> The launcher launches the job server which creates the job graph,
>>>>>>> submits it and keeps monitoring. Even if user program implement
>>>>>>> `Program` Flink still extracts the JobGraph from `getPlan` and
>>>>>>> submits it, instead of really execute codes in main method of
>>>>>>> user program, so that the launcher is not started.
>>>>>>> 
>>>>>>> @Thomas,
>>>>>>> 
>>>>>>> Here is an ongoing discussion on client refactoring[1] as Till
>>>>>>> mentioned. However, I'm afraid that with current jar run semantic,
>>>>>>> i.e., extract the job graph and submit it to the Dispatcher, it
>>> cannot
>>>>>>> fits your requirement. The problem is that REST API directly
>>>>>>> communicates with Dispatcher and thus it's strange to tell the
>>>>>>> Dispatcher "just run a program in a process".
>>>>>>> 
>>>>>>> As you mentioned in the document, with CLI in session mode the
>>>>>>> whole program would be executed sequentially. I'll appreciate it
>>>>>>> if you can participant the thread on client refactor[1]. In the
>>>>>>> design document[2], we propose to provide rich interfaces for
>>>>>>> downstream projects integration. You can customize your CLI for
>>>>>>> executing your program arbitrarily. Any requirement or advise
>>>>>>> would be help.
>>>>>>> 
>>>>>>> Best,
>>>>>>> tison.
>>>>>>> 
>>>>>>> [1]
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>>>>>> [2]
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://docs.google.com/document/d/1UWJE7eYWiMuZewBKS0YmdVO2LUTqXPd6-pbOCof9ddY/edit
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Till Rohrmann <trohrm...@apache.org> 于2019年7月31日周三 下午4:50写道:
>>>>>>> 
>>>>>>>> Are you looking for something similar to the `Program` interface?
>>>> This
>>>>>>>> interface, even though it is a bit outdated and might get removed
>>> in
>>>>> the
>>>>>>>> future, offers a `getPlan` method which is called in order to
>>>> generate
>>>>>>>> the
>>>>>>>> `JobGraph`. In the client refactoring discussion thread it is
>>>> currently
>>>>>>>> being discussed what to do with this interface.
>>>>>>>> 
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>> 
>>>>>>>> On Wed, Jul 31, 2019 at 10:41 AM Chesnay Schepler <
>>>> ches...@apache.org>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> Couldn't the beam job server use the same work-around we're
>> using
>>>> in
>>>>>>>> the
>>>>>>>>> JarRunHandler to get access to the JobGraph?
>>>>>>>>> 
>>>>>>>>> On 26/07/2019 17:38, Thomas Weise wrote:
>>>>>>>>>> Hi Till,
>>>>>>>>>> 
>>>>>>>>>> Thanks for taking a look!
>>>>>>>>>> 
>>>>>>>>>> The Beam job server does not currently have the ability to
>> just
>>>>>>>> output
>>>>>>>>> the
>>>>>>>>>> job graph (and related artifacts) that could then be used
>> with
>>>> the
>>>>>>>>>> JobSubmitHandler. It is itself using
>>> StreamExecutionEnvironment,
>>>>>>>> which in
>>>>>>>>>> turn will lead to a REST API submission.
>>>>>>>>>> 
>>>>>>>>>> Here I'm looking at what happens before the Beam job server
>>> gets
>>>>>>>>> involved:
>>>>>>>>>> the interaction of the k8s operator with the Flink
>> deployment.
>>>> The
>>>>>>>> jar
>>>>>>>>> run
>>>>>>>>>> endpoint (ignoring the current handler implementation) is
>>> generic
>>>>> and
>>>>>>>>>> pretty much exactly matches what we would need for a uniform
>>>> entry
>>>>>>>> point.
>>>>>>>>>> It's just that in the Beam case the jar file would itself be
>> a
>>>>>>>> "launcher"
>>>>>>>>>> that doesn't provide the job graph itself, but the
>> dependencies
>>>> and
>>>>>>>>>> mechanism to invoke the actual client.
>>>>>>>>>> 
>>>>>>>>>> I could accomplish what I'm looking for by creating a
>> separate
>>>> REST
>>>>>>>>>> endpoint that looks almost the same. But I would prefer to
>>> reuse
>>>>> the
>>>>>>>>> Flink
>>>>>>>>>> REST API interaction that is already implemented for the
>> Flink
>>>> Java
>>>>>>>> jobs
>>>>>>>>> to
>>>>>>>>>> reduce the complexity of the deployment.
>>>>>>>>>> 
>>>>>>>>>> Thomas
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Fri, Jul 26, 2019 at 2:29 AM Till Rohrmann <
>>>>> trohrm...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>>> Hi Thomas,
>>>>>>>>>>> 
>>>>>>>>>>> quick question: Why do you wanna use the JarRunHandler? If
>>>> another
>>>>>>>>> process
>>>>>>>>>>> is building the JobGraph, then one could use the
>>>> JobSubmitHandler
>>>>>>>> which
>>>>>>>>>>> expects a JobGraph and then starts executing it.
>>>>>>>>>>> 
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>> 
>>>>>>>>>>> On Thu, Jul 25, 2019 at 7:45 PM Thomas Weise <
>> t...@apache.org>
>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>>> Hi,
>>>>>>>>>>>> 
>>>>>>>>>>>> While considering different options to launch Beam jobs
>>> through
>>>>> the
>>>>>>>>> Flink
>>>>>>>>>>>> REST API, I noticed that the implementation of
>> JarRunHandler
>>>>> places
>>>>>>>>>>> quite a
>>>>>>>>>>>> few restrictions on how the entry point shall construct a
>>> Flink
>>>>>>>> job, by
>>>>>>>>>>>> extracting and manipulating the job graph.
>>>>>>>>>>>> 
>>>>>>>>>>>> That's normally not a problem for Flink Java programs, but
>> in
>>>> the
>>>>>>>>>>> scenario
>>>>>>>>>>>> I'm looking at, the job graph would be constructed by a
>>>> different
>>>>>>>>> process
>>>>>>>>>>>> and isn't available to the REST handler. Instead, I would
>>> like
>>>> to
>>>>>>>> be
>>>>>>>>> able
>>>>>>>>>>>> to just respond with the job ID of the already launched
>> job.
>>>>>>>>>>>> 
>>>>>>>>>>>> For context, please see:
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://docs.google.com/document/d/1z3LNrRtr8kkiFHonZ5JJM_L4NWNBBNcqRc_yAf6G0VI/edit#heading=h.fh2f571kms4d
>>>>>>>>>>>> The current JarRunHandler code is here:
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>> 
>>>> 
>>> 
>> https://github.com/apache/flink/blob/f3c5dd960ff81a022ece2391ed3aee86080a352a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java#L82
>>>>>>>>>>>> It would be nice if there was an option to delegate the
>>>>>>>> responsibility
>>>>>>>>>>> for
>>>>>>>>>>>> job submission to the user code / entry point. That would
>> be
>>>>>>>> useful for
>>>>>>>>>>>> Beam and other frameworks built on top of Flink that
>>>> dynamically
>>>>>>>>> create a
>>>>>>>>>>>> job graph from a different representation.
>>>>>>>>>>>> 
>>>>>>>>>>>> Possible ways to get there:
>>>>>>>>>>>> 
>>>>>>>>>>>> * an interface that the main class can be implement end
>> when
>>>>>>>> present,
>>>>>>>>> the
>>>>>>>>>>>> jar run handler calls instead of main.
>>>>>>>>>>>> 
>>>>>>>>>>>> * an annotated method
>>>>>>>>>>>> 
>>>>>>>>>>>> Either way query parameters like savepoint path and
>>> parallelism
>>>>>>>> would
>>>>>>>>> be
>>>>>>>>>>>> forwarded to the user code and the result would be the ID
>> of
>>>> the
>>>>>>>>> launched
>>>>>>>>>>>> job.
>>>>>>>>>>>> 
>>>>>>>>>>>> Thougths?
>>>>>>>>>>>> 
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Thomas
>>>>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>> 
>>> 
>> 

Reply via email to