Hi Zili,

I think we are more or less on the same page with most of the stuff
you mentioned.

The only slight difference, at least in my opinion, is that I do not
see the Executors as being a "Client".
As you mentioned, I can see having the following:

1) ClusterClientFactory: responsible for deploying a session cluster
and retrieving the session cluster client.
2) ClusterClient: interacts with the session cluster; responsible for
querying cluster-level status, submitting Flink jobs and retrieving
the Flink job client.
3) JobClient: interacts with a Flink job; responsible for querying
job-level status and performing job-level operations such as
triggering a savepoint.

Now the Executor simply uses a client, e.g. a ClusterClient, to submit
the job (JobGraph) that it will create from the user program.
In that sense, the Executor is one level of abstraction above the
clients: it builds on the functionality offered by the client and adds
more on top of it.
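
To make the layering concrete, here is a minimal Java sketch. All
names and signatures below (submitJob, deploySessionCluster,
triggerSavepoint, InMemoryClusterClient, the simplified JobGraph) are
placeholders made up for illustration; they are not the actual Flink
or FLIP-73 interfaces.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-ins; none of these are the real Flink interfaces. */
final class LayeringSketch {

    /** Stand-in for the JobGraph that the Executor derives from the user program. */
    static final class JobGraph {
        final String name;
        JobGraph(String name) { this.name = name; }
    }

    /** Job-level operations, e.g. triggering a savepoint. */
    interface JobClient {
        String jobName();
        CompletableFuture<String> triggerSavepoint(String targetDirectory);
    }

    /** Cluster-level operations: submit a job and hand back its JobClient. */
    interface ClusterClient {
        CompletableFuture<JobClient> submitJob(JobGraph jobGraph);
    }

    /** Deploys (or connects to) a session cluster and returns a client for it. */
    interface ClusterClientFactory {
        ClusterClient deploySessionCluster();
    }

    /** One level above the clients: builds the JobGraph, then submits it through a client. */
    static final class Executor {
        private final ClusterClientFactory factory;
        Executor(ClusterClientFactory factory) { this.factory = factory; }

        CompletableFuture<JobClient> execute(String userProgram) {
            // Placeholder for the real translation of a user program into a JobGraph.
            JobGraph jobGraph = new JobGraph(userProgram);
            return factory.deploySessionCluster().submitJob(jobGraph);
        }
    }

    /** In-memory cluster client that only records which jobs were submitted. */
    static final class InMemoryClusterClient implements ClusterClient {
        final List<String> submitted = new ArrayList<>();

        @Override
        public CompletableFuture<JobClient> submitJob(JobGraph jobGraph) {
            submitted.add(jobGraph.name);
            JobClient client = new JobClient() {
                @Override public String jobName() { return jobGraph.name; }
                @Override public CompletableFuture<String> triggerSavepoint(String dir) {
                    // Pretend the savepoint was written under the target directory.
                    return CompletableFuture.completedFuture(dir + "/" + jobGraph.name);
                }
            };
            return CompletableFuture.completedFuture(client);
        }
    }
}
```

The point of the sketch is only the direction of the dependencies: the
Executor depends on a client, while the clients know nothing about the
Executor.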

For the two questions you mentioned:

I). It seems we cannot have a ClusterClient of JobCluster. Is it
expected (due to the cluster being bound to the job)?

I think that this is expected for the reason that you also mention.
In this case, given that the lifecycle of the cluster and that of the
job are identical, the JobClient is essentially a ClusterClient. You
cannot submit jobs, but you can take a savepoint or cancel the job
that is currently being executed. In the case of cancelling, I believe
that it should also kill the cluster.

II). It seems we treat session clusters quite differently from job
clusters, but the cluster client can submit a job, which overlaps a
bit with Executor.

It is true that we treat them differently, for the reason that in the
per-job mode, we have a "single-purpose" cluster and when its job is
done, it has no reason to keep occupying resources. In my opinion, the
"cluster client" or "job client" (semantically, in per-job mode they
are the same) should not be able to submit new jobs in this scenario.
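
As a rough illustration of these semantics, a per-job client could
look like the Java sketch below. The class and its methods
(PerJobClientSketch, JobCluster, submitJob, cancel, triggerSavepoint)
are hypothetical names chosen for this example and are not taken from
the FLIP or from the Flink codebase.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of a per-job client; not an existing Flink class. */
final class PerJobClientSketch {

    /** Minimal stand-in for a dedicated, single-purpose per-job cluster. */
    static final class JobCluster {
        private boolean running = true;
        boolean isRunning() { return running; }
        void shutDown() { running = false; }
    }

    private final JobCluster cluster;
    private final String jobName;

    PerJobClientSketch(JobCluster cluster, String jobName) {
        this.cluster = cluster;
        this.jobName = jobName;
    }

    /** Job-level operation that stays available in per-job mode. */
    CompletableFuture<String> triggerSavepoint(String targetDirectory) {
        return CompletableFuture.completedFuture(targetDirectory + "/" + jobName);
    }

    /** Cancelling the only job also releases the cluster, since their lifecycles are identical. */
    CompletableFuture<Void> cancel() {
        cluster.shutDown();
        return CompletableFuture.completedFuture(null);
    }

    /** Submitting further jobs is rejected: the cluster is bound to its single job. */
    CompletableFuture<Void> submitJob(String otherJob) {
        CompletableFuture<Void> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new UnsupportedOperationException(
                "per-job cluster for '" + jobName + "' cannot accept job '" + otherJob + "'"));
        return rejected;
    }
}
```

Whether rejection should surface as a failed future, as in this
sketch, or as the method simply not existing on the per-job client is
exactly the kind of detail that still needs discussion.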

I agree that the PerJobExecutor requires a bit more discussion and I
will keep on updating the FLIP and discussing on this thread as more
details become clearer for this case.

Thanks for your thoughts on the topic and keep them coming ;)

Cheers,
Kostas

On Fri, Sep 27, 2019 at 10:28 AM Zili Chen <wander4...@gmail.com> wrote:
>
> Thanks for your reply Kostas.
>
> As mentioned in FLIP-74 thread[1] there are two questions on Executor design
>
> (1) Where Executor is in a multi-layered clients view.
> (2) A bit more details about PerJobExecutor implementation.
>
> For (1) Where Executor is in a multi-layered clients view,
>
> As described in the multi-layered client thread[2], in our current codebase,
> with JobClient introduced in FLIP-74, clients can be layered as
>
> 1) ClusterDescriptor: interacts with the external resource manager; responsible
> for deploying a Flink application cluster and retrieving its cluster client.
> 2) ClusterClient: interacts with a Flink application cluster; responsible for
> querying cluster-level status, submitting Flink jobs and retrieving the Flink
> job client.
> 3) JobClient: interacts with a Flink job; responsible for querying job-level
> status and performing job-level operations such as triggering a savepoint.
>
> However, the exception is JobCluster, which somewhat couples cluster
> deployment and job submission. From my perspective, with FLIP-73 and Kostas's
> thoughts in the FLIP-74 thread, we form a multi-layered client as below
>
> 1) Executor: responsible for job submission; whether the corresponding cluster
> is a SessionCluster or a JobCluster doesn't matter. Executor always returns a
> JobClient.
> 2) ClusterClientFactory: responsible for deploying a session cluster and
> retrieving the session cluster client.
> 3) ClusterClient: interacts with the session cluster; responsible for querying
> cluster-level status, submitting Flink jobs and retrieving the Flink job
> client.
> 4) JobClient: interacts with a Flink job; responsible for querying job-level
> status and performing job-level operations such as triggering a savepoint.
>
> I am not sure if the structure above is the same as what you have in mind. If
> so, there are two questions
>
> I). It seems we cannot have a ClusterClient of JobCluster. Is it expected (due
> to the cluster being bound to the job)?
> II). It seems we treat session clusters quite differently from job clusters,
> but the cluster client can submit a job, which overlaps a bit with Executor.
>
> For (2) A bit more details about PerJobExecutor implementation,
>
> FLIP-73 does not describe how PerJobExecutor would work, although it is
> touched on in the design document[3]. In the FLIP-74 thread I forwarded
> earlier insights from our community that point towards two attributes of
> JobCluster
>
> I). Run the Flink job by invoking the user's main method and executing it
> throughout, instead of creating a JobGraph from the main class.
> II). Run the client inside the cluster.
>
> Does PerJobExecutor fit these requirements? In any case, it would be helpful
> to describe the Executor abstraction in the FLIP; at the very least, the
> difference between PerJobExecutor and SessionExecutor is essential.
>
> Best,
> tison.
>
> [1] https://lists.apache.org/x/thread.html/b2e22a45aeb94a8d06b50c4de078f7b23d9ff08b8226918a1a903768@%3Cdev.flink.apache.org%3E
> [2] https://lists.apache.org/x/thread.html/240582148eda905a772d59b2424cb38fa16ab993647824d178cacb02@%3Cdev.flink.apache.org%3E
> [3] https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?ts=5d8cbe34#heading=h.qq4wc2suukg
>
>
> On Wed, Sep 25, 2019 at 8:27 PM Kostas Kloudas <kklou...@gmail.com> wrote:
>>
>> Hi,
>>
>> @Aljoscha, I believe that it is better to be done like this so that we
>> do not step on each other's toes. If the executor already "knew" about
>> the JobClient, then we should also know how we expect the
>> JobExecutionResult to be retrieved (which is part of FLIP-74). I think it
>> is nice to have each discussion self-contained.
>>
>> Cheers,
>> Kostas
>>
>> On Wed, Sep 25, 2019 at 2:13 PM Aljoscha Krettek <aljos...@apache.org> wrote:
>> >
>> > Hi,
>> >
>> > I’m fine with either signature for the new execute() method but I think we 
>> > should focus on the executor discovery and executor configuration part in 
>> > this FLIP while FLIP-74 is about the evolution of the method signature to 
>> > return a future.
>> >
>> > I understand that it’s a bit weird, that this FLIP introduces a new 
>> > interface only to be changed within the same Flink release in a follow-up 
>> > FLIP. But I think we can still do it. What do you think?
>> >
>> > Best,
>> > Aljoscha
>> >
>> > > On 25. Sep 2019, at 10:11, Kostas Kloudas <kklou...@gmail.com> wrote:
>> > >
>> > > Hi Thomas and Zili,
>> > >
>> > > As you both said the Executor is a new addition so there are no
>> > > compatibility concerns.
>> > > Backwards compatibility comes into play on the
>> > > (Stream)ExecutionEnvironment#execute().
>> > >
>> > > This method has to stay and keep the same (blocking) semantics, but we
>> > > can add a new one, something along the lines of executeAsync(), that
>> > > will return the JobClient and allow the caller to interact with the job.
>> > >
>> > > Cheers,
>> > > Kostas
>> > >
>> > > On Wed, Sep 25, 2019 at 2:44 AM Zili Chen <wander4...@gmail.com> wrote:
>> > >>
>> > >>> Since Executor is a new interface, why is backward compatibility a
>> > >>> concern?
>> > >>
>> > >> The backward compatibility concern is on
>> > >> (Stream)ExecutionEnvironment#execute.
>> > >> You're right that, on the Executor side, we don't have to stick to
>> > >> blocking and returning a JobExecutionResult; instead we can implement
>> > >> env.execute on top of a single
>> > >>
>> > >> Executor#execute(or with suffix Async): CompletableFuture<JobClient>
>> > >>
>> > >> what do you think @Kostas Kloudas?
>> > >>
>> > >>> I could see that become an issue later when replacing Executor execute 
>> > >>> with
>> > >>> executeAsync. Or are both targeted for 1.10?
>> > >>
>> > >> IIUC both Executors and JobClient are targeted for 1.10.
>> > >>
>> > >>
>> > >> On Wed, Sep 25, 2019 at 2:39 AM Thomas Weise <t...@apache.org> wrote:
>> > >>>
>> > >>> Since Executor is a new interface, why is backward compatibility a
>> > >>> concern?
>> > >>>
>> > >>> I could see that become an issue later when replacing Executor execute 
>> > >>> with
>> > >>> executeAsync. Or are both targeted for 1.10?
>> > >>>
>> > >>>
>> > >>> On Tue, Sep 24, 2019 at 10:24 AM Zili Chen <wander4...@gmail.com> 
>> > >>> wrote:
>> > >>>
>> > >>>> Hi Thomas,
>> > >>>>
>> > >>>>> Should the new Executor execute method be defined as asynchronous? It
>> > >>>> could
>> > >>>>> return a job handle to interact with the job and the legacy 
>> > >>>>> environments
>> > >>>>> can still block to retain their semantics.
>> > >>>>
>> > >>>> As per our discussion, there will be a method
>> > >>>>
>> > >>>> executeAsync(...): CompletableFuture<JobClient>
>> > >>>>
>> > >>>> where JobClient can be regarded as job handle in your context.
>> > >>>>
>> > >>>> I think we keep
>> > >>>>
>> > >>>> execute(...): JobExecutionResult
>> > >>>>
>> > >>>> just for backward compatibility, because this effort targets 1.10,
>> > >>>> which is not a major version bump.
>> > >>>>
>> > >>>> BTW, I am drafting the details of JobClient (as FLIP-74). I will start
>> > >>>> a separate discussion thread on that interface as soon as I finish an
>> > >>>> early version.
>> > >>>>
>> > >>>> Best,
>> > >>>> tison.
>> > >>>>
>> > >>>>
>> > >>>> On Wed, Sep 25, 2019 at 1:17 AM Thomas Weise <t...@apache.org> wrote:
>> > >>>>
>> > >>>>> Thanks for the proposal. These changes will make it significantly 
>> > >>>>> easier
>> > >>>> to
>> > >>>>> programmatically use Flink in downstream frameworks.
>> > >>>>>
>> > >>>>> Should the new Executor execute method be defined as asynchronous? It
>> > >>>> could
>> > >>>>> return a job handle to interact with the job and the legacy 
>> > >>>>> environments
>> > >>>>> can still block to retain their semantics.
>> > >>>>>
>> > >>>>> (The blocking execution has also made things more difficult in Beam, 
>> > >>>>> we
>> > >>>>> could simply switch to use Executor directly.)
>> > >>>>>
>> > >>>>> Thomas
>> > >>>>>
>> > >>>>>
>> > >>>>> On Tue, Sep 24, 2019 at 6:48 AM Kostas Kloudas <kklou...@apache.org>
>> > >>>>> wrote:
>> > >>>>>
>> > >>>>>> Hi all,
>> > >>>>>>
>> > >>>>>> In the context of the discussion about introducing the Job Client 
>> > >>>>>> API
>> > >>>>> [1],
>> > >>>>>> there was a side-discussion about refactoring the way users submit 
>> > >>>>>> jobs
>> > >>>>> in
>> > >>>>>> Flink. There were many different interesting ideas on the topic and 
>> > >>>>>> 3
>> > >>>>>> design documents that were trying to tackle both the issue about 
>> > >>>>>> code
>> > >>>>>> submission and the Job Client API.
>> > >>>>>>
>> > >>>>>> This discussion thread aims at the job submission part and proposes 
>> > >>>>>> the
>> > >>>>>> approach of introducing the Executor abstraction which will abstract
>> > >>>> the
>> > >>>>>> job submission logic from the Environments and will make it API
>> > >>>> agnostic.
>> > >>>>>>
>> > >>>>>> The FLIP can be found at [2].
>> > >>>>>>
>> > >>>>>> Please keep the discussion here, in the mailing list.
>> > >>>>>>
>> > >>>>>> Looking forward to your opinions,
>> > >>>>>> Kostas
>> > >>>>>>
>> > >>>>>> [1] https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>> > >>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
>> > >>>>>>
>> > >>>>>
>> > >>>>
>> >
