Hi Tison,

I agree, for now the async Executor.execute() is an internal detail but during 
your work for FLIP-74 it will probably also reach the public API.

Best,
Aljoscha
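
A minimal sketch of the async Executor.execute() idea discussed in this thread, assuming simplified stand-in types. Pipeline and JobExecutionResult below are hypothetical placeholders rather than the real Flink classes, and ToyExecutor, executeAttached and executeDetached are illustrative names that do not come from the FLIP. It only shows how "attached" versus "detached" could reduce to whether the caller blocks on the returned CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncExecutorSketch {

    /** Hypothetical stand-in for Flink's Pipeline; not the real type. */
    interface Pipeline {
        String name();
    }

    /** Hypothetical stand-in for Flink's JobExecutionResult. */
    static final class JobExecutionResult {
        final String jobName;

        JobExecutionResult(String jobName) {
            this.jobName = jobName;
        }
    }

    /** Async variant of the interface: submission returns a future. */
    interface Executor {
        CompletableFuture<JobExecutionResult> execute(Pipeline pipeline);
    }

    /** Toy executor (assumption): "runs" the pipeline on another thread. */
    static final class ToyExecutor implements Executor {
        @Override
        public CompletableFuture<JobExecutionResult> execute(Pipeline pipeline) {
            return CompletableFuture.supplyAsync(
                    () -> new JobExecutionResult(pipeline.name()));
        }
    }

    /** "Attached" behaviour: block until the job result is available. */
    static JobExecutionResult executeAttached(Executor executor, Pipeline pipeline) {
        return executor.execute(pipeline).join();
    }

    /** "Detached" behaviour: return right after submission, without waiting. */
    static CompletableFuture<JobExecutionResult> executeDetached(
            Executor executor, Pipeline pipeline) {
        return executor.execute(pipeline);
    }

    public static void main(String[] args) {
        Executor executor = new ToyExecutor();
        Pipeline pipeline = () -> "wordcount";

        // A blocking wrapper keeps post-execution logic working as before.
        JobExecutionResult result = executeAttached(executor, pipeline);
        System.out.println("attached result for job: " + result.jobName);

        // The caller may also keep the future and decide later whether to wait.
        CompletableFuture<JobExecutionResult> pending =
                executeDetached(executor, pipeline);
        pending.thenAccept(r -> System.out.println("detached job done: " + r.jobName))
                .join(); // joined here only so the demo does not exit early
    }
}
```

Under these assumptions, a blocking env.execute() could remain a thin wrapper like executeAttached, so user code that post-processes the JobExecutionResult keeps working while the internal components stay async.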

> On 4. Oct 2019, at 11:39, Zili Chen <wander4...@gmail.com> wrote:
> 
> Hi Aljoscha,
> 
> Now that the scope of this FLIP has been clearly narrowed, the Executor
> interface and its discovery look good to me, so I'm glad to see the vote
> thread.
> 
> As you said, we should still discuss implementation details, but I don't
> think that should block the vote thread, because a vote means we generally
> agree on the motivation and overall design.
> 
> As for making Executor.execute() async, that is much better than keeping
> the sync/async distinction at this level. But I'd like to note that it only
> works internally for now, because the user-facing interface is still
> env.execute, which blocks and returns a JobExecutionResult. I'm afraid that
> several people depend on the result for post-execution processing, although
> that doesn't work in the current per-job mode.
> 
> Best,
> tison.
> 
> 
> Aljoscha Krettek <aljos...@apache.org> wrote on Fri, Oct 4, 2019 at 4:40 PM:
> 
>> Do you all think we could agree on the basic executor primitives and start
>> voting on this FLIP? There are still some implementation details but I
>> think we can discuss/tackle them when we get to them and the various people
>> implementing this should be in close collaboration.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 4. Oct 2019, at 10:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>>> 
>>> Hi,
>>> 
>>> I think the end goal is to have only one environment per API, but I
>>> think we won’t be able to achieve that in the short-term because of
>>> backwards compatibility. This is most notable with the context environment,
>>> preview environments etc.
>>> 
>>> To keep this FLIP very slim we can make this only about the executors
>>> and executor discovery. Anything else like job submission semantics,
>>> detached mode, … can be tackled after this. If we don’t focus I’m afraid
>>> this will drag on for quite a while.
>>> 
>>> One thing I would like to propose to make this easier is to change
>>> Executor.execute() to return a CompletableFuture and to completely remove
>>> the “detached” logic from ClusterClient. That way, the new components make
>>> no distinction between “detached” and “attached” but we can still do it in
>>> the CLI (via the ContextEnvironment) to support the existing “detached”
>>> behaviour of the CLI that users expect. What do you think about this?
>>> 
>>> Best,
>>> Aljoscha
>>> 
>>>> On 3. Oct 2019, at 10:03, Zili Chen <wander4...@gmail.com> wrote:
>>>> 
>>>> Thanks, Kostas, for your explanation clarifying the subtasks under
>>>> FLIP-73.
>>>> 
>>>> As you described, changes to Environment are included in this FLIP. For
>>>> "each API to have a single Environment", it could be helpful to describe
>>>> which APIs we'd like to have after FLIP-73. And if we keep multiple
>>>> Environments, shall we keep the current way of injecting the context
>>>> environment for each API?
>>>> 
>>>> 
>>>> Kostas Kloudas <kklou...@gmail.com> wrote on Thu, Oct 3, 2019 at 1:44 PM:
>>>> 
>>>>> Hi Tison,
>>>>> 
>>>>> The changes that this FLIP proposes are:
>>>>> - the introduction of the Executor interface
>>>>> - the fact that everything in the current state of job submission in
>>>>> Flink can be defined through configuration parameters
>>>>> - implementation of Executors that do not change any of the semantics
>>>>> of the currently offered "modes" of job submission
>>>>> 
>>>>> In this, and in the FLIP itself where the
>>>>> ExecutionEnvironment.execute() method is described, there are details
>>>>> about parts of the
>>>>> integration with the existing Flink code-base.
>>>>> 
>>>>> So I am not sure what you mean by making the "integration a
>>>>> follow-up discussion".
>>>>> 
>>>>> Cheers,
>>>>> Kostas
>>>>> 
>>>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>> 
>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>>>>> Executors work, as they are using the execute() method because this is
>>>>>> the only "entry" to the user program. In this regard, I believe we
>>>>>> should just see the fact that they have their dedicated environment as
>>>>>> an "implementation detail".
>>>>>> 
>>>>>> The proposal says
>>>>>> 
>>>>>> In this document, we propose to abstract away from the Environments the
>>>>>> job submission logic and put it in a newly introduced Executor. This
>>>>>> will allow *each API to have a single Environment* which, based on the
>>>>>> provided configuration, will decide which executor to use, *e.g.* Yarn,
>>>>>> Local, etc. In addition, it will allow different APIs and downstream
>>>>>> projects to re-use the provided executors, thus limiting the amount of
>>>>>> code duplication and the amount of code that has to be written.
>>>>>> 
>>>>>> Note that "This will allow *each API to have a single Environment*"
>>>>>> seems to diverge a bit from your statement above. Or do we treat a
>>>>>> single Environment as a possible advantage after the introduction of
>>>>>> Executor, and thus exclude it from this pass?
>>>>>> 
>>>>>> Best,
>>>>>> tison.
>>>>>> 
>>>>>> 
>>>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:07 AM:
>>>>>> 
>>>>>>> BTW, correct me if I misunderstand; I am now learning more about how
>>>>>>> our community works. Since FLIP-73 aims at introducing an interface
>>>>>>> with community consensus, the discussion is more about the interface,
>>>>>>> in order to properly define a useful and extensible API. The
>>>>>>> integration story could be a follow-up, since this one does not affect
>>>>>>> current behavior at all.
>>>>>>> 
>>>>>>> Best,
>>>>>>> tison.
>>>>>>> 
>>>>>>> 
>>>>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:02 AM:
>>>>>>> 
>>>>>>>> Hi Kostas,
>>>>>>>> 
>>>>>>>> It seems to do no harm to have a configuration parameter on
>>>>>>>> Executor#execute, since we can merge it with the configuration given
>>>>>>>> when the Executor was created and let the former override the latter.
>>>>>>>> 
>>>>>>>> I can see it being useful that, conceptually, we can create one
>>>>>>>> Executor for a series of jobs submitted to the same cluster but with
>>>>>>>> a different job configuration per pipeline.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> tison.
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Kostas Kloudas <kklou...@apache.org> wrote on Thu, Oct 3, 2019 at 1:37 AM:
>>>>>>>> 
>>>>>>>>> Hi again,
>>>>>>>>> 
>>>>>>>>> I did not include this in my previous email, as this is related to
>>>>>>>>> the proposal on the FLIP itself.
>>>>>>>>> 
>>>>>>>>> In the existing proposal, the Executor interface is the following.
>>>>>>>>> 
>>>>>>>>> public interface Executor {
>>>>>>>>> 
>>>>>>>>> JobExecutionResult execute(Pipeline pipeline) throws Exception;
>>>>>>>>> 
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> This implies that all the necessary information for the execution of
>>>>>>>>> a Pipeline should be included in the Configuration passed to the
>>>>>>>>> ExecutorFactory which instantiates the Executor itself. This should
>>>>>>>>> include, for example, all the parameters currently supplied by the
>>>>>>>>> ProgramOptions, which are conceptually not executor parameters but
>>>>>>>>> rather parameters for the execution of the specific pipeline. To this
>>>>>>>>> end, I would like to propose a change in the current Executor
>>>>>>>>> interface showcased below:
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> public interface Executor {
>>>>>>>>> 
>>>>>>>>> JobExecutionResult execute(Pipeline pipeline, Configuration
>>>>>>>>> executionOptions) throws Exception;
>>>>>>>>> 
>>>>>>>>> }
>>>>>>>>> 
>>>>>>>>> The above will allow the Executor-specific options to be passed in
>>>>>>>>> the configuration given during executor instantiation, while the
>>>>>>>>> pipeline-specific options can be passed in the executionOptions. As a
>>>>>>>>> positive side-effect, this will make Executors re-usable, i.e.
>>>>>>>>> instantiate an executor and use it to execute multiple pipelines, if
>>>>>>>>> in the future we choose to do so.
>>>>>>>>> 
>>>>>>>>> Let me know what you think,
>>>>>>>>> Kostas
>>>>>>>>> 
>>>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <kklou...@apache.org> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi all,
>>>>>>>>>> 
>>>>>>>>>> I agree with Tison that we should disentangle threads so that people
>>>>>>>>>> can work independently.
>>>>>>>>>> 
>>>>>>>>>> For FLIP-73:
>>>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>>>>>>>>> Executors work, as they are using the execute() method because this
>>>>>>>>>> is the only "entry" to the user program. In this regard, I believe we
>>>>>>>>>> should just see the fact that they have their dedicated environment
>>>>>>>>>> as an "implementation detail".
>>>>>>>>>> - for getting rid of the per-job mode: as a first note, there was
>>>>>>>>>> already a discussion here:
>>>>>>>>>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>>>>>>>>> with many people, including myself, expressing their opinion. I am
>>>>>>>>>> mentioning that to show that this topic already has some history and
>>>>>>>>>> the discussion does not start from scratch but there are already some
>>>>>>>>>> contradicting opinions. My opinion is that we should not get rid of
>>>>>>>>>> the per-job mode but I agree that we should discuss the semantics in
>>>>>>>>>> more detail. Although in terms of code it may be tempting to "merge"
>>>>>>>>>> the two submission modes, one of the main benefits of the per-job
>>>>>>>>>> mode is isolation, both for resources and security, as the jobGraph
>>>>>>>>>> to be executed is fixed and the cluster is "locked" just for that
>>>>>>>>>> specific graph. This would be violated by having a session cluster
>>>>>>>>>> launched and having all the infrastructure (ports and endpoints) set
>>>>>>>>>> up for submitting any job to that cluster.
>>>>>>>>>> - for getting rid of the "detached" mode: I agree with getting rid of
>>>>>>>>>> it but this implies some potential user-facing changes that should be
>>>>>>>>>> discussed.
>>>>>>>>>> 
>>>>>>>>>> Given the above, I think that:
>>>>>>>>>> 1) in the context of FLIP-73 we should not change any semantics but
>>>>>>>>>> simply push the existing submission logic behind a reusable
>>>>>>>>>> abstraction and make it usable via public APIs, as Aljoscha said.
>>>>>>>>>> 2) as Till said, changing the semantics is beyond the scope of this
>>>>>>>>>> FLIP and as Tison mentioned we should work towards decoupling
>>>>>>>>>> discussions rather than the opposite. So let's discuss the future of
>>>>>>>>>> the per-job and detached modes in a separate thread. This will also
>>>>>>>>>> allow us to give the proper visibility to such an important topic.
>>>>>>>>>> 
>>>>>>>>>> Cheers,
>>>>>>>>>> Kostas
>>>>>>>>>> 
>>>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for your thoughts Aljoscha.
>>>>>>>>>>> 
>>>>>>>>>>> Another question, since FLIP-73 might contain refactoring of
>>>>>>>>>>> Environment: shall we support something like PreviewPlanEnvironment?
>>>>>>>>>>> If so, how? From a user's perspective the preview plan is useful: it
>>>>>>>>>>> gives a visual view that helps to modify topologies and configure a
>>>>>>>>>>> job without submitting it.
>>>>>>>>>>> 
>>>>>>>>>>> Best,
>>>>>>>>>>> tison.
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote on Wed, Oct 2, 2019 at 10:10 PM:
>>>>>>>>>>> 
>>>>>>>>>>>> I agree with Till that we should not change the semantics of per-job
>>>>>>>>>>>> mode. In my opinion per-job mode means that the cluster (JobManager)
>>>>>>>>>>>> is brought up with one job and it only executes that one job. There
>>>>>>>>>>>> should be no open ports/anything that would allow submitting further
>>>>>>>>>>>> jobs. This is very important for deployments in docker/Kubernetes or
>>>>>>>>>>>> other environments where you bring up jobs without necessarily having
>>>>>>>>>>>> the notion of a Flink cluster.
>>>>>>>>>>>> 
>>>>>>>>>>>> What this means for a user program that has multiple execute() calls
>>>>>>>>>>>> is that you will get a fresh cluster for each execute call. This also
>>>>>>>>>>>> means that further execute() calls will only happen if the “client”
>>>>>>>>>>>> is still alive, because it is the one driving execution. Currently,
>>>>>>>>>>>> this only works if you start the job in “attached” mode. If you start
>>>>>>>>>>>> in “detached” mode only the first execute() will happen and the rest
>>>>>>>>>>>> will be ignored.
>>>>>>>>>>>> 
>>>>>>>>>>>> This brings us to the tricky question about what to do about
>>>>>>>>>>>> “detached” and “attached”. In the long run, I would like to get rid
>>>>>>>>>>>> of the distinction and leave it up to the user program, by either
>>>>>>>>>>>> blocking or not on the Future (or JobClient or whatnot) that job
>>>>>>>>>>>> submission returns. This, however, means that users cannot simply
>>>>>>>>>>>> request “detached” execution when using bin/flink, the user program
>>>>>>>>>>>> has to “play along”. On the other hand, “detached” mode is quite
>>>>>>>>>>>> strange for the user program. The execute() call either returns with
>>>>>>>>>>>> a proper job result after the job ran (in “attached” mode) or with a
>>>>>>>>>>>> dummy result (in “detached” mode) right after submission. I think
>>>>>>>>>>>> this can even lead to weird cases where multiple “execute()” calls
>>>>>>>>>>>> run in parallel. For per-job detached mode we also “throw” out of the
>>>>>>>>>>>> first execute so the rest (including result processing logic) is
>>>>>>>>>>>> ignored.
>>>>>>>>>>>> 
>>>>>>>>>>>> For FLIP-73 itself we can (and should) ignore these problems, because
>>>>>>>>>>>> FLIP-73 only moves the existing submission logic behind a reusable
>>>>>>>>>>>> abstraction and makes it usable via API. We should closely follow up
>>>>>>>>>>>> on the above points though because I think they are also important.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>> 
>>>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for your clarification Till.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I agree with the current semantics of the per-job mode: one should
>>>>>>>>>>>>> deploy a new cluster for each part of the job. Apart from the
>>>>>>>>>>>>> performance concern, it also means that PerJobExecutor actually knows
>>>>>>>>>>>>> how to deploy a cluster, which is different from the description
>>>>>>>>>>>>> that an Executor submits a job.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Anyway, it sounds workable and narrows the changes.
