Hi Tison,

I agree. For now the async Executor.execute() is an internal detail, but during your work for FLIP-74 it will probably also reach the public API.

Best,
Aljoscha

> On 4. Oct 2019, at 11:39, Zili Chen <wander4...@gmail.com> wrote:
>
> Hi Aljoscha,
>
> After clearly narrowing the scope of this FLIP, the Executor interface and its discovery look good to me, so I'm glad to see the vote thread.
>
> As you said, we should still discuss implementation details, but I don't think that should block the vote thread, because a vote means we generally agree on the motivation and overall design.
>
> As for Executor.execute() being async, it is much better than keeping the difference between sync/async at this level. But I'd like to note that it only works internally for now, because the user-facing interface is still env.execute, which blocks and returns a JobExecutionResult. I'm afraid that several people depend on the result for post-execution processing, although it doesn't work in the current per-job mode.
>
> Best,
> tison.
>
>
> Aljoscha Krettek <aljos...@apache.org> wrote on Fri, Oct 4, 2019 at 4:40 PM:
>
>> Do you all think we could agree on the basic executor primitives and start voting on this FLIP? There are still some implementation details but I think we can discuss/tackle them when we get to them and the various people implementing this should be in close collaboration.
>>
>> Best,
>> Aljoscha
>>
>>> On 4. Oct 2019, at 10:15, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>
>>> Hi,
>>>
>>> I think the end goal is to have only one environment per API, but I think we won’t be able to achieve that in the short-term because of backwards compatibility. This is most notable with the context environment, preview environments etc.
>>>
>>> To keep this FLIP very slim we can make this only about the executors and executor discovery. Anything else like job submission semantics, detached mode, … can be tackled after this. If we don’t focus I’m afraid this will drag on for quite a while.
>>>
>>> One thing I would like to propose to make this easier is to change Executor.execute() to return a CompletableFuture and to completely remove the “detached” logic from ClusterClient. That way, the new components make no distinction between “detached” and “attached” but we can still do it in the CLI (via the ContextEnvironment) to support the existing “detached” behaviour of the CLI that users expect. What do you think about this?
>>>
>>> Best,
>>> Aljoscha
>>>
>>>> On 3. Oct 2019, at 10:03, Zili Chen <wander4...@gmail.com> wrote:
>>>>
>>>> Thanks for your explanation, Kostas; it makes the subtasks under FLIP-73 clear.
>>>>
>>>> As you described, changes of Environment are included in this FLIP. For "each API to have a single Environment", it could be helpful to describe which APIs we'd like to have after FLIP-73. And if we keep multiple Environments, shall we keep the way of injecting the context environment for each API?
>>>>
>>>>
>>>> Kostas Kloudas <kklou...@gmail.com> wrote on Thu, Oct 3, 2019 at 1:44 PM:
>>>>
>>>>> Hi Tison,
>>>>>
>>>>> The changes that this FLIP proposes are:
>>>>> - the introduction of the Executor interface
>>>>> - the fact that everything in the current state of job submission in Flink can be defined through configuration parameters
>>>>> - implementation of Executors that do not change any of the semantics of the currently offered "modes" of job submission
>>>>>
>>>>> In this, and in the FLIP itself where the ExecutionEnvironment.execute() method is described, there are details about parts of the integration with the existing Flink code-base.
>>>>>
>>>>> So I am not sure what you mean by making the "integration a follow-up discussion".
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>
>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the Executors work, as they are using the execute() method because this is the only "entry" to the user program. To this regard, I believe we should just see the fact that they have their dedicated environment as an "implementation detail".
>>>>>>
>>>>>> The proposal says
>>>>>>
>>>>>> In this document, we propose to abstract away from the Environments the job submission logic and put it in a newly introduced Executor. This will allow *each API to have a single Environment* which, based on the provided configuration, will decide which executor to use, *e.g.* Yarn, Local, etc. In addition, it will allow different APIs and downstream projects to re-use the provided executors, thus limiting the amount of code duplication and the amount of code that has to be written.
>>>>>>
>>>>>> Note that "This will allow *each API to have a single Environment*" seems to diverge a bit from your statement above. Or we treat a single Environment as a possible advantage after the introduction of Executor, so that we exclude it from this pass.
>>>>>>
>>>>>> Best,
>>>>>> tison.
>>>>>>
>>>>>>
>>>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:07 AM:
>>>>>>
>>>>>>> BTW, correct me if I misunderstand; I am now learning more about our community's way. Since FLIP-73 aims at introducing an interface with community consensus, the discussion is more about the interface, in order to properly define a useful and extensible API. The integration story could be a follow-up since this one does not affect current behavior at all.
>>>>>>>
>>>>>>> Best,
>>>>>>> tison.
>>>>>>>
>>>>>>>
>>>>>>> Zili Chen <wander4...@gmail.com> wrote on Thu, Oct 3, 2019 at 2:02 AM:
>>>>>>>
>>>>>>>> Hi Kostas,
>>>>>>>>
>>>>>>>> It seems to do no harm to have a configuration parameter on Executor#execute, since we can merge it with the configuration given when the Executor was created and let the former override the latter.
>>>>>>>>
>>>>>>>> I can see it is useful that conceptually we can create an Executor for a series of jobs to the same cluster but with different job configuration per pipeline.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> tison.
>>>>>>>>
>>>>>>>>
>>>>>>>> Kostas Kloudas <kklou...@apache.org> wrote on Thu, Oct 3, 2019 at 1:37 AM:
>>>>>>>>
>>>>>>>>> Hi again,
>>>>>>>>>
>>>>>>>>> I did not include this in my previous email, as this is related to the proposal on the FLIP itself.
>>>>>>>>>
>>>>>>>>> In the existing proposal, the Executor interface is the following.
>>>>>>>>>
>>>>>>>>> public interface Executor {
>>>>>>>>>
>>>>>>>>>     JobExecutionResult execute(Pipeline pipeline) throws Exception;
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> This implies that all the necessary information for the execution of a Pipeline should be included in the Configuration passed in the ExecutorFactory which instantiates the Executor itself. This should include, for example, all the parameters currently supplied by the ProgramOptions, which are conceptually not executor parameters but rather parameters for the execution of the specific pipeline.
>>>>>>>>> To this end, I would like to propose a change in the current Executor interface showcased below:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> public interface Executor {
>>>>>>>>>
>>>>>>>>>     JobExecutionResult execute(Pipeline pipeline, Configuration executionOptions) throws Exception;
>>>>>>>>>
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> The above will allow to have the Executor specific options passed in the configuration given during executor instantiation, while the pipeline specific options can be passed in the executionOptions. As a positive side-effect, this will make Executors re-usable, i.e. instantiate an executor and use it to execute multiple pipelines, if in the future we choose to do so.
>>>>>>>>>
>>>>>>>>> Let me know what you think,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <kklou...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> I agree with Tison that we should disentangle threads so that people can work independently.
>>>>>>>>>>
>>>>>>>>>> For FLIP-73:
>>>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the Executors work, as they are using the execute() method because this is the only "entry" to the user program. To this regard, I believe we should just see the fact that they have their dedicated environment as an "implementation detail".
>>>>>>>>>> - for getting rid of the per-job mode: as a first note, there was already a discussion here:
>>>>>>>>>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>>>>>>>>> with many people, including myself, expressing their opinion.
>>>>>>>>>> I am mentioning that to show that this topic already has some history and the discussion does not start from scratch but there are already some contradicting opinions. My opinion is that we should not get rid of the per-job mode but I agree that we should discuss the semantics in more detail. Although in terms of code it may be tempting to "merge" the two submission modes, one of the main benefits of the per-job mode is isolation, both for resources and security, as the jobGraph to be executed is fixed and the cluster is "locked" just for that specific graph. This would be violated by having a session cluster launched and having all the infrastructure (ports and endpoints) set for submitting to that cluster any job.
>>>>>>>>>> - for getting rid of the "detached" mode: I agree with getting rid of it but this implies some potential user-facing changes that should be discussed.
>>>>>>>>>>
>>>>>>>>>> Given the above, I think that:
>>>>>>>>>> 1) in the context of FLIP-73 we should not change any semantics but simply push the existing submission logic behind a reusable abstraction and make it usable via public APIs, as Aljoscha said.
>>>>>>>>>> 2) as Till said, changing the semantics is beyond the scope of this FLIP and as Tison mentioned we should work towards decoupling discussions rather than the opposite. So let's discuss the future of the per-job and detached modes in a separate thread. This will also allow to give the proper visibility to such an important topic.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your thoughts Aljoscha.
>>>>>>>>>>>
>>>>>>>>>>> Another question, since FLIP-73 might contain refactors of Environment: shall we support something like PreviewPlanEnvironment? If so, how? From a user perspective, the preview plan is useful because it gives a visual view for modifying topologies and configuration without submitting the job.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> tison.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote on Wed, Oct 2, 2019 at 10:10 PM:
>>>>>>>>>>>
>>>>>>>>>>>> I agree with Till that we should not change the semantics of per-job mode. In my opinion per-job mode means that the cluster (JobManager) is brought up with one job and it only executes that one job. There should be no open ports/anything that would allow submitting further jobs. This is very important for deployments in docker/Kubernetes or other environments where you bring up jobs without necessarily having the notion of a Flink cluster.
>>>>>>>>>>>>
>>>>>>>>>>>> What this means for a user program that has multiple execute() calls is that you will get a fresh cluster for each execute call. This also means that further execute() calls will only happen if the “client” is still alive, because it is the one driving execution. Currently, this only works if you start the job in “attached” mode. If you start in “detached” mode only the first execute() will happen and the rest will be ignored.
>>>>>>>>>>>>
>>>>>>>>>>>> This brings us to the tricky question about what to do about “detached” and “attached”.
>>>>>>>>>>>> In the long run, I would like to get rid of the distinction and leave it up to the user program, by either blocking or not on the Future (or JobClient or whatnot) that job submission returns. This, however, means that users cannot simply request “detached” execution when using bin/flink, the user program has to “play along”. On the other hand, “detached” mode is quite strange for the user program. The execute() call either returns with a proper job result after the job ran (in “attached” mode) or with a dummy result (in “detached” mode) right after submission. I think this can even lead to weird cases where multiple “execute()” run in parallel. For per-job detached mode we also “throw” out of the first execute so the rest (including result processing logic) is ignored.
>>>>>>>>>>>>
>>>>>>>>>>>> For this here FLIP-73 we can (and should) ignore these problems, because FLIP-73 only moves the existing submission logic behind a reusable abstraction and makes it usable via API. We should closely follow up on the above points though because I think they are also important.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Aljoscha
>>>>>>>>>>>>
>>>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your clarification Till.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I agree with the current semantics of the per-job mode, one should deploy a new cluster for each part of the job.
>>>>>>>>>>>>> Apart from the performance concern, it also means that PerJobExecutor actually knows how to deploy a cluster, which is different from the description that an Executor submits a job.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Anyway, it sounds workable and narrows the changes.
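
For reference, the direction discussed in this thread (Executor.execute() returning a CompletableFuture, with per-pipeline executionOptions merged over the options the executor was created with) could look roughly like the sketch below. This is only an illustration under stated assumptions: Pipeline, JobResult, SketchExecutor, the Map-based options and the "target"/"parallelism" keys are hypothetical stand-ins, not Flink's actual classes or configuration keys.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class AsyncExecutorSketch {

    /** Hypothetical stand-in for a pipeline; not Flink's Pipeline type. */
    public interface Pipeline {
        String name();
    }

    /** Hypothetical stand-in for a job result; not Flink's JobExecutionResult. */
    public static final class JobResult {
        public final String jobName;
        public final Map<String, String> effectiveOptions;

        public JobResult(String jobName, Map<String, String> effectiveOptions) {
            this.jobName = jobName;
            this.effectiveOptions = effectiveOptions;
        }
    }

    /** Async executor: the interface itself knows nothing about attached/detached. */
    public interface Executor {
        CompletableFuture<JobResult> execute(Pipeline pipeline, Map<String, String> executionOptions);
    }

    /** Toy executor that "runs" a pipeline by returning the merged options. */
    public static final class SketchExecutor implements Executor {
        private final Map<String, String> executorOptions;

        public SketchExecutor(Map<String, String> executorOptions) {
            this.executorOptions = new HashMap<>(executorOptions);
        }

        @Override
        public CompletableFuture<JobResult> execute(Pipeline pipeline, Map<String, String> executionOptions) {
            // Start from the executor-level options, then let per-pipeline options override them.
            Map<String, String> merged = new HashMap<>(executorOptions);
            merged.putAll(executionOptions);
            return CompletableFuture.supplyAsync(() -> new JobResult(pipeline.name(), merged));
        }
    }

    public static void main(String[] args) {
        Map<String, String> executorOptions = new HashMap<>();
        executorOptions.put("target", "local");      // hypothetical key
        executorOptions.put("parallelism", "1");     // hypothetical key
        Executor executor = new SketchExecutor(executorOptions);

        Map<String, String> jobOptions = new HashMap<>();
        jobOptions.put("parallelism", "4");

        // "Attached" behaviour: the caller blocks on the future.
        JobResult result = executor.execute(() -> "wordcount", jobOptions).join();
        System.out.println(result.jobName + " ran with " + result.effectiveOptions);

        // "Detached" behaviour: the caller submits and simply does not wait.
        executor.execute(() -> "fire-and-forget", jobOptions);
    }
}
```

With this shape, whether a submission is "attached" or "detached" is decided by the caller (for example the CLI), by blocking or not blocking on the returned future, and one executor instance can be reused for several pipelines with different per-job options.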