It might be useful to mention in FLIP-73 that Executor.execute is intended to become an asynchronous API once it is made public, and to refer to FLIP-74 in that context.
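
For illustration, here is a minimal sketch of what an asynchronous Executor.execute could look like, following the suggestion further down in the thread to return a CompletableFuture. Pipeline and JobExecutionResult below are simplified stand-ins rather than the real Flink classes, and toyExecutor and executeAttached are hypothetical names; the actual signature is whatever FLIP-73/FLIP-74 end up defining.

```java
import java.util.concurrent.CompletableFuture;

// Sketch only: the nested types are simplified stand-ins, not Flink classes.
class AsyncExecutorSketch {

    /** Stand-in for a user pipeline; it only carries a name. */
    static final class Pipeline {
        final String name;
        Pipeline(String name) { this.name = name; }
    }

    /** Stand-in for the job result; it only records which job finished. */
    static final class JobExecutionResult {
        final String jobName;
        JobExecutionResult(String jobName) { this.jobName = jobName; }
    }

    /** Assumed async shape: submission returns a future instead of blocking. */
    interface Executor {
        CompletableFuture<JobExecutionResult> execute(Pipeline pipeline) throws Exception;
    }

    /** Hypothetical toy executor that "runs" the job on a background thread. */
    static Executor toyExecutor() {
        return pipeline -> CompletableFuture.supplyAsync(
                () -> new JobExecutionResult(pipeline.name));
    }

    /** "Attached" behaviour: the caller blocks on the returned future. */
    static JobExecutionResult executeAttached(Executor executor, Pipeline pipeline)
            throws Exception {
        return executor.execute(pipeline).get();
    }

    public static void main(String[] args) throws Exception {
        Executor executor = toyExecutor();

        // "Detached" behaviour: keep the future and carry on without waiting.
        CompletableFuture<JobExecutionResult> pending =
                executor.execute(new Pipeline("detached-job"));

        JobExecutionResult result =
                executeAttached(executor, new Pipeline("attached-job"));
        System.out.println("finished: " + result.jobName);

        // Wait here only so that the demo exits cleanly.
        pending.join();
    }
}
```

With this shape the executor itself makes no distinction between attached and detached; the caller decides by blocking on the future or not.
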
On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek <aljos...@apache.org> wrote:
> Hi Tison,
>
> I agree, for now the async Executor.execute() is an internal detail but
> during your work for FLIP-74 it will probably also reach the public API.
>
> Best,
> Aljoscha
>
> > On 4. Oct 2019, at 11:39, Zili Chen <wander4...@gmail.com> wrote:
> >
> > Hi Aljoscha,
> >
> > After clearly narrowing the scope of this FLIP, the Executor interface
> > and its discovery look good to me, so I'm glad to see the vote thread.
> >
> > As you said, we should still discuss implementation details, but I
> > don't think that should block the vote thread, because a vote means
> > we generally agree on the motivation and overall design.
> >
> > As for Executor.execute() being async, that is much better than
> > keeping the sync/async distinction at this level. But I'd like to
> > note that it only works internally for now, because the user-facing
> > interface is still env.execute, which blocks and returns a
> > JobExecutionResult. I'm afraid that several people depend on the
> > result for post-execution processing, although that doesn't work in
> > the current per-job mode.
> >
> > Best,
> > tison.
> >
> >
> > On Fri, Oct 4, 2019 at 4:40 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >
> >> Do you all think we could agree on the basic executor primitives and
> >> start voting on this FLIP? There are still some implementation
> >> details but I think we can discuss/tackle them when we get to them
> >> and the various people implementing this should be in close
> >> collaboration.
> >>
> >> Best,
> >> Aljoscha
> >>
> >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I think the end goal is to have only one environment per API, but I
> >>> think we won’t be able to achieve that in the short-term because of
> >>> backwards compatibility.
> >>> This is most notable with the context environment, preview
> >>> environments etc.
> >>>
> >>> To keep this FLIP very slim we can make this only about the
> >>> executors and executor discovery. Anything else like job submission
> >>> semantics, detached mode, … can be tackled after this. If we don’t
> >>> focus I’m afraid this will drag on for quite a while.
> >>>
> >>> One thing I would like to propose to make this easier is to change
> >>> Executor.execute() to return a CompletableFuture and to completely
> >>> remove the “detached” logic from ClusterClient. That way, the new
> >>> components make no distinction between “detached” and “attached”
> >>> but we can still do it in the CLI (via the ContextEnvironment) to
> >>> support the existing “detached” behaviour of the CLI that users
> >>> expect. What do you think about this?
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>>> On 3. Oct 2019, at 10:03, Zili Chen <wander4...@gmail.com> wrote:
> >>>>
> >>>> Thanks for your explanation, Kostas; it makes the subtasks under
> >>>> FLIP-73 clear.
> >>>>
> >>>> As you described, changes to the Environments are included in this
> >>>> FLIP. For "each API to have a single Environment", it could be
> >>>> helpful to describe which APIs we'd like to have after FLIP-73.
> >>>> And if we keep multiple Environments, shall we keep the way the
> >>>> context environment is injected for each API?
> >>>>
> >>>>
> >>>> On Thu, Oct 3, 2019 at 1:44 PM Kostas Kloudas <kklou...@gmail.com> wrote:
> >>>>
> >>>>> Hi Tison,
> >>>>>
> >>>>> The changes that this FLIP proposes are:
> >>>>> - the introduction of the Executor interface
> >>>>> - the fact that everything in the current state of job submission
> >>>>> in Flink can be defined through configuration parameters
> >>>>> - implementation of Executors that do not change any of the
> >>>>> semantics of the currently offered "modes" of job submission
> >>>>>
> >>>>> In this, and in the FLIP itself where the
> >>>>> ExecutionEnvironment.execute() method is described, there are
> >>>>> details about parts of the integration with the existing Flink
> >>>>> code-base.
> >>>>>
> >>>>> So I am not sure what you mean by making the "integration a
> >>>>> follow-up discussion".
> >>>>>
> >>>>> Cheers,
> >>>>> Kostas
> >>>>>
> >>>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen <wander4...@gmail.com> wrote:
> >>>>>>
> >>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to
> >>>>>> the Executors work, as they are using the execute() method
> >>>>>> because this is the only "entry" to the user program. In this
> >>>>>> regard, I believe we should just see the fact that they have
> >>>>>> their dedicated environment as an "implementation detail".
> >>>>>>
> >>>>>> The proposal says
> >>>>>>
> >>>>>> In this document, we propose to abstract away from the
> >>>>>> Environments the job submission logic and put it in a newly
> >>>>>> introduced Executor. This will allow *each API to have a single
> >>>>>> Environment* which, based on the provided configuration, will
> >>>>>> decide which executor to use, *e.g.* Yarn, Local, etc. In
> >>>>>> addition, it will allow different APIs and downstream projects
> >>>>>> to re-use the provided executors, thus limiting the amount of
> >>>>>> code duplication and the amount of code that has to be written.
> >>>>>>
> >>>>>> Note that "This will allow *each API to have a single
> >>>>>> Environment*" seems to diverge a bit from your statement above.
> >>>>>> Or we could describe a single Environment as a possible
> >>>>>> advantage after the introduction of Executor, so that we exclude
> >>>>>> it from this pass.
> >>>>>>
> >>>>>> Best,
> >>>>>> tison.
> >>>>>>
> >>>>>>
> >>>>>> On Thu, Oct 3, 2019 at 2:07 AM Zili Chen <wander4...@gmail.com> wrote:
> >>>>>>
> >>>>>>> BTW, correct me if I misunderstand; I am now learning more
> >>>>>>> about the way our community works. Since FLIP-73 aims at
> >>>>>>> introducing an interface with community consensus, the
> >>>>>>> discussion is more about the interface, in order to properly
> >>>>>>> define a useful and extensible API. The integration story could
> >>>>>>> be a follow-up, since this one does not affect current behavior
> >>>>>>> at all.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> tison.
> >>>>>>>
> >>>>>>>
> >>>>>>> On Thu, Oct 3, 2019 at 2:02 AM Zili Chen <wander4...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Hi Kostas,
> >>>>>>>>
> >>>>>>>> It seems to do no harm to have a configuration parameter on
> >>>>>>>> Executor#execute, since we can merge it with the one
> >>>>>>>> configured when the Executor was created and let this one
> >>>>>>>> override that one.
> >>>>>>>>
> >>>>>>>> I can see it is useful that, conceptually, we can create an
> >>>>>>>> Executor for a series of jobs submitted to the same cluster
> >>>>>>>> but with a different job configuration per pipeline.
> >>>>>>>>
> >>>>>>>> Best,
> >>>>>>>> tison.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Oct 3, 2019 at 1:37 AM Kostas Kloudas <kklou...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Hi again,
> >>>>>>>>>
> >>>>>>>>> I did not include this in my previous email, as this is
> >>>>>>>>> related to the proposal on the FLIP itself.
> >>>>>>>>>
> >>>>>>>>> In the existing proposal, the Executor interface is the
> >>>>>>>>> following.
> >>>>>>>>>
> >>>>>>>>> public interface Executor {
> >>>>>>>>>
> >>>>>>>>>     JobExecutionResult execute(Pipeline pipeline) throws Exception;
> >>>>>>>>>
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> This implies that all the necessary information for the
> >>>>>>>>> execution of a Pipeline should be included in the
> >>>>>>>>> Configuration passed in the ExecutorFactory which
> >>>>>>>>> instantiates the Executor itself. This should include, for
> >>>>>>>>> example, all the parameters currently supplied by the
> >>>>>>>>> ProgramOptions, which are conceptually not executor
> >>>>>>>>> parameters but rather parameters for the execution of the
> >>>>>>>>> specific pipeline. To this end, I would like to propose a
> >>>>>>>>> change in the current Executor interface showcased below:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> public interface Executor {
> >>>>>>>>>
> >>>>>>>>>     JobExecutionResult execute(Pipeline pipeline, Configuration
> >>>>>>>>>         executionOptions) throws Exception;
> >>>>>>>>>
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> The above will allow the Executor-specific options to be
> >>>>>>>>> passed in the configuration given during executor
> >>>>>>>>> instantiation, while the pipeline-specific options can be
> >>>>>>>>> passed in the executionOptions. As a positive side-effect,
> >>>>>>>>> this will make Executors re-usable, i.e. instantiate an
> >>>>>>>>> executor and use it to execute multiple pipelines, if in the
> >>>>>>>>> future we choose to do so.
> >>>>>>>>>
> >>>>>>>>> Let me know what you think,
> >>>>>>>>> Kostas
> >>>>>>>>>
> >>>>>>>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas <kklou...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi all,
> >>>>>>>>>>
> >>>>>>>>>> I agree with Tison that we should disentangle threads so
> >>>>>>>>>> that people can work independently.
> >>>>>>>>>>
> >>>>>>>>>> For FLIP-73:
> >>>>>>>>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal
> >>>>>>>>>> to the Executors work, as they are using the execute()
> >>>>>>>>>> method because this is the only "entry" to the user program.
> >>>>>>>>>> In this regard, I believe we should just see the fact that
> >>>>>>>>>> they have their dedicated environment as an "implementation
> >>>>>>>>>> detail".
> >>>>>>>>>> - for getting rid of the per-job mode: as a first note,
> >>>>>>>>>> there was already a discussion here:
> >>>>>>>>>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> >>>>>>>>>> with many people, including myself, expressing their
> >>>>>>>>>> opinion. I am mentioning that to show that this topic
> >>>>>>>>>> already has some history and the discussion does not start
> >>>>>>>>>> from scratch but there are already some contradicting
> >>>>>>>>>> opinions. My opinion is that we should not get rid of the
> >>>>>>>>>> per-job mode but I agree that we should discuss the
> >>>>>>>>>> semantics in more detail. Although in terms of code it may
> >>>>>>>>>> be tempting to "merge" the two submission modes, one of the
> >>>>>>>>>> main benefits of the per-job mode is isolation, both for
> >>>>>>>>>> resources and security, as the jobGraph to be executed is
> >>>>>>>>>> fixed and the cluster is "locked" just for that specific
> >>>>>>>>>> graph. This would be violated by having a session cluster
> >>>>>>>>>> launched and having all the infrastructure (ports and
> >>>>>>>>>> endpoints) set for submitting any job to that cluster.
> >>>>>>>>>> - for getting rid of the "detached" mode: I agree with
> >>>>>>>>>> getting rid of it but this implies some potential
> >>>>>>>>>> user-facing changes that should be discussed.
> >>>>>>>>>>
> >>>>>>>>>> Given the above, I think that:
> >>>>>>>>>> 1) in the context of FLIP-73 we should not change any
> >>>>>>>>>> semantics but simply push the existing submission logic
> >>>>>>>>>> behind a reusable abstraction and make it usable via public
> >>>>>>>>>> APIs, as Aljoscha said.
> >>>>>>>>>> 2) as Till said, changing the semantics is beyond the scope
> >>>>>>>>>> of this FLIP and as Tison mentioned we should work towards
> >>>>>>>>>> decoupling discussions rather than the opposite. So let's
> >>>>>>>>>> discuss the future of the per-job and detached modes in a
> >>>>>>>>>> separate thread. This will also give proper visibility to
> >>>>>>>>>> such an important topic.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Kostas
> >>>>>>>>>>
> >>>>>>>>>> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen <wander4...@gmail.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for your thoughts Aljoscha.
> >>>>>>>>>>>
> >>>>>>>>>>> Another question, since FLIP-73 might contain refactoring
> >>>>>>>>>>> of the Environment: shall we support something like
> >>>>>>>>>>> PreviewPlanEnvironment? If so, how? From a user perspective
> >>>>>>>>>>> the preview plan is useful: it gives a visual view for
> >>>>>>>>>>> modifying topologies and configuration without submitting
> >>>>>>>>>>> the job.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> tison.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Wed, Oct 2, 2019 at 10:10 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> I agree with Till that we should not change the semantics
> >>>>>>>>>>>> of per-job mode. In my opinion per-job mode means that the
> >>>>>>>>>>>> cluster (JobManager) is brought up with one job and it
> >>>>>>>>>>>> only executes that one job. There should be no open
> >>>>>>>>>>>> ports/anything that would allow submitting further jobs.
> >>>>>>>>>>>> This is very important for deployments in
> >>>>>>>>>>>> docker/Kubernetes or other environments where you bring up
> >>>>>>>>>>>> jobs without necessarily having the notion of a Flink
> >>>>>>>>>>>> cluster.
> >>>>>>>>>>>>
> >>>>>>>>>>>> What this means for a user program that has multiple
> >>>>>>>>>>>> execute() calls is that you will get a fresh cluster for
> >>>>>>>>>>>> each execute call. This also means that further execute()
> >>>>>>>>>>>> calls will only happen if the “client” is still alive,
> >>>>>>>>>>>> because it is the one driving execution. Currently, this
> >>>>>>>>>>>> only works if you start the job in “attached” mode. If you
> >>>>>>>>>>>> start in “detached” mode only the first execute() will
> >>>>>>>>>>>> happen and the rest will be ignored.
> >>>>>>>>>>>>
> >>>>>>>>>>>> This brings us to the tricky question about what to do
> >>>>>>>>>>>> about “detached” and “attached”. In the long run, I would
> >>>>>>>>>>>> like to get rid of the distinction and leave it up to the
> >>>>>>>>>>>> user program, by either blocking or not on the Future (or
> >>>>>>>>>>>> JobClient or whatnot) that job submission returns. This,
> >>>>>>>>>>>> however, means that users cannot simply request “detached”
> >>>>>>>>>>>> execution when using bin/flink, the user program has to
> >>>>>>>>>>>> “play along”. On the other hand, “detached” mode is quite
> >>>>>>>>>>>> strange for the user program. The execute() call either
> >>>>>>>>>>>> returns with a proper job result after the job ran (in
> >>>>>>>>>>>> “attached” mode) or with a dummy result (in “detached”
> >>>>>>>>>>>> mode) right after submission. I think this can even lead
> >>>>>>>>>>>> to weird cases where multiple “execute()” run in parallel.
> >>>>>>>>>>>> For per-job detached mode we also “throw” out of the first
> >>>>>>>>>>>> execute so the rest (including result processing logic) is
> >>>>>>>>>>>> ignored.
> >>>>>>>>>>>>
> >>>>>>>>>>>> For FLIP-73 here we can (and should) ignore these
> >>>>>>>>>>>> problems, because FLIP-73 only moves the existing
> >>>>>>>>>>>> submission logic behind a reusable abstraction and makes
> >>>>>>>>>>>> it usable via API. We should closely follow up on the
> >>>>>>>>>>>> above points though because I think they are also
> >>>>>>>>>>>> important.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On 2. Oct 2019, at 12:08, Zili Chen <wander4...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for your clarification Till.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I agree that, with the current semantics of the per-job
> >>>>>>>>>>>>> mode, one should deploy a new cluster for each part of
> >>>>>>>>>>>>> the job. Apart from the performance concern, it also
> >>>>>>>>>>>>> means that PerJobExecutor actually knows how to deploy a
> >>>>>>>>>>>>> cluster, which differs from the description that an
> >>>>>>>>>>>>> Executor submits a job.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Anyway, it sounds workable and narrows the changes.