Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-10 Thread Kostas Kloudas
Yes, it is a JobClient.

Cheers,
Kostas

On Thu, Oct 10, 2019 at 11:26 AM Zili Chen  wrote:
>
> Thanks for your explanation Kostas.
>
> I agree that Clients are independent from the Executors. From
> your text I wonder one thing that whether Executor#execute
> returns a cluster client or a job client? As discussed previously
> I think conceptually it is a job client?
>
> Best,
> tison.
>
>
> Kostas Kloudas  于2019年10月10日周四 下午5:08写道:
>
> > Hi Tison,
> >
> > I would say that as a first step, and until we see that the interfaces
> > we introduce cover all intended purposes, we keep the Executors
> > non-public.
> > From the previous discussion, I think that in general the Clients are
> > independent from the Executors, as the Executors simply use the
> > clients to submit jobs and return a cluster client.
> >
> > Cheers,
> > Kostas
> >
> > On Wed, Oct 9, 2019 at 7:01 PM Zili Chen  wrote:
> > >
> > > Hi Kostas & Aljoscha,
> > >
> > > I'm drafting a plan exposing multi-layered clients. It is mainly about
> > > how we distinguish different layers and what clients we're going to
> > > expose.
> > >
> > > In FLIP-73 scope I'd like to ask a question that whether or not Executor
> > > becomes a public interface that can be made use of by downstream
> > > project developer? Or it just an internal concept for unifying job
> > > submission?
> > > If it is the latter, I'm feeling multi-layer client topic is totally
> > > independent from
> > > Executor.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Thomas Weise  于2019年10月5日周六 上午12:17写道:
> > >
> > > > It might be useful to mention on FLIP-73 that the intention for
> > > > Executor.execute is to be an asynchronous API once it becomes public
> > and
> > > > also refer to FLIP-74 as such.
> > > >
> > > >
> > > > On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek 
> > > > wrote:
> > > >
> > > > > Hi Tison,
> > > > >
> > > > > I agree, for now the async Executor.execute() is an internal detail
> > but
> > > > > during your work for FLIP-74 it will probably also reach the public
> > API.
> > > > >
> > > > > Best,
> > > > > Aljoscha
> > > > >
> > > > > > On 4. Oct 2019, at 11:39, Zili Chen  wrote:
> > > > > >
> > > > > > Hi Aljoscha,
> > > > > >
> > > > > > After clearly narrow the scope of this FLIP it looks good to me the
> > > > > > interface
> > > > > > Executor and its discovery so that I'm glad to see the vote thread.
> > > > > >
> > > > > > As you said, we should still discuss on implementation details but
> > I
> > > > > don't
> > > > > > think
> > > > > > it should be a blocker of the vote thread because a vote means we
> > > > > generally
> > > > > > agree on the motivation and overall design.
> > > > > >
> > > > > > As for Executor.execute() to be async, it is much better than we
> > keep
> > > > the
> > > > > > difference between sync/async in this level. But I'd like to note
> > that
> > > > it
> > > > > > only
> > > > > > works internally for now because user-facing interface is still
> > > > > env.execute
> > > > > > which block and return a JobExecutionResult. I'm afraid that there
> > are
> > > > > > several
> > > > > > people depends on the result for doing post execution process,
> > although
> > > > > it
> > > > > > doesn't
> > > > > > work on current per-job mode.
> > > > > >
> > > > > > Best,
> > > > > > tison.
> > > > > >
> > > > > >
> > > > > > Aljoscha Krettek  于2019年10月4日周五 下午4:40写道:
> > > > > >
> > > > > >> Do you all think we could agree on the basic executor primitives
> > and
> > > > > start
> > > > > >> voting on this FLIP? There are still some implementation details
> > but I
> > > > > >> think we can discuss/tackle them when we get to them and the
> > various
> > > > > people
> > > > > >> implementing this should be in close collaboration.
> > > > > >>
> > > > > >> Best,
> > > > > >> Aljoscha
> > > > > >>
> > > > > >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek 
> > > > > wrote:
> > > > > >>>
> > > > > >>> Hi,
> > > > > >>>
> > > > > >>> I think the end goal is to have only one environment per API,
> > but I
> > > > > >> think we won’t be able to achieve that in the short-term because
> > of
> > > > > >> backwards compatibility. This is most notable with the context
> > > > > environment,
> > > > > >> preview environments etc.
> > > > > >>>
> > > > > >>> To keep this FLIP very slim we can make this only about the
> > executors
> > > > > >> and executor discovery. Anything else like job submission
> > semantics,
> > > > > >> detached mode, … can be tackled after this. If we don’t focus I’m
> > > > afraid
> > > > > >> this will drag on for quite a while.
> > > > > >>>
> > > > > >>> One thing I would like to propose to make this easier is to
> > change
> > > > > >> Executor.execute() to return a CompletableFuture and to completely
> > > > > remove
> > > > > >> the “detached” logic from ClusterClient. That way, the new
> > components
> > > > > make
> > > > > >> no distinction between “detached” and “attached” but we can still
> > do
> > > > it
> > > 

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-10 Thread Zili Chen
Thanks for your explanation Kostas.

I agree that Clients are independent from the Executors. From
your text I wonder one thing that whether Executor#execute
returns a cluster client or a job client? As discussed previously
I think conceptually it is a job client?

Best,
tison.


Kostas Kloudas  于2019年10月10日周四 下午5:08写道:

> Hi Tison,
>
> I would say that as a first step, and until we see that the interfaces
> we introduce cover all intended purposes, we keep the Executors
> non-public.
> From the previous discussion, I think that in general the Clients are
> independent from the Executors, as the Executors simply use the
> clients to submit jobs and return a cluster client.
>
> Cheers,
> Kostas
>
> On Wed, Oct 9, 2019 at 7:01 PM Zili Chen  wrote:
> >
> > Hi Kostas & Aljoscha,
> >
> > I'm drafting a plan exposing multi-layered clients. It is mainly about
> > how we distinguish different layers and what clients we're going to
> > expose.
> >
> > In FLIP-73 scope I'd like to ask a question that whether or not Executor
> > becomes a public interface that can be made use of by downstream
> > project developer? Or it just an internal concept for unifying job
> > submission?
> > If it is the latter, I'm feeling multi-layer client topic is totally
> > independent from
> > Executor.
> >
> > Best,
> > tison.
> >
> >
> > Thomas Weise  于2019年10月5日周六 上午12:17写道:
> >
> > > It might be useful to mention on FLIP-73 that the intention for
> > > Executor.execute is to be an asynchronous API once it becomes public
> and
> > > also refer to FLIP-74 as such.
> > >
> > >
> > > On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek 
> > > wrote:
> > >
> > > > Hi Tison,
> > > >
> > > > I agree, for now the async Executor.execute() is an internal detail
> but
> > > > during your work for FLIP-74 it will probably also reach the public
> API.
> > > >
> > > > Best,
> > > > Aljoscha
> > > >
> > > > > On 4. Oct 2019, at 11:39, Zili Chen  wrote:
> > > > >
> > > > > Hi Aljoscha,
> > > > >
> > > > > After clearly narrow the scope of this FLIP it looks good to me the
> > > > > interface
> > > > > Executor and its discovery so that I'm glad to see the vote thread.
> > > > >
> > > > > As you said, we should still discuss on implementation details but
> I
> > > > don't
> > > > > think
> > > > > it should be a blocker of the vote thread because a vote means we
> > > > generally
> > > > > agree on the motivation and overall design.
> > > > >
> > > > > As for Executor.execute() to be async, it is much better than we
> keep
> > > the
> > > > > difference between sync/async in this level. But I'd like to note
> that
> > > it
> > > > > only
> > > > > works internally for now because user-facing interface is still
> > > > env.execute
> > > > > which block and return a JobExecutionResult. I'm afraid that there
> are
> > > > > several
> > > > > people depends on the result for doing post execution process,
> although
> > > > it
> > > > > doesn't
> > > > > work on current per-job mode.
> > > > >
> > > > > Best,
> > > > > tison.
> > > > >
> > > > >
> > > > > Aljoscha Krettek  于2019年10月4日周五 下午4:40写道:
> > > > >
> > > > >> Do you all think we could agree on the basic executor primitives
> and
> > > > start
> > > > >> voting on this FLIP? There are still some implementation details
> but I
> > > > >> think we can discuss/tackle them when we get to them and the
> various
> > > > people
> > > > >> implementing this should be in close collaboration.
> > > > >>
> > > > >> Best,
> > > > >> Aljoscha
> > > > >>
> > > > >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek 
> > > > wrote:
> > > > >>>
> > > > >>> Hi,
> > > > >>>
> > > > >>> I think the end goal is to have only one environment per API,
> but I
> > > > >> think we won’t be able to achieve that in the short-term because
> of
> > > > >> backwards compatibility. This is most notable with the context
> > > > environment,
> > > > >> preview environments etc.
> > > > >>>
> > > > >>> To keep this FLIP very slim we can make this only about the
> executors
> > > > >> and executor discovery. Anything else like job submission
> semantics,
> > > > >> detached mode, … can be tackled after this. If we don’t focus I’m
> > > afraid
> > > > >> this will drag on for quite a while.
> > > > >>>
> > > > >>> One thing I would like to propose to make this easier is to
> change
> > > > >> Executor.execute() to return a CompletableFuture and to completely
> > > > remove
> > > > >> the “detached” logic from ClusterClient. That way, the new
> components
> > > > make
> > > > >> no distinction between “detached” and “attached” but we can still
> do
> > > it
> > > > in
> > > > >> the CLI (via the ContextEnvironment) to support the existing
> > > “detached”
> > > > >> behaviour of the CLI that users expect. What do you think about
> this?
> > > > >>>
> > > > >>> Best,
> > > > >>> Aljoscha
> > > > >>>
> > > >  On 3. Oct 2019, at 10:03, Zili Chen 
> wrote:
> > > > 
> > > >  Thanks for your explanation Kostas to make it clear subtasks
> u

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-10 Thread Kostas Kloudas
Hi Tison,

I would say that as a first step, and until we see that the interfaces
we introduce cover all intended purposes, we keep the Executors
non-public.
>From the previous discussion, I think that in general the Clients are
independent from the Executors, as the Executors simply use the
clients to submit jobs and return a cluster client.

Cheers,
Kostas

On Wed, Oct 9, 2019 at 7:01 PM Zili Chen  wrote:
>
> Hi Kostas & Aljoscha,
>
> I'm drafting a plan exposing multi-layered clients. It is mainly about
> how we distinguish different layers and what clients we're going to
> expose.
>
> In FLIP-73 scope I'd like to ask a question that whether or not Executor
> becomes a public interface that can be made use of by downstream
> project developer? Or it just an internal concept for unifying job
> submission?
> If it is the latter, I'm feeling multi-layer client topic is totally
> independent from
> Executor.
>
> Best,
> tison.
>
>
> Thomas Weise  于2019年10月5日周六 上午12:17写道:
>
> > It might be useful to mention on FLIP-73 that the intention for
> > Executor.execute is to be an asynchronous API once it becomes public and
> > also refer to FLIP-74 as such.
> >
> >
> > On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek 
> > wrote:
> >
> > > Hi Tison,
> > >
> > > I agree, for now the async Executor.execute() is an internal detail but
> > > during your work for FLIP-74 it will probably also reach the public API.
> > >
> > > Best,
> > > Aljoscha
> > >
> > > > On 4. Oct 2019, at 11:39, Zili Chen  wrote:
> > > >
> > > > Hi Aljoscha,
> > > >
> > > > After clearly narrow the scope of this FLIP it looks good to me the
> > > > interface
> > > > Executor and its discovery so that I'm glad to see the vote thread.
> > > >
> > > > As you said, we should still discuss on implementation details but I
> > > don't
> > > > think
> > > > it should be a blocker of the vote thread because a vote means we
> > > generally
> > > > agree on the motivation and overall design.
> > > >
> > > > As for Executor.execute() to be async, it is much better than we keep
> > the
> > > > difference between sync/async in this level. But I'd like to note that
> > it
> > > > only
> > > > works internally for now because user-facing interface is still
> > > env.execute
> > > > which block and return a JobExecutionResult. I'm afraid that there are
> > > > several
> > > > people depends on the result for doing post execution process, although
> > > it
> > > > doesn't
> > > > work on current per-job mode.
> > > >
> > > > Best,
> > > > tison.
> > > >
> > > >
> > > > Aljoscha Krettek  于2019年10月4日周五 下午4:40写道:
> > > >
> > > >> Do you all think we could agree on the basic executor primitives and
> > > start
> > > >> voting on this FLIP? There are still some implementation details but I
> > > >> think we can discuss/tackle them when we get to them and the various
> > > people
> > > >> implementing this should be in close collaboration.
> > > >>
> > > >> Best,
> > > >> Aljoscha
> > > >>
> > > >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek 
> > > wrote:
> > > >>>
> > > >>> Hi,
> > > >>>
> > > >>> I think the end goal is to have only one environment per API, but I
> > > >> think we won’t be able to achieve that in the short-term because of
> > > >> backwards compatibility. This is most notable with the context
> > > environment,
> > > >> preview environments etc.
> > > >>>
> > > >>> To keep this FLIP very slim we can make this only about the executors
> > > >> and executor discovery. Anything else like job submission semantics,
> > > >> detached mode, … can be tackled after this. If we don’t focus I’m
> > afraid
> > > >> this will drag on for quite a while.
> > > >>>
> > > >>> One thing I would like to propose to make this easier is to change
> > > >> Executor.execute() to return a CompletableFuture and to completely
> > > remove
> > > >> the “detached” logic from ClusterClient. That way, the new components
> > > make
> > > >> no distinction between “detached” and “attached” but we can still do
> > it
> > > in
> > > >> the CLI (via the ContextEnvironment) to support the existing
> > “detached”
> > > >> behaviour of the CLI that users expect. What do you think about this?
> > > >>>
> > > >>> Best,
> > > >>> Aljoscha
> > > >>>
> > >  On 3. Oct 2019, at 10:03, Zili Chen  wrote:
> > > 
> > >  Thanks for your explanation Kostas to make it clear subtasks under
> > > >> FLIP-73.
> > > 
> > >  As you described, changes of Environment are included in this FLIP.
> > > For
> > >  "each
> > >  API to have a single Environment", it could be helpful to describe
> > > which
> > >  APIs we'd
> > >  like to have after FLIP-73. And if we keep multiple Environments,
> > > shall
> > > >> we
> > >  keep the
> > >  way inject context environment for each API?
> > > 
> > > 
> > >  Kostas Kloudas  于2019年10月3日周四 下午1:44写道:
> > > 
> > > > Hi Tison,
> > > >
> > > > The changes that this FLIP propose are:
> > > > - the intro

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-09 Thread Zili Chen
Hi Kostas & Aljoscha,

I'm drafting a plan exposing multi-layered clients. It is mainly about
how we distinguish different layers and what clients we're going to
expose.

In FLIP-73 scope I'd like to ask a question that whether or not Executor
becomes a public interface that can be made use of by downstream
project developer? Or it just an internal concept for unifying job
submission?
If it is the latter, I'm feeling multi-layer client topic is totally
independent from
Executor.

Best,
tison.


Thomas Weise  于2019年10月5日周六 上午12:17写道:

> It might be useful to mention on FLIP-73 that the intention for
> Executor.execute is to be an asynchronous API once it becomes public and
> also refer to FLIP-74 as such.
>
>
> On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek 
> wrote:
>
> > Hi Tison,
> >
> > I agree, for now the async Executor.execute() is an internal detail but
> > during your work for FLIP-74 it will probably also reach the public API.
> >
> > Best,
> > Aljoscha
> >
> > > On 4. Oct 2019, at 11:39, Zili Chen  wrote:
> > >
> > > Hi Aljoscha,
> > >
> > > After clearly narrow the scope of this FLIP it looks good to me the
> > > interface
> > > Executor and its discovery so that I'm glad to see the vote thread.
> > >
> > > As you said, we should still discuss on implementation details but I
> > don't
> > > think
> > > it should be a blocker of the vote thread because a vote means we
> > generally
> > > agree on the motivation and overall design.
> > >
> > > As for Executor.execute() to be async, it is much better than we keep
> the
> > > difference between sync/async in this level. But I'd like to note that
> it
> > > only
> > > works internally for now because user-facing interface is still
> > env.execute
> > > which block and return a JobExecutionResult. I'm afraid that there are
> > > several
> > > people depends on the result for doing post execution process, although
> > it
> > > doesn't
> > > work on current per-job mode.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Aljoscha Krettek  于2019年10月4日周五 下午4:40写道:
> > >
> > >> Do you all think we could agree on the basic executor primitives and
> > start
> > >> voting on this FLIP? There are still some implementation details but I
> > >> think we can discuss/tackle them when we get to them and the various
> > people
> > >> implementing this should be in close collaboration.
> > >>
> > >> Best,
> > >> Aljoscha
> > >>
> > >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek 
> > wrote:
> > >>>
> > >>> Hi,
> > >>>
> > >>> I think the end goal is to have only one environment per API, but I
> > >> think we won’t be able to achieve that in the short-term because of
> > >> backwards compatibility. This is most notable with the context
> > environment,
> > >> preview environments etc.
> > >>>
> > >>> To keep this FLIP very slim we can make this only about the executors
> > >> and executor discovery. Anything else like job submission semantics,
> > >> detached mode, … can be tackled after this. If we don’t focus I’m
> afraid
> > >> this will drag on for quite a while.
> > >>>
> > >>> One thing I would like to propose to make this easier is to change
> > >> Executor.execute() to return a CompletableFuture and to completely
> > remove
> > >> the “detached” logic from ClusterClient. That way, the new components
> > make
> > >> no distinction between “detached” and “attached” but we can still do
> it
> > in
> > >> the CLI (via the ContextEnvironment) to support the existing
> “detached”
> > >> behaviour of the CLI that users expect. What do you think about this?
> > >>>
> > >>> Best,
> > >>> Aljoscha
> > >>>
> >  On 3. Oct 2019, at 10:03, Zili Chen  wrote:
> > 
> >  Thanks for your explanation Kostas to make it clear subtasks under
> > >> FLIP-73.
> > 
> >  As you described, changes of Environment are included in this FLIP.
> > For
> >  "each
> >  API to have a single Environment", it could be helpful to describe
> > which
> >  APIs we'd
> >  like to have after FLIP-73. And if we keep multiple Environments,
> > shall
> > >> we
> >  keep the
> >  way inject context environment for each API?
> > 
> > 
> >  Kostas Kloudas  于2019年10月3日周四 下午1:44写道:
> > 
> > > Hi Tison,
> > >
> > > The changes that this FLIP propose are:
> > > - the introduction of the Executor interface
> > > - the fact that everything in the current state of job submission
> in
> > > Flink can be defined through configuration parameters
> > > - implementation of Executors that do not change any of the
> semantics
> > > of the currently offered "modes" of job submission
> > >
> > > In this, and in the FLIP itself where the
> > > ExecutionEnvironment.execute() method is described, there are
> details
> > > about parts of the
> > > integration with the existing Flink code-base.
> > >
> > > So I am not sure what do you mean by making the "integration a
> > > follow-up discussion".
> > >
> >

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-04 Thread Thomas Weise
It might be useful to mention on FLIP-73 that the intention for
Executor.execute is to be an asynchronous API once it becomes public and
also refer to FLIP-74 as such.


On Fri, Oct 4, 2019 at 2:52 AM Aljoscha Krettek  wrote:

> Hi Tison,
>
> I agree, for now the async Executor.execute() is an internal detail but
> during your work for FLIP-74 it will probably also reach the public API.
>
> Best,
> Aljoscha
>
> > On 4. Oct 2019, at 11:39, Zili Chen  wrote:
> >
> > Hi Aljoscha,
> >
> > After clearly narrow the scope of this FLIP it looks good to me the
> > interface
> > Executor and its discovery so that I'm glad to see the vote thread.
> >
> > As you said, we should still discuss on implementation details but I
> don't
> > think
> > it should be a blocker of the vote thread because a vote means we
> generally
> > agree on the motivation and overall design.
> >
> > As for Executor.execute() to be async, it is much better than we keep the
> > difference between sync/async in this level. But I'd like to note that it
> > only
> > works internally for now because user-facing interface is still
> env.execute
> > which block and return a JobExecutionResult. I'm afraid that there are
> > several
> > people depends on the result for doing post execution process, although
> it
> > doesn't
> > work on current per-job mode.
> >
> > Best,
> > tison.
> >
> >
> > Aljoscha Krettek  于2019年10月4日周五 下午4:40写道:
> >
> >> Do you all think we could agree on the basic executor primitives and
> start
> >> voting on this FLIP? There are still some implementation details but I
> >> think we can discuss/tackle them when we get to them and the various
> people
> >> implementing this should be in close collaboration.
> >>
> >> Best,
> >> Aljoscha
> >>
> >>> On 4. Oct 2019, at 10:15, Aljoscha Krettek 
> wrote:
> >>>
> >>> Hi,
> >>>
> >>> I think the end goal is to have only one environment per API, but I
> >> think we won’t be able to achieve that in the short-term because of
> >> backwards compatibility. This is most notable with the context
> environment,
> >> preview environments etc.
> >>>
> >>> To keep this FLIP very slim we can make this only about the executors
> >> and executor discovery. Anything else like job submission semantics,
> >> detached mode, … can be tackled after this. If we don’t focus I’m afraid
> >> this will drag on for quite a while.
> >>>
> >>> One thing I would like to propose to make this easier is to change
> >> Executor.execute() to return a CompletableFuture and to completely
> remove
> >> the “detached” logic from ClusterClient. That way, the new components
> make
> >> no distinction between “detached” and “attached” but we can still do it
> in
> >> the CLI (via the ContextEnvironment) to support the existing “detached”
> >> behaviour of the CLI that users expect. What do you think about this?
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
>  On 3. Oct 2019, at 10:03, Zili Chen  wrote:
> 
>  Thanks for your explanation Kostas to make it clear subtasks under
> >> FLIP-73.
> 
>  As you described, changes of Environment are included in this FLIP.
> For
>  "each
>  API to have a single Environment", it could be helpful to describe
> which
>  APIs we'd
>  like to have after FLIP-73. And if we keep multiple Environments,
> shall
> >> we
>  keep the
>  way inject context environment for each API?
> 
> 
>  Kostas Kloudas  于2019年10月3日周四 下午1:44写道:
> 
> > Hi Tison,
> >
> > The changes that this FLIP propose are:
> > - the introduction of the Executor interface
> > - the fact that everything in the current state of job submission in
> > Flink can be defined through configuration parameters
> > - implementation of Executors that do not change any of the semantics
> > of the currently offered "modes" of job submission
> >
> > In this, and in the FLIP itself where the
> > ExecutionEnvironment.execute() method is described, there are details
> > about parts of the
> > integration with the existing Flink code-base.
> >
> > So I am not sure what do you mean by making the "integration a
> > follow-up discussion".
> >
> > Cheers,
> > Kostas
> >
> > On Wed, Oct 2, 2019 at 8:10 PM Zili Chen 
> wrote:
> >>
> >> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> >> Executors work, as they are using the exexute() method because this
> is
> >> the only "entry" to the user program. To this regard, I believe we
> >> should just see the fact that they have their dedicated environment
> as
> >> an "implementation detail".
> >>
> >> The proposal says
> >>
> >> In this document, we propose to abstract away from the Environments
> >> the
> > job
> >> submission logic and put it in a newly introduced Executor. This
> will
> >> allow *each
> >> API to have a single Environment* which, based on the provided
> >> configuration, will 

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-04 Thread Aljoscha Krettek
Hi Tison,

I agree, for now the async Executor.execute() is an internal detail but during 
your work for FLIP-74 it will probably also reach the public API.

Best,
Aljoscha

> On 4. Oct 2019, at 11:39, Zili Chen  wrote:
> 
> Hi Aljoscha,
> 
> After clearly narrow the scope of this FLIP it looks good to me the
> interface
> Executor and its discovery so that I'm glad to see the vote thread.
> 
> As you said, we should still discuss on implementation details but I don't
> think
> it should be a blocker of the vote thread because a vote means we generally
> agree on the motivation and overall design.
> 
> As for Executor.execute() to be async, it is much better than we keep the
> difference between sync/async in this level. But I'd like to note that it
> only
> works internally for now because user-facing interface is still env.execute
> which block and return a JobExecutionResult. I'm afraid that there are
> several
> people depends on the result for doing post execution process, although it
> doesn't
> work on current per-job mode.
> 
> Best,
> tison.
> 
> 
> Aljoscha Krettek  于2019年10月4日周五 下午4:40写道:
> 
>> Do you all think we could agree on the basic executor primitives and start
>> voting on this FLIP? There are still some implementation details but I
>> think we can discuss/tackle them when we get to them and the various people
>> implementing this should be in close collaboration.
>> 
>> Best,
>> Aljoscha
>> 
>>> On 4. Oct 2019, at 10:15, Aljoscha Krettek  wrote:
>>> 
>>> Hi,
>>> 
>>> I think the end goal is to have only one environment per API, but I
>> think we won’t be able to achieve that in the short-term because of
>> backwards compatibility. This is most notable with the context environment,
>> preview environments etc.
>>> 
>>> To keep this FLIP very slim we can make this only about the executors
>> and executor discovery. Anything else like job submission semantics,
>> detached mode, … can be tackled after this. If we don’t focus I’m afraid
>> this will drag on for quite a while.
>>> 
>>> One thing I would like to propose to make this easier is to change
>> Executor.execute() to return a CompletableFuture and to completely remove
>> the “detached” logic from ClusterClient. That way, the new components make
>> no distinction between “detached” and “attached” but we can still do it in
>> the CLI (via the ContextEnvironment) to support the existing “detached”
>> behaviour of the CLI that users expect. What do you think about this?
>>> 
>>> Best,
>>> Aljoscha
>>> 
 On 3. Oct 2019, at 10:03, Zili Chen  wrote:
 
 Thanks for your explanation Kostas to make it clear subtasks under
>> FLIP-73.
 
 As you described, changes of Environment are included in this FLIP. For
 "each
 API to have a single Environment", it could be helpful to describe which
 APIs we'd
 like to have after FLIP-73. And if we keep multiple Environments, shall
>> we
 keep the
 way inject context environment for each API?
 
 
 Kostas Kloudas  于2019年10月3日周四 下午1:44写道:
 
> Hi Tison,
> 
> The changes that this FLIP propose are:
> - the introduction of the Executor interface
> - the fact that everything in the current state of job submission in
> Flink can be defined through configuration parameters
> - implementation of Executors that do not change any of the semantics
> of the currently offered "modes" of job submission
> 
> In this, and in the FLIP itself where the
> ExecutionEnvironment.execute() method is described, there are details
> about parts of the
> integration with the existing Flink code-base.
> 
> So I am not sure what do you mean by making the "integration a
> follow-up discussion".
> 
> Cheers,
> Kostas
> 
> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen  wrote:
>> 
>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>> Executors work, as they are using the exexute() method because this is
>> the only "entry" to the user program. To this regard, I believe we
>> should just see the fact that they have their dedicated environment as
>> an "implementation detail".
>> 
>> The proposal says
>> 
>> In this document, we propose to abstract away from the Environments
>> the
> job
>> submission logic and put it in a newly introduced Executor. This will
>> allow *each
>> API to have a single Environment* which, based on the provided
>> configuration, will decide which executor to use, *e.g.* Yarn, Local,
> etc.
>> In addition, it will allow different APIs and downstream projects to
> re-use
>> the provided executors, thus limiting the amount of code duplication
>> and
>> the amount of code that has to be written.
>> 
>> note that This will allow *each API to have a single Environment*  it
>> seems a bit diverge with you statement above. Or we say a single
> Environment
>> as a possib

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-04 Thread Zili Chen
Hi Aljoscha,

After clearly narrow the scope of this FLIP it looks good to me the
interface
Executor and its discovery so that I'm glad to see the vote thread.

As you said, we should still discuss on implementation details but I don't
think
it should be a blocker of the vote thread because a vote means we generally
agree on the motivation and overall design.

As for Executor.execute() to be async, it is much better than we keep the
difference between sync/async in this level. But I'd like to note that it
only
works internally for now because user-facing interface is still env.execute
which block and return a JobExecutionResult. I'm afraid that there are
several
people depends on the result for doing post execution process, although it
doesn't
work on current per-job mode.

Best,
tison.


Aljoscha Krettek  于2019年10月4日周五 下午4:40写道:

> Do you all think we could agree on the basic executor primitives and start
> voting on this FLIP? There are still some implementation details but I
> think we can discuss/tackle them when we get to them and the various people
> implementing this should be in close collaboration.
>
> Best,
> Aljoscha
>
> > On 4. Oct 2019, at 10:15, Aljoscha Krettek  wrote:
> >
> > Hi,
> >
> > I think the end goal is to have only one environment per API, but I
> think we won’t be able to achieve that in the short-term because of
> backwards compatibility. This is most notable with the context environment,
> preview environments etc.
> >
> > To keep this FLIP very slim we can make this only about the executors
> and executor discovery. Anything else like job submission semantics,
> detached mode, … can be tackled after this. If we don’t focus I’m afraid
> this will drag on for quite a while.
> >
> > One thing I would like to propose to make this easier is to change
> Executor.execute() to return a CompletableFuture and to completely remove
> the “detached” logic from ClusterClient. That way, the new components make
> no distinction between “detached” and “attached” but we can still do it in
> the CLI (via the ContextEnvironment) to support the existing “detached”
> behaviour of the CLI that users expect. What do you think about this?
> >
> > Best,
> > Aljoscha
> >
> >> On 3. Oct 2019, at 10:03, Zili Chen  wrote:
> >>
> >> Thanks for your explanation Kostas to make it clear subtasks under
> FLIP-73.
> >>
> >> As you described, changes of Environment are included in this FLIP. For
> >> "each
> >> API to have a single Environment", it could be helpful to describe which
> >> APIs we'd
> >> like to have after FLIP-73. And if we keep multiple Environments, shall
> we
> >> keep the
> >> way inject context environment for each API?
> >>
> >>
> >> Kostas Kloudas  于2019年10月3日周四 下午1:44写道:
> >>
> >>> Hi Tison,
> >>>
> >>> The changes that this FLIP propose are:
> >>> - the introduction of the Executor interface
> >>> - the fact that everything in the current state of job submission in
> >>> Flink can be defined through configuration parameters
> >>> - implementation of Executors that do not change any of the semantics
> >>> of the currently offered "modes" of job submission
> >>>
> >>> In this, and in the FLIP itself where the
> >>> ExecutionEnvironment.execute() method is described, there are details
> >>> about parts of the
> >>> integration with the existing Flink code-base.
> >>>
> >>> So I am not sure what do you mean by making the "integration a
> >>> follow-up discussion".
> >>>
> >>> Cheers,
> >>> Kostas
> >>>
> >>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen  wrote:
> 
>  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>  Executors work, as they are using the exexute() method because this is
>  the only "entry" to the user program. To this regard, I believe we
>  should just see the fact that they have their dedicated environment as
>  an "implementation detail".
> 
>  The proposal says
> 
>  In this document, we propose to abstract away from the Environments
> the
> >>> job
>  submission logic and put it in a newly introduced Executor. This will
>  allow *each
>  API to have a single Environment* which, based on the provided
>  configuration, will decide which executor to use, *e.g.* Yarn, Local,
> >>> etc.
>  In addition, it will allow different APIs and downstream projects to
> >>> re-use
>  the provided executors, thus limiting the amount of code duplication
> and
>  the amount of code that has to be written.
> 
>  note that This will allow *each API to have a single Environment*  it
>  seems a bit diverge with you statement above. Or we say a single
> >>> Environment
>  as a possible advantage after the introduction of Executor so that we
>  exclude it
>  from this pass.
> 
>  Best,
>  tison.
> 
> 
>  Zili Chen  于2019年10月3日周四 上午2:07写道:
> 
> > BTW, correct me if I misunderstand, now I learn more about our
> >>> community
> > way. Since FLIP-73 aimed at in

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-04 Thread Aljoscha Krettek
Do you all think we could agree on the basic executor primitives and start 
voting on this FLIP? There are still some implementation details but I think we 
can discuss/tackle them when we get to them and the various people implementing 
this should be in close collaboration.

Best,
Aljoscha

> On 4. Oct 2019, at 10:15, Aljoscha Krettek  wrote:
> 
> Hi,
> 
> I think the end goal is to have only one environment per API, but I think we 
> won’t be able to achieve that in the short-term because of backwards 
> compatibility. This is most notable with the context environment, preview 
> environments etc.
> 
> To keep this FLIP very slim we can make this only about the executors and 
> executor discovery. Anything else like job submission semantics, detached 
> mode, … can be tackled after this. If we don’t focus I’m afraid this will 
> drag on for quite a while.
> 
> One thing I would like to propose to make this easier is to change 
> Executor.execute() to return a CompletableFuture and to completely remove the 
> “detached” logic from ClusterClient. That way, the new components make no 
> distinction between “detached” and “attached” but we can still do it in the 
> CLI (via the ContextEnvironment) to support the existing “detached” behaviour 
> of the CLI that users expect. What do you think about this?
> 
> Best,
> Aljoscha
> 
>> On 3. Oct 2019, at 10:03, Zili Chen  wrote:
>> 
>> Thanks for your explanation Kostas to make it clear subtasks under FLIP-73.
>> 
>> As you described, changes of Environment are included in this FLIP. For
>> "each
>> API to have a single Environment", it could be helpful to describe which
>> APIs we'd
>> like to have after FLIP-73. And if we keep multiple Environments, shall we
>> keep the
>> way inject context environment for each API?
>> 
>> 
>> Kostas Kloudas  于2019年10月3日周四 下午1:44写道:
>> 
>>> Hi Tison,
>>> 
>>> The changes that this FLIP propose are:
>>> - the introduction of the Executor interface
>>> - the fact that everything in the current state of job submission in
>>> Flink can be defined through configuration parameters
>>> - implementation of Executors that do not change any of the semantics
>>> of the currently offered "modes" of job submission
>>> 
>>> In this, and in the FLIP itself where the
>>> ExecutionEnvironment.execute() method is described, there are details
>>> about parts of the
>>> integration with the existing Flink code-base.
>>> 
>>> So I am not sure what do you mean by making the "integration a
>>> follow-up discussion".
>>> 
>>> Cheers,
>>> Kostas
>>> 
>>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen  wrote:
 
 - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
 Executors work, as they are using the exexute() method because this is
 the only "entry" to the user program. To this regard, I believe we
 should just see the fact that they have their dedicated environment as
 an "implementation detail".
 
 The proposal says
 
 In this document, we propose to abstract away from the Environments the
>>> job
 submission logic and put it in a newly introduced Executor. This will
 allow *each
 API to have a single Environment* which, based on the provided
 configuration, will decide which executor to use, *e.g.* Yarn, Local,
>>> etc.
 In addition, it will allow different APIs and downstream projects to
>>> re-use
 the provided executors, thus limiting the amount of code duplication and
 the amount of code that has to be written.
 
 note that This will allow *each API to have a single Environment*  it
 seems a bit diverge with you statement above. Or we say a single
>>> Environment
 as a possible advantage after the introduction of Executor so that we
 exclude it
 from this pass.
 
 Best,
 tison.
 
 
 Zili Chen  于2019年10月3日周四 上午2:07写道:
 
> BTW, correct me if I misunderstand, now I learn more about our
>>> community
> way. Since FLIP-73 aimed at introducing an interface with community
> consensus the discussion is more about the interface in order to
>>> properly
> define a useful and extensible API. The integration story could be a
> follow up
> since this one does not affect current behavior at all.
> 
> Best,
> tison.
> 
> 
> Zili Chen  于2019年10月3日周四 上午2:02写道:
> 
>> Hi Kostas,
>> 
>> It seems does no harm we have a configuration parameter of
>> Executor#execute
>> since we can merge this one with the one configured on Executor
>>> created
>> and
>> let this one overwhelm that one.
>> 
>> I can see it is useful that conceptually we can create an Executor
>>> for a
>> series jobs
>> to the same cluster but with different job configuration per pipeline.
>> 
>> Best,
>> tison.
>> 
>> 
>> Kostas Kloudas  于2019年10月3日周四 上午1:37写道:
>> 
>>> Hi again,
>>> 
>>> I did not include this to my previous email, as th

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-04 Thread Aljoscha Krettek
Hi,

I think the end goal is to have only one environment per API, but I think we 
won’t be able to achieve that in the short-term because of backwards 
compatibility. This is most notable with the context environment, preview 
environments etc.

To keep this FLIP very slim we can make this only about the executors and 
executor discovery. Anything else like job submission semantics, detached mode, 
… can be tackled after this. If we don’t focus I’m afraid this will drag on for 
quite a while.

One thing I would like to propose to make this easier is to change 
Executor.execute() to return a CompletableFuture and to completely remove the 
“detached” logic from ClusterClient. That way, the new components make no 
distinction between “detached” and “attached” but we can still do it in the CLI 
(via the ContextEnvironment) to support the existing “detached” behaviour of 
the CLI that users expect. What do you think about this?

Best,
Aljoscha

> On 3. Oct 2019, at 10:03, Zili Chen  wrote:
> 
> Thanks for your explanation Kostas to make it clear subtasks under FLIP-73.
> 
> As you described, changes of Environment are included in this FLIP. For
> "each
> API to have a single Environment", it could be helpful to describe which
> APIs we'd
> like to have after FLIP-73. And if we keep multiple Environments, shall we
> keep the
> way inject context environment for each API?
> 
> 
> Kostas Kloudas  于2019年10月3日周四 下午1:44写道:
> 
>> Hi Tison,
>> 
>> The changes that this FLIP propose are:
>> - the introduction of the Executor interface
>> - the fact that everything in the current state of job submission in
>> Flink can be defined through configuration parameters
>> - implementation of Executors that do not change any of the semantics
>> of the currently offered "modes" of job submission
>> 
>> In this, and in the FLIP itself where the
>> ExecutionEnvironment.execute() method is described, there are details
>> about parts of the
>> integration with the existing Flink code-base.
>> 
>> So I am not sure what do you mean by making the "integration a
>> follow-up discussion".
>> 
>> Cheers,
>> Kostas
>> 
>> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen  wrote:
>>> 
>>> - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>> Executors work, as they are using the exexute() method because this is
>>> the only "entry" to the user program. To this regard, I believe we
>>> should just see the fact that they have their dedicated environment as
>>> an "implementation detail".
>>> 
>>> The proposal says
>>> 
>>> In this document, we propose to abstract away from the Environments the
>> job
>>> submission logic and put it in a newly introduced Executor. This will
>>> allow *each
>>> API to have a single Environment* which, based on the provided
>>> configuration, will decide which executor to use, *e.g.* Yarn, Local,
>> etc.
>>> In addition, it will allow different APIs and downstream projects to
>> re-use
>>> the provided executors, thus limiting the amount of code duplication and
>>> the amount of code that has to be written.
>>> 
>>> note that This will allow *each API to have a single Environment*  it
>>> seems a bit diverge with you statement above. Or we say a single
>> Environment
>>> as a possible advantage after the introduction of Executor so that we
>>> exclude it
>>> from this pass.
>>> 
>>> Best,
>>> tison.
>>> 
>>> 
>>> Zili Chen  于2019年10月3日周四 上午2:07写道:
>>> 
 BTW, correct me if I misunderstand, now I learn more about our
>> community
 way. Since FLIP-73 aimed at introducing an interface with community
 consensus the discussion is more about the interface in order to
>> properly
 define a useful and extensible API. The integration story could be a
 follow up
 since this one does not affect current behavior at all.
 
 Best,
 tison.
 
 
 Zili Chen  于2019年10月3日周四 上午2:02写道:
 
> Hi Kostas,
> 
> It seems does no harm we have a configuration parameter of
> Executor#execute
> since we can merge this one with the one configured on Executor
>> created
> and
> let this one overwhelm that one.
> 
> I can see it is useful that conceptually we can create an Executor
>> for a
> series jobs
> to the same cluster but with different job configuration per pipeline.
> 
> Best,
> tison.
> 
> 
> Kostas Kloudas  于2019年10月3日周四 上午1:37写道:
> 
>> Hi again,
>> 
>> I did not include this to my previous email, as this is related to
>> the
>> proposal on the FLIP itself.
>> 
>> In the existing proposal, the Executor interface is the following.
>> 
>> public interface Executor {
>> 
>>  JobExecutionResult execute(Pipeline pipeline) throws Exception;
>> 
>> }
>> 
>> This implies that all the necessary information for the execution of
>> a
>> Pipeline should be included in the Configuration passed in the
>> ExecutorFactory which instantiates the Executor itself

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-03 Thread Zili Chen
Thanks for your explanation Kostas to make it clear subtasks under FLIP-73.

As you described, changes of Environment are included in this FLIP. For
"each
API to have a single Environment", it could be helpful to describe which
APIs we'd
like to have after FLIP-73. And if we keep multiple Environments, shall we
keep the
way inject context environment for each API?


Kostas Kloudas  于2019年10月3日周四 下午1:44写道:

> Hi Tison,
>
> The changes that this FLIP propose are:
> - the introduction of the Executor interface
> - the fact that everything in the current state of job submission in
> Flink can be defined through configuration parameters
> - implementation of Executors that do not change any of the semantics
> of the currently offered "modes" of job submission
>
> In this, and in the FLIP itself where the
> ExecutionEnvironment.execute() method is described, there are details
> about parts of the
> integration with the existing Flink code-base.
>
> So I am not sure what do you mean by making the "integration a
> follow-up discussion".
>
> Cheers,
> Kostas
>
> On Wed, Oct 2, 2019 at 8:10 PM Zili Chen  wrote:
> >
> >  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> > Executors work, as they are using the exexute() method because this is
> > the only "entry" to the user program. To this regard, I believe we
> > should just see the fact that they have their dedicated environment as
> > an "implementation detail".
> >
> > The proposal says
> >
> > In this document, we propose to abstract away from the Environments the
> job
> > submission logic and put it in a newly introduced Executor. This will
> > allow *each
> > API to have a single Environment* which, based on the provided
> > configuration, will decide which executor to use, *e.g.* Yarn, Local,
> etc.
> > In addition, it will allow different APIs and downstream projects to
> re-use
> > the provided executors, thus limiting the amount of code duplication and
> > the amount of code that has to be written.
> >
> > note that This will allow *each API to have a single Environment*  it
> > seems a bit diverge with you statement above. Or we say a single
> Environment
> > as a possible advantage after the introduction of Executor so that we
> > exclude it
> > from this pass.
> >
> > Best,
> > tison.
> >
> >
> > Zili Chen  于2019年10月3日周四 上午2:07写道:
> >
> > > BTW, correct me if I misunderstand, now I learn more about our
> community
> > > way. Since FLIP-73 aimed at introducing an interface with community
> > > consensus the discussion is more about the interface in order to
> properly
> > > define a useful and extensible API. The integration story could be a
> > > follow up
> > > since this one does not affect current behavior at all.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Zili Chen  于2019年10月3日周四 上午2:02写道:
> > >
> > >> Hi Kostas,
> > >>
> > >> It seems does no harm we have a configuration parameter of
> > >> Executor#execute
> > >> since we can merge this one with the one configured on Executor
> created
> > >> and
> > >> let this one overwhelm that one.
> > >>
> > >> I can see it is useful that conceptually we can create an Executor
> for a
> > >> series jobs
> > >> to the same cluster but with different job configuration per pipeline.
> > >>
> > >> Best,
> > >> tison.
> > >>
> > >>
> > >> Kostas Kloudas  于2019年10月3日周四 上午1:37写道:
> > >>
> > >>> Hi again,
> > >>>
> > >>> I did not include this to my previous email, as this is related to
> the
> > >>> proposal on the FLIP itself.
> > >>>
> > >>> In the existing proposal, the Executor interface is the following.
> > >>>
> > >>> public interface Executor {
> > >>>
> > >>>   JobExecutionResult execute(Pipeline pipeline) throws Exception;
> > >>>
> > >>> }
> > >>>
> > >>> This implies that all the necessary information for the execution of
> a
> > >>> Pipeline should be included in the Configuration passed in the
> > >>> ExecutorFactory which instantiates the Executor itself. This should
> > >>> include, for example, all the parameters currently supplied by the
> > >>> ProgramOptions, which are conceptually not executor parameters but
> > >>> rather parameters for the execution of the specific pipeline. To this
> > >>> end, I would like to propose a change in the current Executor
> > >>> interface showcased below:
> > >>>
> > >>>
> > >>> public interface Executor {
> > >>>
> > >>>   JobExecutionResult execute(Pipeline pipeline, Configuration
> > >>> executionOptions) throws Exception;
> > >>>
> > >>> }
> > >>>
> > >>> The above will allow to have the Executor specific options passed in
> > >>> the configuration given during executor instantiation, while the
> > >>> pipeline specific options can be passed in the executionOptions. As a
> > >>> positive side-effect, this will make Executors re-usable, i.e.
> > >>> instantiate an executor and use it to execute multiple pipelines, if
> > >>> in the future we choose to do so.
> > >>>
> > >>> Let me know what do you think,
> > >>> Kostas
> > >>>
> > >>

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Kostas Kloudas
Hi Tison,

The changes that this FLIP propose are:
- the introduction of the Executor interface
- the fact that everything in the current state of job submission in
Flink can be defined through configuration parameters
- implementation of Executors that do not change any of the semantics
of the currently offered "modes" of job submission

In this, and in the FLIP itself where the
ExecutionEnvironment.execute() method is described, there are details
about parts of the
integration with the existing Flink code-base.

So I am not sure what do you mean by making the "integration a
follow-up discussion".

Cheers,
Kostas

On Wed, Oct 2, 2019 at 8:10 PM Zili Chen  wrote:
>
>  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> Executors work, as they are using the exexute() method because this is
> the only "entry" to the user program. To this regard, I believe we
> should just see the fact that they have their dedicated environment as
> an "implementation detail".
>
> The proposal says
>
> In this document, we propose to abstract away from the Environments the job
> submission logic and put it in a newly introduced Executor. This will
> allow *each
> API to have a single Environment* which, based on the provided
> configuration, will decide which executor to use, *e.g.* Yarn, Local, etc.
> In addition, it will allow different APIs and downstream projects to re-use
> the provided executors, thus limiting the amount of code duplication and
> the amount of code that has to be written.
>
> note that This will allow *each API to have a single Environment*  it
> seems a bit diverge with you statement above. Or we say a single Environment
> as a possible advantage after the introduction of Executor so that we
> exclude it
> from this pass.
>
> Best,
> tison.
>
>
> Zili Chen  于2019年10月3日周四 上午2:07写道:
>
> > BTW, correct me if I misunderstand, now I learn more about our community
> > way. Since FLIP-73 aimed at introducing an interface with community
> > consensus the discussion is more about the interface in order to properly
> > define a useful and extensible API. The integration story could be a
> > follow up
> > since this one does not affect current behavior at all.
> >
> > Best,
> > tison.
> >
> >
> > Zili Chen  于2019年10月3日周四 上午2:02写道:
> >
> >> Hi Kostas,
> >>
> >> It seems does no harm we have a configuration parameter of
> >> Executor#execute
> >> since we can merge this one with the one configured on Executor created
> >> and
> >> let this one overwhelm that one.
> >>
> >> I can see it is useful that conceptually we can create an Executor for a
> >> series jobs
> >> to the same cluster but with different job configuration per pipeline.
> >>
> >> Best,
> >> tison.
> >>
> >>
> >> Kostas Kloudas  于2019年10月3日周四 上午1:37写道:
> >>
> >>> Hi again,
> >>>
> >>> I did not include this to my previous email, as this is related to the
> >>> proposal on the FLIP itself.
> >>>
> >>> In the existing proposal, the Executor interface is the following.
> >>>
> >>> public interface Executor {
> >>>
> >>>   JobExecutionResult execute(Pipeline pipeline) throws Exception;
> >>>
> >>> }
> >>>
> >>> This implies that all the necessary information for the execution of a
> >>> Pipeline should be included in the Configuration passed in the
> >>> ExecutorFactory which instantiates the Executor itself. This should
> >>> include, for example, all the parameters currently supplied by the
> >>> ProgramOptions, which are conceptually not executor parameters but
> >>> rather parameters for the execution of the specific pipeline. To this
> >>> end, I would like to propose a change in the current Executor
> >>> interface showcased below:
> >>>
> >>>
> >>> public interface Executor {
> >>>
> >>>   JobExecutionResult execute(Pipeline pipeline, Configuration
> >>> executionOptions) throws Exception;
> >>>
> >>> }
> >>>
> >>> The above will allow to have the Executor specific options passed in
> >>> the configuration given during executor instantiation, while the
> >>> pipeline specific options can be passed in the executionOptions. As a
> >>> positive side-effect, this will make Executors re-usable, i.e.
> >>> instantiate an executor and use it to execute multiple pipelines, if
> >>> in the future we choose to do so.
> >>>
> >>> Let me know what do you think,
> >>> Kostas
> >>>
> >>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas 
> >>> wrote:
> >>> >
> >>> > Hi all,
> >>> >
> >>> > I agree with Tison that we should disentangle threads so that people
> >>> > can work independently.
> >>> >
> >>> > For FLIP-73:
> >>> >  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> >>> > Executors work, as they are using the exexute() method because this is
> >>> > the only "entry" to the user program. To this regard, I believe we
> >>> > should just see the fact that they have their dedicated environment as
> >>> > an "implementation detail".
> >>> >  - for getting rid of the per-job mode: as a first note, there was
> >>> > already a d

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Zili Chen
 - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
Executors work, as they are using the exexute() method because this is
the only "entry" to the user program. To this regard, I believe we
should just see the fact that they have their dedicated environment as
an "implementation detail".

The proposal says

In this document, we propose to abstract away from the Environments the job
submission logic and put it in a newly introduced Executor. This will
allow *each
API to have a single Environment* which, based on the provided
configuration, will decide which executor to use, *e.g.* Yarn, Local, etc.
In addition, it will allow different APIs and downstream projects to re-use
the provided executors, thus limiting the amount of code duplication and
the amount of code that has to be written.

note that This will allow *each API to have a single Environment*  it
seems a bit diverge with you statement above. Or we say a single Environment
as a possible advantage after the introduction of Executor so that we
exclude it
from this pass.

Best,
tison.


Zili Chen  于2019年10月3日周四 上午2:07写道:

> BTW, correct me if I misunderstand, now I learn more about our community
> way. Since FLIP-73 aimed at introducing an interface with community
> consensus the discussion is more about the interface in order to properly
> define a useful and extensible API. The integration story could be a
> follow up
> since this one does not affect current behavior at all.
>
> Best,
> tison.
>
>
> Zili Chen  于2019年10月3日周四 上午2:02写道:
>
>> Hi Kostas,
>>
>> It seems does no harm we have a configuration parameter of
>> Executor#execute
>> since we can merge this one with the one configured on Executor created
>> and
>> let this one overwhelm that one.
>>
>> I can see it is useful that conceptually we can create an Executor for a
>> series jobs
>> to the same cluster but with different job configuration per pipeline.
>>
>> Best,
>> tison.
>>
>>
>> Kostas Kloudas  于2019年10月3日周四 上午1:37写道:
>>
>>> Hi again,
>>>
>>> I did not include this to my previous email, as this is related to the
>>> proposal on the FLIP itself.
>>>
>>> In the existing proposal, the Executor interface is the following.
>>>
>>> public interface Executor {
>>>
>>>   JobExecutionResult execute(Pipeline pipeline) throws Exception;
>>>
>>> }
>>>
>>> This implies that all the necessary information for the execution of a
>>> Pipeline should be included in the Configuration passed in the
>>> ExecutorFactory which instantiates the Executor itself. This should
>>> include, for example, all the parameters currently supplied by the
>>> ProgramOptions, which are conceptually not executor parameters but
>>> rather parameters for the execution of the specific pipeline. To this
>>> end, I would like to propose a change in the current Executor
>>> interface showcased below:
>>>
>>>
>>> public interface Executor {
>>>
>>>   JobExecutionResult execute(Pipeline pipeline, Configuration
>>> executionOptions) throws Exception;
>>>
>>> }
>>>
>>> The above will allow to have the Executor specific options passed in
>>> the configuration given during executor instantiation, while the
>>> pipeline specific options can be passed in the executionOptions. As a
>>> positive side-effect, this will make Executors re-usable, i.e.
>>> instantiate an executor and use it to execute multiple pipelines, if
>>> in the future we choose to do so.
>>>
>>> Let me know what do you think,
>>> Kostas
>>>
>>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas 
>>> wrote:
>>> >
>>> > Hi all,
>>> >
>>> > I agree with Tison that we should disentangle threads so that people
>>> > can work independently.
>>> >
>>> > For FLIP-73:
>>> >  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>>> > Executors work, as they are using the exexute() method because this is
>>> > the only "entry" to the user program. To this regard, I believe we
>>> > should just see the fact that they have their dedicated environment as
>>> > an "implementation detail".
>>> >  - for getting rid of the per-job mode: as a first note, there was
>>> > already a discussion here:
>>> >
>>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>>> > with many people, including myself, expressing their opinion. I am
>>> > mentioning that to show that this topic already has some history and
>>> > the discussin does not start from scratch but there are already some
>>> > contradicting opinions. My opinion is that we should not get rid of
>>> > the per-job mode but I agree that we should discuss about the
>>> > semantics in more detail. Although in terms of code it may be tempting
>>> > to "merge" the two submission modes, one of the main benefits of the
>>> > per-job mode is isolation, both for resources and security, as the
>>> > jobGraph to be executed is fixed and the cluster is "locked" just for
>>> > that specific graph. This would be violated by having a session
>>> > cluster la

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Zili Chen
BTW, correct me if I misunderstand, now I learn more about our community
way. Since FLIP-73 aimed at introducing an interface with community
consensus the discussion is more about the interface in order to properly
define a useful and extensible API. The integration story could be a follow
up
since this one does not affect current behavior at all.

Best,
tison.


Zili Chen  于2019年10月3日周四 上午2:02写道:

> Hi Kostas,
>
> It seems does no harm we have a configuration parameter of Executor#execute
> since we can merge this one with the one configured on Executor created and
> let this one overwhelm that one.
>
> I can see it is useful that conceptually we can create an Executor for a
> series jobs
> to the same cluster but with different job configuration per pipeline.
>
> Best,
> tison.
>
>
> Kostas Kloudas  于2019年10月3日周四 上午1:37写道:
>
>> Hi again,
>>
>> I did not include this to my previous email, as this is related to the
>> proposal on the FLIP itself.
>>
>> In the existing proposal, the Executor interface is the following.
>>
>> public interface Executor {
>>
>>   JobExecutionResult execute(Pipeline pipeline) throws Exception;
>>
>> }
>>
>> This implies that all the necessary information for the execution of a
>> Pipeline should be included in the Configuration passed in the
>> ExecutorFactory which instantiates the Executor itself. This should
>> include, for example, all the parameters currently supplied by the
>> ProgramOptions, which are conceptually not executor parameters but
>> rather parameters for the execution of the specific pipeline. To this
>> end, I would like to propose a change in the current Executor
>> interface showcased below:
>>
>>
>> public interface Executor {
>>
>>   JobExecutionResult execute(Pipeline pipeline, Configuration
>> executionOptions) throws Exception;
>>
>> }
>>
>> The above will allow to have the Executor specific options passed in
>> the configuration given during executor instantiation, while the
>> pipeline specific options can be passed in the executionOptions. As a
>> positive side-effect, this will make Executors re-usable, i.e.
>> instantiate an executor and use it to execute multiple pipelines, if
>> in the future we choose to do so.
>>
>> Let me know what do you think,
>> Kostas
>>
>> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas 
>> wrote:
>> >
>> > Hi all,
>> >
>> > I agree with Tison that we should disentangle threads so that people
>> > can work independently.
>> >
>> > For FLIP-73:
>> >  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
>> > Executors work, as they are using the exexute() method because this is
>> > the only "entry" to the user program. To this regard, I believe we
>> > should just see the fact that they have their dedicated environment as
>> > an "implementation detail".
>> >  - for getting rid of the per-job mode: as a first note, there was
>> > already a discussion here:
>> >
>> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>> > with many people, including myself, expressing their opinion. I am
>> > mentioning that to show that this topic already has some history and
>> > the discussin does not start from scratch but there are already some
>> > contradicting opinions. My opinion is that we should not get rid of
>> > the per-job mode but I agree that we should discuss about the
>> > semantics in more detail. Although in terms of code it may be tempting
>> > to "merge" the two submission modes, one of the main benefits of the
>> > per-job mode is isolation, both for resources and security, as the
>> > jobGraph to be executed is fixed and the cluster is "locked" just for
>> > that specific graph. This would be violated by having a session
>> > cluster launched and having all the infrastrucutre (ports and
>> > endpoints) set for submittting to that cluster any job.
>> > - for getting rid of the "detached" mode: I agree with getting rid of
>> > it but this implies some potential user-facing changes that should be
>> > discussed.
>> >
>> > Given the above, I think that:
>> > 1) in the context of FLIP-73 we should not change any semantics but
>> > simply push the existing submission logic behind a reusable
>> > abstraction and make it usable via public APIs, as Aljoscha said.
>> > 2) as Till said, changing the semantics is beyond the scope of this
>> > FLIP and as Tison mentioned we should work towards decoupling
>> > discussions rather than the opposite. So let's discuss about the
>> > future of the per-job and detached modes in a separate thread. This
>> > will also allow to give the proper visibility to such an important
>> > topic.
>> >
>> > Cheers,
>> > Kostas
>> >
>> > On Wed, Oct 2, 2019 at 4:40 PM Zili Chen  wrote:
>> > >
>> > > Thanks for your thoughts Aljoscha.
>> > >
>> > > Another question since FLIP-73 might contains refactors on
>> Environemnt:
>> > > shall we support
>> > > something like PreviewPlanEnvironment? If so, how? From a user
>> per

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Zili Chen
Hi Kostas,

It seems does no harm we have a configuration parameter of Executor#execute
since we can merge this one with the one configured on Executor created and
let this one overwhelm that one.

I can see it is useful that conceptually we can create an Executor for a
series jobs
to the same cluster but with different job configuration per pipeline.

Best,
tison.


Kostas Kloudas  于2019年10月3日周四 上午1:37写道:

> Hi again,
>
> I did not include this to my previous email, as this is related to the
> proposal on the FLIP itself.
>
> In the existing proposal, the Executor interface is the following.
>
> public interface Executor {
>
>   JobExecutionResult execute(Pipeline pipeline) throws Exception;
>
> }
>
> This implies that all the necessary information for the execution of a
> Pipeline should be included in the Configuration passed in the
> ExecutorFactory which instantiates the Executor itself. This should
> include, for example, all the parameters currently supplied by the
> ProgramOptions, which are conceptually not executor parameters but
> rather parameters for the execution of the specific pipeline. To this
> end, I would like to propose a change in the current Executor
> interface showcased below:
>
>
> public interface Executor {
>
>   JobExecutionResult execute(Pipeline pipeline, Configuration
> executionOptions) throws Exception;
>
> }
>
> The above will allow to have the Executor specific options passed in
> the configuration given during executor instantiation, while the
> pipeline specific options can be passed in the executionOptions. As a
> positive side-effect, this will make Executors re-usable, i.e.
> instantiate an executor and use it to execute multiple pipelines, if
> in the future we choose to do so.
>
> Let me know what do you think,
> Kostas
>
> On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas  wrote:
> >
> > Hi all,
> >
> > I agree with Tison that we should disentangle threads so that people
> > can work independently.
> >
> > For FLIP-73:
> >  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> > Executors work, as they are using the exexute() method because this is
> > the only "entry" to the user program. To this regard, I believe we
> > should just see the fact that they have their dedicated environment as
> > an "implementation detail".
> >  - for getting rid of the per-job mode: as a first note, there was
> > already a discussion here:
> >
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > with many people, including myself, expressing their opinion. I am
> > mentioning that to show that this topic already has some history and
> > the discussin does not start from scratch but there are already some
> > contradicting opinions. My opinion is that we should not get rid of
> > the per-job mode but I agree that we should discuss about the
> > semantics in more detail. Although in terms of code it may be tempting
> > to "merge" the two submission modes, one of the main benefits of the
> > per-job mode is isolation, both for resources and security, as the
> > jobGraph to be executed is fixed and the cluster is "locked" just for
> > that specific graph. This would be violated by having a session
> > cluster launched and having all the infrastrucutre (ports and
> > endpoints) set for submittting to that cluster any job.
> > - for getting rid of the "detached" mode: I agree with getting rid of
> > it but this implies some potential user-facing changes that should be
> > discussed.
> >
> > Given the above, I think that:
> > 1) in the context of FLIP-73 we should not change any semantics but
> > simply push the existing submission logic behind a reusable
> > abstraction and make it usable via public APIs, as Aljoscha said.
> > 2) as Till said, changing the semantics is beyond the scope of this
> > FLIP and as Tison mentioned we should work towards decoupling
> > discussions rather than the opposite. So let's discuss about the
> > future of the per-job and detached modes in a separate thread. This
> > will also allow to give the proper visibility to such an important
> > topic.
> >
> > Cheers,
> > Kostas
> >
> > On Wed, Oct 2, 2019 at 4:40 PM Zili Chen  wrote:
> > >
> > > Thanks for your thoughts Aljoscha.
> > >
> > > Another question since FLIP-73 might contains refactors on Environemnt:
> > > shall we support
> > > something like PreviewPlanEnvironment? If so, how? From a user
> perspective
> > > preview plan
> > > is useful, by give visual view, to modify topos and configure without
> > > submit it.
> > >
> > > Best,
> > > tison.
> > >
> > >
> > > Aljoscha Krettek  于2019年10月2日周三 下午10:10写道:
> > >
> > > > I agree with Till that we should not change the semantics of per-job
> mode.
> > > > In my opinion per-job mode means that the cluster (JobManager) is
> brought
> > > > up with one job and it only executes that one job. There should be
> no open
> > > > ports/anything that would allow submitting furth

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Kostas Kloudas
Hi again,

I did not include this to my previous email, as this is related to the
proposal on the FLIP itself.

In the existing proposal, the Executor interface is the following.

public interface Executor {

  JobExecutionResult execute(Pipeline pipeline) throws Exception;

}

This implies that all the necessary information for the execution of a
Pipeline should be included in the Configuration passed in the
ExecutorFactory which instantiates the Executor itself. This should
include, for example, all the parameters currently supplied by the
ProgramOptions, which are conceptually not executor parameters but
rather parameters for the execution of the specific pipeline. To this
end, I would like to propose a change in the current Executor
interface showcased below:


public interface Executor {

  JobExecutionResult execute(Pipeline pipeline, Configuration
executionOptions) throws Exception;

}

The above will allow to have the Executor specific options passed in
the configuration given during executor instantiation, while the
pipeline specific options can be passed in the executionOptions. As a
positive side-effect, this will make Executors re-usable, i.e.
instantiate an executor and use it to execute multiple pipelines, if
in the future we choose to do so.

Let me know what do you think,
Kostas

On Wed, Oct 2, 2019 at 7:23 PM Kostas Kloudas  wrote:
>
> Hi all,
>
> I agree with Tison that we should disentangle threads so that people
> can work independently.
>
> For FLIP-73:
>  - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
> Executors work, as they are using the exexute() method because this is
> the only "entry" to the user program. To this regard, I believe we
> should just see the fact that they have their dedicated environment as
> an "implementation detail".
>  - for getting rid of the per-job mode: as a first note, there was
> already a discussion here:
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> with many people, including myself, expressing their opinion. I am
> mentioning that to show that this topic already has some history and
> the discussin does not start from scratch but there are already some
> contradicting opinions. My opinion is that we should not get rid of
> the per-job mode but I agree that we should discuss about the
> semantics in more detail. Although in terms of code it may be tempting
> to "merge" the two submission modes, one of the main benefits of the
> per-job mode is isolation, both for resources and security, as the
> jobGraph to be executed is fixed and the cluster is "locked" just for
> that specific graph. This would be violated by having a session
> cluster launched and having all the infrastrucutre (ports and
> endpoints) set for submittting to that cluster any job.
> - for getting rid of the "detached" mode: I agree with getting rid of
> it but this implies some potential user-facing changes that should be
> discussed.
>
> Given the above, I think that:
> 1) in the context of FLIP-73 we should not change any semantics but
> simply push the existing submission logic behind a reusable
> abstraction and make it usable via public APIs, as Aljoscha said.
> 2) as Till said, changing the semantics is beyond the scope of this
> FLIP and as Tison mentioned we should work towards decoupling
> discussions rather than the opposite. So let's discuss about the
> future of the per-job and detached modes in a separate thread. This
> will also allow to give the proper visibility to such an important
> topic.
>
> Cheers,
> Kostas
>
> On Wed, Oct 2, 2019 at 4:40 PM Zili Chen  wrote:
> >
> > Thanks for your thoughts Aljoscha.
> >
> > Another question since FLIP-73 might contains refactors on Environemnt:
> > shall we support
> > something like PreviewPlanEnvironment? If so, how? From a user perspective
> > preview plan
> > is useful, by give visual view, to modify topos and configure without
> > submit it.
> >
> > Best,
> > tison.
> >
> >
> > Aljoscha Krettek  于2019年10月2日周三 下午10:10写道:
> >
> > > I agree with Till that we should not change the semantics of per-job mode.
> > > In my opinion per-job mode means that the cluster (JobManager) is brought
> > > up with one job and it only executes that one job. There should be no open
> > > ports/anything that would allow submitting further jobs. This is very
> > > important for deployments in docker/Kubernetes or other environments were
> > > you bring up jobs without necessarily having the notion of a Flink 
> > > cluster.
> > >
> > > What this means for a user program that has multiple execute() calls is
> > > that you will get a fresh cluster for each execute call. This also means,
> > > that further execute() calls will only happen if the “client” is still
> > > alive, because it is the one driving execution. Currently, this only works
> > > if you start the job in “attached” mode. If you start in “detached” mode
> > > only the first execute()

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Kostas Kloudas
Hi all,

I agree with Tison that we should disentangle threads so that people
can work independently.

For FLIP-73:
 - for Preview/OptimizedPlanEnv: I think they are orthogonal to the
Executors work, as they are using the exexute() method because this is
the only "entry" to the user program. To this regard, I believe we
should just see the fact that they have their dedicated environment as
an "implementation detail".
 - for getting rid of the per-job mode: as a first note, there was
already a discussion here:
https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
with many people, including myself, expressing their opinion. I am
mentioning that to show that this topic already has some history and
the discussin does not start from scratch but there are already some
contradicting opinions. My opinion is that we should not get rid of
the per-job mode but I agree that we should discuss about the
semantics in more detail. Although in terms of code it may be tempting
to "merge" the two submission modes, one of the main benefits of the
per-job mode is isolation, both for resources and security, as the
jobGraph to be executed is fixed and the cluster is "locked" just for
that specific graph. This would be violated by having a session
cluster launched and having all the infrastrucutre (ports and
endpoints) set for submittting to that cluster any job.
- for getting rid of the "detached" mode: I agree with getting rid of
it but this implies some potential user-facing changes that should be
discussed.

Given the above, I think that:
1) in the context of FLIP-73 we should not change any semantics but
simply push the existing submission logic behind a reusable
abstraction and make it usable via public APIs, as Aljoscha said.
2) as Till said, changing the semantics is beyond the scope of this
FLIP and as Tison mentioned we should work towards decoupling
discussions rather than the opposite. So let's discuss about the
future of the per-job and detached modes in a separate thread. This
will also allow to give the proper visibility to such an important
topic.

Cheers,
Kostas

On Wed, Oct 2, 2019 at 4:40 PM Zili Chen  wrote:
>
> Thanks for your thoughts Aljoscha.
>
> Another question since FLIP-73 might contains refactors on Environemnt:
> shall we support
> something like PreviewPlanEnvironment? If so, how? From a user perspective
> preview plan
> is useful, by give visual view, to modify topos and configure without
> submit it.
>
> Best,
> tison.
>
>
> Aljoscha Krettek  于2019年10月2日周三 下午10:10写道:
>
> > I agree with Till that we should not change the semantics of per-job mode.
> > In my opinion per-job mode means that the cluster (JobManager) is brought
> > up with one job and it only executes that one job. There should be no open
> > ports/anything that would allow submitting further jobs. This is very
> > important for deployments in docker/Kubernetes or other environments were
> > you bring up jobs without necessarily having the notion of a Flink cluster.
> >
> > What this means for a user program that has multiple execute() calls is
> > that you will get a fresh cluster for each execute call. This also means,
> > that further execute() calls will only happen if the “client” is still
> > alive, because it is the one driving execution. Currently, this only works
> > if you start the job in “attached” mode. If you start in “detached” mode
> > only the first execute() will happen and the rest will be ignored.
> >
> > This brings us to the tricky question about what to do about “detached”
> > and “attached”. In the long run, I would like to get rid of the distinction
> > and leave it up to the user program, by either blocking or not on the
> > Future (or JobClient or whatnot) that job submission returns. This,
> > however, means that users cannot simply request “detached” execution when
> > using bin/flink, the user program has to “play along”. On the other hand,
> > “detached” mode is quite strange for the user program. The execute() call
> > either returns with a proper job result after the job ran (in “attached”
> > mode) or with a dummy result (in “detached” mode) right after submission. I
> > think this can even lead to weird cases where multiple "execute()” run in
> > parallel. For per-job detached mode we also “throw” out of the first
> > execute so the rest (including result processing logic) is ignored.
> >
> > For this here FLIP-73 we can (and should) ignore these problems, because
> > FLIP-73 only moves the existing submission logic behind a reusable
> > abstraction and makes it usable via API. We should closely follow up on the
> > above points though because I think they are also important.
> >
> > Best,
> > Aljoscha
> >
> > > On 2. Oct 2019, at 12:08, Zili Chen  wrote:
> > >
> > > Thanks for your clarification Till.
> > >
> > > I agree with the current semantics of the per-job mode, one should
> > deploy a
> > > new cluster for each part of the job

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Zili Chen
Thanks for your thoughts Aljoscha.

Another question since FLIP-73 might contains refactors on Environemnt:
shall we support
something like PreviewPlanEnvironment? If so, how? From a user perspective
preview plan
is useful, by give visual view, to modify topos and configure without
submit it.

Best,
tison.


Aljoscha Krettek  于2019年10月2日周三 下午10:10写道:

> I agree with Till that we should not change the semantics of per-job mode.
> In my opinion per-job mode means that the cluster (JobManager) is brought
> up with one job and it only executes that one job. There should be no open
> ports/anything that would allow submitting further jobs. This is very
> important for deployments in docker/Kubernetes or other environments were
> you bring up jobs without necessarily having the notion of a Flink cluster.
>
> What this means for a user program that has multiple execute() calls is
> that you will get a fresh cluster for each execute call. This also means,
> that further execute() calls will only happen if the “client” is still
> alive, because it is the one driving execution. Currently, this only works
> if you start the job in “attached” mode. If you start in “detached” mode
> only the first execute() will happen and the rest will be ignored.
>
> This brings us to the tricky question about what to do about “detached”
> and “attached”. In the long run, I would like to get rid of the distinction
> and leave it up to the user program, by either blocking or not on the
> Future (or JobClient or whatnot) that job submission returns. This,
> however, means that users cannot simply request “detached” execution when
> using bin/flink, the user program has to “play along”. On the other hand,
> “detached” mode is quite strange for the user program. The execute() call
> either returns with a proper job result after the job ran (in “attached”
> mode) or with a dummy result (in “detached” mode) right after submission. I
> think this can even lead to weird cases where multiple "execute()” run in
> parallel. For per-job detached mode we also “throw” out of the first
> execute so the rest (including result processing logic) is ignored.
>
> For this here FLIP-73 we can (and should) ignore these problems, because
> FLIP-73 only moves the existing submission logic behind a reusable
> abstraction and makes it usable via API. We should closely follow up on the
> above points though because I think they are also important.
>
> Best,
> Aljoscha
>
> > On 2. Oct 2019, at 12:08, Zili Chen  wrote:
> >
> > Thanks for your clarification Till.
> >
> > I agree with the current semantics of the per-job mode, one should
> deploy a
> > new cluster for each part of the job. Apart from the performance concern
> > it also means that PerJobExecutor knows how to deploy a cluster actually,
> > which is different from the description that Executor submit a job.
> >
> > Anyway it sounds workable and narrow the changes.
>
>


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Aljoscha Krettek
I agree with Till that we should not change the semantics of per-job mode. In 
my opinion per-job mode means that the cluster (JobManager) is brought up with 
one job and it only executes that one job. There should be no open 
ports/anything that would allow submitting further jobs. This is very important 
for deployments in docker/Kubernetes or other environments were you bring up 
jobs without necessarily having the notion of a Flink cluster.

What this means for a user program that has multiple execute() calls is that 
you will get a fresh cluster for each execute call. This also means, that 
further execute() calls will only happen if the “client” is still alive, 
because it is the one driving execution. Currently, this only works if you 
start the job in “attached” mode. If you start in “detached” mode only the 
first execute() will happen and the rest will be ignored.

This brings us to the tricky question about what to do about “detached” and 
“attached”. In the long run, I would like to get rid of the distinction and 
leave it up to the user program, by either blocking or not on the Future (or 
JobClient or whatnot) that job submission returns. This, however, means that 
users cannot simply request “detached” execution when using bin/flink, the user 
program has to “play along”. On the other hand, “detached” mode is quite 
strange for the user program. The execute() call either returns with a proper 
job result after the job ran (in “attached” mode) or with a dummy result (in 
“detached” mode) right after submission. I think this can even lead to weird 
cases where multiple "execute()” run in parallel. For per-job detached mode we 
also “throw” out of the first execute so the rest (including result processing 
logic) is ignored.

For this here FLIP-73 we can (and should) ignore these problems, because 
FLIP-73 only moves the existing submission logic behind a reusable abstraction 
and makes it usable via API. We should closely follow up on the above points 
though because I think they are also important.

Best,
Aljoscha

> On 2. Oct 2019, at 12:08, Zili Chen  wrote:
> 
> Thanks for your clarification Till.
> 
> I agree with the current semantics of the per-job mode, one should deploy a
> new cluster for each part of the job. Apart from the performance concern
> it also means that PerJobExecutor knows how to deploy a cluster actually,
> which is different from the description that Executor submit a job.
> 
> Anyway it sounds workable and narrow the changes.



Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Zili Chen
Thanks for your clarification Till.

I agree with the current semantics of the per-job mode, one should deploy a
new cluster for each part of the job. Apart from the performance concern
it also means that PerJobExecutor knows how to deploy a cluster actually,
which is different from the description that Executor submit a job.

Anyway it sounds workable and narrow the changes.


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Till Rohrmann
I'm not saying that one cannot change the semantics of the per-job mode,
I'm mainly wondering whether this needs to happen as part of this FLIP. In
my understanding this has lot of overlap with the driver mode effort and
maybe not so much the executors. However, I agree that it is helpful to
have a long term vision for which one needs to discuss it.

On Wed, Oct 2, 2019 at 11:40 AM Zili Chen  wrote:

> To be honest I formerly want to firstly start a thread discuss about
> what per-job mode means because things gets quite different whether
> or not per-job mode contains exactly one JobGraph or allow to have
> multiple part. Plus the complexity that whether or not we support
> post-execution logic it becomes more unclear what per-job
> looks like in user perspective.
>
> But the original purpose is towards a concrete PerJobExecutor and I
> want to save bandwidth by reduce concurrent coupled threads a bit.
>
>
> Zili Chen  于2019年10月2日周三 下午5:33写道:
>
>> Hi Till,
>>
>> The purpose to post thoughts above here is because FLIP-73 is unclear on
>> how to
>> achieve PerJobExecutor. In order to discuss this topic it is necessary to
>> clarify how
>> per-job mode runs regardless what it is now.
>>
>> With PerJobExecutor called in Environment I don't think we still keep
>> current logic. If
>> we keep current logic, it looks like
>>
>> 1. env.execute calls executor.execute
>> 2. executor get current job graph, deploy a job cluster
>> 3. for the rest part, shall we deploy a new job cluster? reuse the
>> previous job cluster?
>> or as current logic, we abort on the first submission?
>>
>
With the current semantics of the per-job mode, one should deploy a new
cluster for each part of the job.

>
>> These question should be answered to clarify what PerJobExecutor is and
>> how it works.
>>
>> Best,
>> tison
>>
>>
>> Till Rohrmann  于2019年10月2日周三 下午5:19写道:
>>
>>> I'm not sure whether removing the current per-job mode semantics all
>>> together is a good idea. It has some nice properties, for example the
>>> JobGraph stays constant. With your proposal which I would coin the
>>> driver mode, the JobGraph would be regenerated in case of a failover.
>>> Depending on the user code logic, this could generate a different JobGraph.
>>>
>>> Aren't we unnecessarily widening the scope of this FLIP here? Wouldn't
>>> it be possible to introduce the Executors without changing Flink's
>>> deployment options in the first step? I don't fully understand where this
>>> need/requirement comes from.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Oct 2, 2019 at 10:58 AM Zili Chen  wrote:
>>>
 Thanks for your thoughts Kostas!

 I agree Executor to be a concept on clients now. And sincerely second
 the description

 Now the Executor simply uses a client, e.g. a ClusterClient, to submit
 the job (JobGraph) that it will create from the user program.
 In that sense, the Executor is one level of abstraction above the
 clients, as it adds more functionality and it uses the one offered by
 the client.

 In fact, let's think of the statement an Executor simply uses a client
 to submit the job.
 I'd like to give a description of how job submission works in per-job
 mode and it will
 follow a similar view now which

 (1) achieve run client on cluster side @Stephan Ewen 

 (2) support multi-parts per-job program so that we don't hack to
 fallback to session cluster
 in this case @Till Rohrmann 

 Let's start with an example we submit a user program via CLI in per-job
 mode.

 1) CLI generates configuration for getting all information about
 deployment.
 2) CLI deploys a job cluster *with user jars* and specially mark the
 jar contains user program
 3) JobClusterEntrypoint takes care of the bootstrap of flink cluster
 and executes user program,
 respects all configuration passed from client
 4) user program now runs on cluster side, it starts executing main
 method, get a environment with
 information of the associated job cluster. since the cluster has
 already started, it can submit the
 job to that cluster as in session cluster.
 5) job cluster shutdown on user program exits *and* Dispatcher doesn't
 maintain any jobs.

 Since we actually runs client on cluster side we can execute
 multi-parts program because we submit
 to local cluster one by one. And because we change the process from

 - start a per job cluster with job graph

 to

 + start a per job cluster with user program

 we runs client on cluster side, it avoids that we "extract" job graph
 from user program which limits
 on multi-parts program and doesn't respect user logic outside of Flink
 related code.

 Take session scenario into consideration, overall we now have

 1. ClusterDeployer and its factory which are SPI for platform
 developers so that they can deploy

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Zili Chen
Hi Till,

The purpose to post thoughts above here is because FLIP-73 is unclear on
how to
achieve PerJobExecutor. In order to discuss this topic it is necessary to
clarify how
per-job mode runs regardless what it is now.

With PerJobExecutor called in Environment I don't think we still keep
current logic. If
we keep current logic, it looks like

1. env.execute calls executor.execute
2. executor get current job graph, deploy a job cluster
3. for the rest part, shall we deploy a new job cluster? reuse the previous
job cluster?
or as current logic, we abort on the first submission?

These question should be answered to clarify what PerJobExecutor is and how
it works.

Best,
tison


Till Rohrmann  于2019年10月2日周三 下午5:19写道:

> I'm not sure whether removing the current per-job mode semantics all
> together is a good idea. It has some nice properties, for example the
> JobGraph stays constant. With your proposal which I would coin the
> driver mode, the JobGraph would be regenerated in case of a failover.
> Depending on the user code logic, this could generate a different JobGraph.
>
> Aren't we unnecessarily widening the scope of this FLIP here? Wouldn't it
> be possible to introduce the Executors without changing Flink's deployment
> options in the first step? I don't fully understand where this
> need/requirement comes from.
>
> Cheers,
> Till
>
> On Wed, Oct 2, 2019 at 10:58 AM Zili Chen  wrote:
>
>> Thanks for your thoughts Kostas!
>>
>> I agree Executor to be a concept on clients now. And sincerely second the
>> description
>>
>> Now the Executor simply uses a client, e.g. a ClusterClient, to submit
>> the job (JobGraph) that it will create from the user program.
>> In that sense, the Executor is one level of abstraction above the
>> clients, as it adds more functionality and it uses the one offered by
>> the client.
>>
>> In fact, let's think of the statement an Executor simply uses a client to
>> submit the job.
>> I'd like to give a description of how job submission works in per-job
>> mode and it will
>> follow a similar view now which
>>
>> (1) achieve run client on cluster side @Stephan Ewen 
>> (2) support multi-parts per-job program so that we don't hack to fallback
>> to session cluster
>> in this case @Till Rohrmann 
>>
>> Let's start with an example we submit a user program via CLI in per-job
>> mode.
>>
>> 1) CLI generates configuration for getting all information about
>> deployment.
>> 2) CLI deploys a job cluster *with user jars* and specially mark the jar
>> contains user program
>> 3) JobClusterEntrypoint takes care of the bootstrap of flink cluster and
>> executes user program,
>> respects all configuration passed from client
>> 4) user program now runs on cluster side, it starts executing main
>> method, get a environment with
>> information of the associated job cluster. since the cluster has already
>> started, it can submit the
>> job to that cluster as in session cluster.
>> 5) job cluster shutdown on user program exits *and* Dispatcher doesn't
>> maintain any jobs.
>>
>> Since we actually runs client on cluster side we can execute multi-parts
>> program because we submit
>> to local cluster one by one. And because we change the process from
>>
>> - start a per job cluster with job graph
>>
>> to
>>
>> + start a per job cluster with user program
>>
>> we runs client on cluster side, it avoids that we "extract" job graph
>> from user program which limits
>> on multi-parts program and doesn't respect user logic outside of Flink
>> related code.
>>
>> Take session scenario into consideration, overall we now have
>>
>> 1. ClusterDeployer and its factory which are SPI for platform developers
>> so that they can deploy a
>> job cluster with user program or session cluster.
>>
>> 2. Environment and Executor is unified. Environment helps describe user
>> program logic and internally
>> compile the job as well as submit job with Executor. Executor always make
>> use of a ClusterClient
>> to submit the job. Specifically, in per-job mode, Environment reads
>> configuration refined by job cluster
>> so that it knows how to generate a ClusterClient.
>>
>> 3. Platform developers gets ClusterClient as return value of deploy
>> method of ClusterDeployer or
>> retrieves from an existing public known session Cluster(by
>> ClusterRetriever or extend ClusterDeploy to
>> another general concept).
>>
>> 4. JobClient can be used by user program writer or platform developer for
>> manage job in different condition.
>>
>> There are many other refactor we can do to respect this architecture but
>> let's re-emphasize the key difference
>>
>> ** job cluster doesn't start with a job graph anymore but start with a
>> user program and it runs the program
>> on the same place as the cluster runs on. So that for the program, it is
>> nothing different to a session cluster.
>> It just an existing cluster. **
>>
>> Best,
>> tison.
>>
>


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Zili Chen
To be honest I formerly want to firstly start a thread discuss about
what per-job mode means because things gets quite different whether
or not per-job mode contains exactly one JobGraph or allow to have
multiple part. Plus the complexity that whether or not we support
post-execution logic it becomes more unclear what per-job
looks like in user perspective.

But the original purpose is towards a concrete PerJobExecutor and I
want to save bandwidth by reduce concurrent coupled threads a bit.


Zili Chen  于2019年10月2日周三 下午5:33写道:

> Hi Till,
>
> The purpose to post thoughts above here is because FLIP-73 is unclear on
> how to
> achieve PerJobExecutor. In order to discuss this topic it is necessary to
> clarify how
> per-job mode runs regardless what it is now.
>
> With PerJobExecutor called in Environment I don't think we still keep
> current logic. If
> we keep current logic, it looks like
>
> 1. env.execute calls executor.execute
> 2. executor get current job graph, deploy a job cluster
> 3. for the rest part, shall we deploy a new job cluster? reuse the
> previous job cluster?
> or as current logic, we abort on the first submission?
>
> These question should be answered to clarify what PerJobExecutor is and
> how it works.
>
> Best,
> tison
>
>
> Till Rohrmann  于2019年10月2日周三 下午5:19写道:
>
>> I'm not sure whether removing the current per-job mode semantics all
>> together is a good idea. It has some nice properties, for example the
>> JobGraph stays constant. With your proposal which I would coin the
>> driver mode, the JobGraph would be regenerated in case of a failover.
>> Depending on the user code logic, this could generate a different JobGraph.
>>
>> Aren't we unnecessarily widening the scope of this FLIP here? Wouldn't it
>> be possible to introduce the Executors without changing Flink's deployment
>> options in the first step? I don't fully understand where this
>> need/requirement comes from.
>>
>> Cheers,
>> Till
>>
>> On Wed, Oct 2, 2019 at 10:58 AM Zili Chen  wrote:
>>
>>> Thanks for your thoughts Kostas!
>>>
>>> I agree Executor to be a concept on clients now. And sincerely second
>>> the description
>>>
>>> Now the Executor simply uses a client, e.g. a ClusterClient, to submit
>>> the job (JobGraph) that it will create from the user program.
>>> In that sense, the Executor is one level of abstraction above the
>>> clients, as it adds more functionality and it uses the one offered by
>>> the client.
>>>
>>> In fact, let's think of the statement an Executor simply uses a client
>>> to submit the job.
>>> I'd like to give a description of how job submission works in per-job
>>> mode and it will
>>> follow a similar view now which
>>>
>>> (1) achieve run client on cluster side @Stephan Ewen 
>>> (2) support multi-parts per-job program so that we don't hack to
>>> fallback to session cluster
>>> in this case @Till Rohrmann 
>>>
>>> Let's start with an example we submit a user program via CLI in per-job
>>> mode.
>>>
>>> 1) CLI generates configuration for getting all information about
>>> deployment.
>>> 2) CLI deploys a job cluster *with user jars* and specially mark the jar
>>> contains user program
>>> 3) JobClusterEntrypoint takes care of the bootstrap of flink cluster and
>>> executes user program,
>>> respects all configuration passed from client
>>> 4) user program now runs on cluster side, it starts executing main
>>> method, get a environment with
>>> information of the associated job cluster. since the cluster has already
>>> started, it can submit the
>>> job to that cluster as in session cluster.
>>> 5) job cluster shutdown on user program exits *and* Dispatcher doesn't
>>> maintain any jobs.
>>>
>>> Since we actually runs client on cluster side we can execute multi-parts
>>> program because we submit
>>> to local cluster one by one. And because we change the process from
>>>
>>> - start a per job cluster with job graph
>>>
>>> to
>>>
>>> + start a per job cluster with user program
>>>
>>> we runs client on cluster side, it avoids that we "extract" job graph
>>> from user program which limits
>>> on multi-parts program and doesn't respect user logic outside of Flink
>>> related code.
>>>
>>> Take session scenario into consideration, overall we now have
>>>
>>> 1. ClusterDeployer and its factory which are SPI for platform developers
>>> so that they can deploy a
>>> job cluster with user program or session cluster.
>>>
>>> 2. Environment and Executor is unified. Environment helps describe user
>>> program logic and internally
>>> compile the job as well as submit job with Executor. Executor always
>>> make use of a ClusterClient
>>> to submit the job. Specifically, in per-job mode, Environment reads
>>> configuration refined by job cluster
>>> so that it knows how to generate a ClusterClient.
>>>
>>> 3. Platform developers gets ClusterClient as return value of deploy
>>> method of ClusterDeployer or
>>> retrieves from an existing public known session Cluster(by
>>> ClusterRetriever or extend

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Till Rohrmann
I'm not sure whether removing the current per-job mode semantics all
together is a good idea. It has some nice properties, for example the
JobGraph stays constant. With your proposal which I would coin the
driver mode, the JobGraph would be regenerated in case of a failover.
Depending on the user code logic, this could generate a different JobGraph.

Aren't we unnecessarily widening the scope of this FLIP here? Wouldn't it
be possible to introduce the Executors without changing Flink's deployment
options in the first step? I don't fully understand where this
need/requirement comes from.

Cheers,
Till

On Wed, Oct 2, 2019 at 10:58 AM Zili Chen  wrote:

> Thanks for your thoughts Kostas!
>
> I agree Executor to be a concept on clients now. And sincerely second the
> description
>
> Now the Executor simply uses a client, e.g. a ClusterClient, to submit
> the job (JobGraph) that it will create from the user program.
> In that sense, the Executor is one level of abstraction above the
> clients, as it adds more functionality and it uses the one offered by
> the client.
>
> In fact, let's think of the statement an Executor simply uses a client to
> submit the job.
> I'd like to give a description of how job submission works in per-job mode
> and it will
> follow a similar view now which
>
> (1) achieve run client on cluster side @Stephan Ewen 
> (2) support multi-parts per-job program so that we don't hack to fallback
> to session cluster
> in this case @Till Rohrmann 
>
> Let's start with an example we submit a user program via CLI in per-job
> mode.
>
> 1) CLI generates configuration for getting all information about
> deployment.
> 2) CLI deploys a job cluster *with user jars* and specially mark the jar
> contains user program
> 3) JobClusterEntrypoint takes care of the bootstrap of flink cluster and
> executes user program,
> respects all configuration passed from client
> 4) user program now runs on cluster side, it starts executing main method,
> get a environment with
> information of the associated job cluster. since the cluster has already
> started, it can submit the
> job to that cluster as in session cluster.
> 5) job cluster shutdown on user program exits *and* Dispatcher doesn't
> maintain any jobs.
>
> Since we actually runs client on cluster side we can execute multi-parts
> program because we submit
> to local cluster one by one. And because we change the process from
>
> - start a per job cluster with job graph
>
> to
>
> + start a per job cluster with user program
>
> we runs client on cluster side, it avoids that we "extract" job graph from
> user program which limits
> on multi-parts program and doesn't respect user logic outside of Flink
> related code.
>
> Take session scenario into consideration, overall we now have
>
> 1. ClusterDeployer and its factory which are SPI for platform developers
> so that they can deploy a
> job cluster with user program or session cluster.
>
> 2. Environment and Executor is unified. Environment helps describe user
> program logic and internally
> compile the job as well as submit job with Executor. Executor always make
> use of a ClusterClient
> to submit the job. Specifically, in per-job mode, Environment reads
> configuration refined by job cluster
> so that it knows how to generate a ClusterClient.
>
> 3. Platform developers gets ClusterClient as return value of deploy method
> of ClusterDeployer or
> retrieves from an existing public known session Cluster(by
> ClusterRetriever or extend ClusterDeploy to
> another general concept).
>
> 4. JobClient can be used by user program writer or platform developer for
> manage job in different condition.
>
> There are many other refactor we can do to respect this architecture but
> let's re-emphasize the key difference
>
> ** job cluster doesn't start with a job graph anymore but start with a
> user program and it runs the program
> on the same place as the cluster runs on. So that for the program, it is
> nothing different to a session cluster.
> It just an existing cluster. **
>
> Best,
> tison.
>


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Zili Chen
Thomas,

For your requirement on jar run REST endpoint we can follow a similar way
as job cluster way described above. That is, we submit the jar and runs user
program on cluster side.

Also cc Aljoscha,

There is no JobManager.runJar in mind. All logic that handles jar run is in
WebMonitor,
we prepare the configuration especially that for looking up cluster for the
program
and execute it in another progress.

Obviously it might cause some security issues because main method possibly
contains
arbitrary codes, but

1. We can provide an option to forbid jar run on cluster when it is
considered important
2. Session cluster already share resource between jobs, if a job want to
have its dedicate
resource, use per-job mode.

Does this way fits your requirement Thomas?


Zili Chen  于2019年10月2日周三 下午4:57写道:

> Thanks for your thoughts Kostas!
>
> I agree Executor to be a concept on clients now. And sincerely second the
> description
>
> Now the Executor simply uses a client, e.g. a ClusterClient, to submit
> the job (JobGraph) that it will create from the user program.
> In that sense, the Executor is one level of abstraction above the
> clients, as it adds more functionality and it uses the one offered by
> the client.
>
> In fact, let's think of the statement an Executor simply uses a client to
> submit the job.
> I'd like to give a description of how job submission works in per-job mode
> and it will
> follow a similar view now which
>
> (1) achieve run client on cluster side @Stephan Ewen 
> (2) support multi-parts per-job program so that we don't hack to fallback
> to session cluster
> in this case @Till Rohrmann 
>
> Let's start with an example we submit a user program via CLI in per-job
> mode.
>
> 1) CLI generates configuration for getting all information about
> deployment.
> 2) CLI deploys a job cluster *with user jars* and specially mark the jar
> contains user program
> 3) JobClusterEntrypoint takes care of the bootstrap of flink cluster and
> executes user program,
> respects all configuration passed from client
> 4) user program now runs on cluster side, it starts executing main method,
> get a environment with
> information of the associated job cluster. since the cluster has already
> started, it can submit the
> job to that cluster as in session cluster.
> 5) job cluster shutdown on user program exits *and* Dispatcher doesn't
> maintain any jobs.
>
> Since we actually runs client on cluster side we can execute multi-parts
> program because we submit
> to local cluster one by one. And because we change the process from
>
> - start a per job cluster with job graph
>
> to
>
> + start a per job cluster with user program
>
> we runs client on cluster side, it avoids that we "extract" job graph from
> user program which limits
> on multi-parts program and doesn't respect user logic outside of Flink
> related code.
>
> Take session scenario into consideration, overall we now have
>
> 1. ClusterDeployer and its factory which are SPI for platform developers
> so that they can deploy a
> job cluster with user program or session cluster.
>
> 2. Environment and Executor is unified. Environment helps describe user
> program logic and internally
> compile the job as well as submit job with Executor. Executor always make
> use of a ClusterClient
> to submit the job. Specifically, in per-job mode, Environment reads
> configuration refined by job cluster
> so that it knows how to generate a ClusterClient.
>
> 3. Platform developers gets ClusterClient as return value of deploy method
> of ClusterDeployer or
> retrieves from an existing public known session Cluster(by
> ClusterRetriever or extend ClusterDeploy to
> another general concept).
>
> 4. JobClient can be used by user program writer or platform developer for
> manage job in different condition.
>
> There are many other refactor we can do to respect this architecture but
> let's re-emphasize the key difference
>
> ** job cluster doesn't start with a job graph anymore but start with a
> user program and it runs the program
> on the same place as the cluster runs on. So that for the program, it is
> nothing different to a session cluster.
> It just an existing cluster. **
>
> Best,
> tison.
>


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-10-02 Thread Zili Chen
Thanks for your thoughts Kostas!

I agree Executor to be a concept on clients now. And sincerely second the
description

Now the Executor simply uses a client, e.g. a ClusterClient, to submit
the job (JobGraph) that it will create from the user program.
In that sense, the Executor is one level of abstraction above the
clients, as it adds more functionality and it uses the one offered by
the client.

In fact, let's think of the statement an Executor simply uses a client to
submit the job.
I'd like to give a description of how job submission works in per-job mode
and it will
follow a similar view now which

(1) achieve run client on cluster side @Stephan Ewen 
(2) support multi-parts per-job program so that we don't hack to fallback
to session cluster
in this case @Till Rohrmann 

Let's start with an example we submit a user program via CLI in per-job
mode.

1) CLI generates configuration for getting all information about deployment.
2) CLI deploys a job cluster *with user jars* and specially mark the jar
contains user program
3) JobClusterEntrypoint takes care of the bootstrap of flink cluster and
executes user program,
respects all configuration passed from client
4) user program now runs on cluster side, it starts executing main method,
get a environment with
information of the associated job cluster. since the cluster has already
started, it can submit the
job to that cluster as in session cluster.
5) job cluster shutdown on user program exits *and* Dispatcher doesn't
maintain any jobs.

Since we actually runs client on cluster side we can execute multi-parts
program because we submit
to local cluster one by one. And because we change the process from

- start a per job cluster with job graph

to

+ start a per job cluster with user program

we runs client on cluster side, it avoids that we "extract" job graph from
user program which limits
on multi-parts program and doesn't respect user logic outside of Flink
related code.

Take session scenario into consideration, overall we now have

1. ClusterDeployer and its factory which are SPI for platform developers so
that they can deploy a
job cluster with user program or session cluster.

2. Environment and Executor is unified. Environment helps describe user
program logic and internally
compile the job as well as submit job with Executor. Executor always make
use of a ClusterClient
to submit the job. Specifically, in per-job mode, Environment reads
configuration refined by job cluster
so that it knows how to generate a ClusterClient.

3. Platform developers gets ClusterClient as return value of deploy method
of ClusterDeployer or
retrieves from an existing public known session Cluster(by ClusterRetriever
or extend ClusterDeploy to
another general concept).

4. JobClient can be used by user program writer or platform developer for
manage job in different condition.

There are many other refactor we can do to respect this architecture but
let's re-emphasize the key difference

** job cluster doesn't start with a job graph anymore but start with a user
program and it runs the program
on the same place as the cluster runs on. So that for the program, it is
nothing different to a session cluster.
It just an existing cluster. **

Best,
tison.


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-27 Thread Kostas Kloudas
Hi Zili,

I think we are more or less on the same page with most of the stuff
you mentioned.

The only slight difference, at least in my opinion, is that I do not
see the Executors as being a "Client".
As you mentioned, I can see having the following:

1). ClusterClientFactory: responsible for deploy session cluster and
retrieve session cluster client.
2). ClusterClient: interact with session cluster, responsible for
query cluster level status, submit Flink job and retrieve Flink job
client.
3) JobClient: interact with Flink job, responsible for query job level
status and perform job level operation such as trigger savepoint.

Now the Executor simply uses a client, e.g. a ClusterClient, to submit
the job (JobGraph) that it will create from the user program.
In that sense, the Executor is one level of abstraction above the
clients, as it adds more functionality and it uses the one offered by
the client.

For the two questions you mentioned:

I). It seems we cannot have a ClusterClient of JobCluster. Is it
expected(due to the cluster bound to the job)?

 I think that this is expected for the reason that you also mention.
In this case, given that the lifecycle of the cluster
and that of the job are identical, the JobClient is essentially a
ClusterClient. You cannot submit jobs, but you can take
a savepoint or cancel the job that is currently being executed. In the
case of cancelling, I believe that it should also
kill the cluster.

II). It seems we treat session cluster quite different from job
cluster, but cluster client can submit a job, which overlaps a bit
with Executor.

It is true that we treat them differently, for the reason that in the
per-job mode, we have a "single-purpose" cluster and when its job is
done, it has no reason to keep occupying resources. In my opinion, the
"cluster client" or "job client" (semantically in per-job mode the are
the same) should not be able to submit new jobs in this scenario.

I agree that the PerJobExecutor requires a bit more discussion and I
will keep on updating the FLIP and discussing on this thread as more
details become clearer for this case.

Thanks for your thoughts on the topic and keep them coming ;)

Cheers,
Kostas

On Fri, Sep 27, 2019 at 10:28 AM Zili Chen  wrote:
>
> Thanks for your reply Kostas.
>
> As mentioned in FLIP-74 thread[1] there are two questions on Executor design
>
> (1) Where Executor is in a multi-layered clients view.
> (2) A bit more details about PerJobExecutor implementation.
>
> For (1) Where Executor is in a multi-layered clients view,
>
> As described in the multi-layered client thread[2], in our current codebase, 
> with JobClient
> introduced in FLIP-74, clients can be layered as
>
> 1) ClusterDescriptor: interact with external resource manager, responsible 
> for deploy Flink
> application cluster and retrieve Flink application cluster client.
> 2) ClusterClient: interact with Flink application cluster, responsible for 
> query cluster level
> status, submit Flink job and retrieve Flink job client.
> 3) JobClient: interact with Flink job, responsible for query job level status 
> and perform job
> level operation such as trigger savepoint.
>
> However, the singularity is JobCluster, which couple a bit cluster deployment 
> and job
> submission. From my perspective with FLIP-73 and Kostas's thoughts in FLIP-74 
> thread,
> we form a multi-layered client as below
>
> 1) Executor: responsible for job submission, whether the corresponding 
> cluster is
> SessionCluster or JobCluster doesn't matter. Executor always returns 
> JobClient.
> 2). ClusterClientFactory: responsible for deploy session cluster and retrieve 
> session cluster
> client.
> 3). ClusterClient: interact with session cluster, responsible for query 
> cluster level
> status, submit Flink job and retrieve Flink job client.
> 4) JobClient: interact with Flink job, responsible for query job level status 
> and perform job
> level operation such as trigger savepoint.
>
> I am not sure if the structure above is the same as that in your mind. If so, 
> there are two questions
>
> I). It seems we cannot have a ClusterClient of JobCluster. Is it expected(due 
> to the cluster bound to the job)?
> II). It seems we treat session cluster quite different from job cluster, but 
> cluster client can submit a job, which
> overlaps a bit with Executor.
>
> For (2) A bit more details about PerJobExecutor implementation,
>
> From the content of FLIP-73 it doesn't describe how PerJobExecutor would be 
> although it is spoken a bit in
> the design document[3]. In FLIP-74 thread I forward previous insights in our 
> community which towards two
> attributes of JobCluster
>
> I). Running Flink job by invoke user main method and execute throughout, 
> instead of create JobGraph from main-class.
> II). Run the client inside the cluster.
>
> Does PerJobExecutor fit this requirement? Anyway, it would be helpful we 
> describe the abstraction of Executor
> in the FLIP, at least the different be

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-27 Thread Zili Chen
Thanks for your reply Kostas.

As mentioned in FLIP-74 thread[1] there are two questions on Executor design

(1) Where Executor is in a multi-layered clients view.
(2) A bit more details about PerJobExecutor implementation.

For (1) Where Executor is in a multi-layered clients view,

As described in the multi-layered client thread[2], in our current
codebase, with JobClient
introduced in FLIP-74, clients can be layered as

1) ClusterDescriptor: interact with external resource manager, responsible
for deploy Flink
application cluster and retrieve Flink application cluster client.
2) ClusterClient: interact with Flink application cluster, responsible for
query cluster level
status, submit Flink job and retrieve Flink job client.
3) JobClient: interact with Flink job, responsible for query job level
status and perform job
level operation such as trigger savepoint.

However, the singularity is JobCluster, which couple a bit cluster
deployment and job
submission. From my perspective with FLIP-73 and Kostas's thoughts in
FLIP-74 thread,
we form a multi-layered client as below

1) Executor: responsible for job submission, whether the corresponding
cluster is
SessionCluster or JobCluster doesn't matter. Executor always returns
JobClient.
2). ClusterClientFactory: responsible for deploy session cluster and
retrieve session cluster
client.
3). ClusterClient: interact with session cluster, responsible for query
cluster level
status, submit Flink job and retrieve Flink job client.
4) JobClient: interact with Flink job, responsible for query job level
status and perform job
level operation such as trigger savepoint.

I am not sure if the structure above is the same as that in your mind. If
so, there are two questions

I). It seems we cannot have a ClusterClient of JobCluster. Is it
expected(due to the cluster bound to the job)?
II). It seems we treat session cluster quite different from job cluster,
but cluster client can submit a job, which
overlaps a bit with Executor.

For (2) A bit more details about PerJobExecutor implementation,

>From the content of FLIP-73 it doesn't describe how PerJobExecutor would be
although it is spoken a bit in
the design document[3]. In FLIP-74 thread I forward previous insights in
our community which towards two
attributes of JobCluster

I). Running Flink job by invoke user main method and execute throughout,
instead of create JobGraph from main-class.
II). Run the client inside the cluster.

Does PerJobExecutor fit this requirement? Anyway, it would be helpful we
describe the abstraction of Executor
in the FLIP, at least the different between PerJobExecutor and
SessionExecutor is essential.

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/b2e22a45aeb94a8d06b50c4de078f7b23d9ff08b8226918a1a903768@%3Cdev.flink.apache.org%3E
[2]
https://lists.apache.org/x/thread.html/240582148eda905a772d59b2424cb38fa16ab993647824d178cacb02@%3Cdev.flink.apache.org%3E
[3]
https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit?ts=5d8cbe34#heading=h.qq4wc2suukg


Kostas Kloudas  于2019年9月25日周三 下午8:27写道:

> Hi,
>
> @Aljoscha, I believe that it is better to be done like this so that we
> do not step on each-other's feet. If the executor already "knew" about
> the JobClient, then we should also know about how we expect the
> JobExecutionResult is retrieved (which is part of FLIP-74). I think it
> is nice to have each discussion self-contained.
>
> Cheers,
> Kostas
>
> On Wed, Sep 25, 2019 at 2:13 PM Aljoscha Krettek 
> wrote:
> >
> > Hi,
> >
> > I’m fine with either signature for the new execute() method but I think
> we should focus on the executor discovery and executor configuration part
> in this FLIP while FLIP-74 is about the evolution of the method signature
> to return a future.
> >
> > I understand that it’s a bit weird, that this FLIP introduces a new
> interface only to be changed within the same Flink release in a follow-up
> FLIP. But I think we can still do it. What do you think?
> >
> > Best,
> > Aljoscha
> >
> > > On 25. Sep 2019, at 10:11, Kostas Kloudas  wrote:
> > >
> > > Hi Thomas and Zili,
> > >
> > > As you both said the Executor is a new addition so there are no
> > > compatibility concerns.
> > > Backwards compatibility comes into play on the
> > > (Stream)ExecutionEnvironment#execute().
> > >
> > > This method has to stay and keep having the same (blocking) semantics,
> > > but we can
> > > add a new one, sth along the lines of executeAsync() that will return
> > > the JobClient and
> > > will allow the caller to interact with the job.
> > >
> > > Cheers,
> > > Kostas
> > >
> > > On Wed, Sep 25, 2019 at 2:44 AM Zili Chen 
> wrote:
> > >>
> > >>> Since Exceutor is a new interface, why is backward compatibility a
> concern?
> > >>
> > >> For backward compatibility, it is on
> (Stream)ExecutionEnvironment#execute.
> > >> You're right that we don't stick to blocking to return a
> JobExecutionResult in
> > >> Executor aspect but implementing env.execute wi

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-25 Thread Kostas Kloudas
Hi,

@Aljoscha, I believe that it is better to be done like this so that we
do not step on each-other's feet. If the executor already "knew" about
the JobClient, then we should also know about how we expect the
JobExecutionResult is retrieved (which is part of FLIP-74). I think it
is nice to have each discussion self-contained.

Cheers,
Kostas

On Wed, Sep 25, 2019 at 2:13 PM Aljoscha Krettek  wrote:
>
> Hi,
>
> I’m fine with either signature for the new execute() method but I think we 
> should focus on the executor discovery and executor configuration part in 
> this FLIP while FLIP-74 is about the evolution of the method signature to 
> return a future.
>
> I understand that it’s a bit weird, that this FLIP introduces a new interface 
> only to be changed within the same Flink release in a follow-up FLIP. But I 
> think we can still do it. What do you think?
>
> Best,
> Aljoscha
>
> > On 25. Sep 2019, at 10:11, Kostas Kloudas  wrote:
> >
> > Hi Thomas and Zili,
> >
> > As you both said the Executor is a new addition so there are no
> > compatibility concerns.
> > Backwards compatibility comes into play on the
> > (Stream)ExecutionEnvironment#execute().
> >
> > This method has to stay and keep having the same (blocking) semantics,
> > but we can
> > add a new one, sth along the lines of executeAsync() that will return
> > the JobClient and
> > will allow the caller to interact with the job.
> >
> > Cheers,
> > Kostas
> >
> > On Wed, Sep 25, 2019 at 2:44 AM Zili Chen  wrote:
> >>
> >>> Since Exceutor is a new interface, why is backward compatibility a 
> >>> concern?
> >>
> >> For backward compatibility, it is on (Stream)ExecutionEnvironment#execute.
> >> You're right that we don't stick to blocking to return a 
> >> JobExecutionResult in
> >> Executor aspect but implementing env.execute with a unique
> >>
> >> Executor#execute(or with suffix Async): CompletableFuture
> >>
> >> what do you think @Kostas Kloudas?
> >>
> >>> I could see that become an issue later when replacing Executor execute 
> >>> with
> >>> executeAsync. Or are both targeted for 1.10?
> >>
> >> IIUC both Executors and JobClient are targeted for 1.10.
> >>
> >>
> >> Thomas Weise  于2019年9月25日周三 上午2:39写道:
> >>>
> >>> Since Exceutor is a new interface, why is backward compatibility a 
> >>> concern?
> >>>
> >>> I could see that become an issue later when replacing Executor execute 
> >>> with
> >>> executeAsync. Or are both targeted for 1.10?
> >>>
> >>>
> >>> On Tue, Sep 24, 2019 at 10:24 AM Zili Chen  wrote:
> >>>
>  Hi Thomas,
> 
> > Should the new Executor execute method be defined as asynchronous? It
>  could
> > return a job handle to interact with the job and the legacy environments
> > can still block to retain their semantics.
> 
>  During our discussion there will be a method
> 
>  executeAsync(...): CompletableFuture
> 
>  where JobClient can be regarded as job handle in your context.
> 
>  I think we remain
> 
>  execute(...): JobExecutionResult
> 
>  just for backward compatibility because this effort towards 1.10 which is
>  not a
>  major version bump.
> 
>  BTW, I am drafting details of JobClient(as FLIP-74). Will start a 
>  separated
>  discussion
>  thread on that interface as soon as I finish an early version.
> 
>  Best,
>  tison.
> 
> 
>  Thomas Weise  于2019年9月25日周三 上午1:17写道:
> 
> > Thanks for the proposal. These changes will make it significantly easier
>  to
> > programmatically use Flink in downstream frameworks.
> >
> > Should the new Executor execute method be defined as asynchronous? It
>  could
> > return a job handle to interact with the job and the legacy environments
> > can still block to retain their semantics.
> >
> > (The blocking execution has also made things more difficult in Beam, we
> > could simply switch to use Executor directly.)
> >
> > Thomas
> >
> >
> > On Tue, Sep 24, 2019 at 6:48 AM Kostas Kloudas 
> > wrote:
> >
> >> Hi all,
> >>
> >> In the context of the discussion about introducing the Job Client API
> > [1],
> >> there was a side-discussion about refactoring the way users submit jobs
> > in
> >> Flink. There were many different interesting ideas on the topic and 3
> >> design documents that were trying to tackle both the issue about code
> >> submission and the Job Client API.
> >>
> >> This discussion thread aims at the job submission part and proposes the
> >> approach of introducing the Executor abstraction which will abstract
>  the
> >> job submission logic from the Environments and will make it API
>  agnostic.
> >>
> >> The FLIP can be found at [2].
> >>
> >> Please keep the discussion here, in the mailing list.
> >>
> >> Looking forward to your opinions,
> >> Kostas
> >>
> >> [1]
> >>

Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-25 Thread Aljoscha Krettek
Hi,

I’m fine with either signature for the new execute() method but I think we 
should focus on the executor discovery and executor configuration part in this 
FLIP while FLIP-74 is about the evolution of the method signature to return a 
future.

I understand that it’s a bit weird, that this FLIP introduces a new interface 
only to be changed within the same Flink release in a follow-up FLIP. But I 
think we can still do it. What do you think?

Best,
Aljoscha

> On 25. Sep 2019, at 10:11, Kostas Kloudas  wrote:
> 
> Hi Thomas and Zili,
> 
> As you both said the Executor is a new addition so there are no
> compatibility concerns.
> Backwards compatibility comes into play on the
> (Stream)ExecutionEnvironment#execute().
> 
> This method has to stay and keep having the same (blocking) semantics,
> but we can
> add a new one, sth along the lines of executeAsync() that will return
> the JobClient and
> will allow the caller to interact with the job.
> 
> Cheers,
> Kostas
> 
> On Wed, Sep 25, 2019 at 2:44 AM Zili Chen  wrote:
>> 
>>> Since Exceutor is a new interface, why is backward compatibility a concern?
>> 
>> For backward compatibility, it is on (Stream)ExecutionEnvironment#execute.
>> You're right that we don't stick to blocking to return a JobExecutionResult 
>> in
>> Executor aspect but implementing env.execute with a unique
>> 
>> Executor#execute(or with suffix Async): CompletableFuture
>> 
>> what do you think @Kostas Kloudas?
>> 
>>> I could see that become an issue later when replacing Executor execute with
>>> executeAsync. Or are both targeted for 1.10?
>> 
>> IIUC both Executors and JobClient are targeted for 1.10.
>> 
>> 
>> Thomas Weise  于2019年9月25日周三 上午2:39写道:
>>> 
>>> Since Exceutor is a new interface, why is backward compatibility a concern?
>>> 
>>> I could see that become an issue later when replacing Executor execute with
>>> executeAsync. Or are both targeted for 1.10?
>>> 
>>> 
>>> On Tue, Sep 24, 2019 at 10:24 AM Zili Chen  wrote:
>>> 
 Hi Thomas,
 
> Should the new Executor execute method be defined as asynchronous? It
 could
> return a job handle to interact with the job and the legacy environments
> can still block to retain their semantics.
 
 During our discussion there will be a method
 
 executeAsync(...): CompletableFuture
 
 where JobClient can be regarded as job handle in your context.
 
 I think we remain
 
 execute(...): JobExecutionResult
 
 just for backward compatibility because this effort towards 1.10 which is
 not a
 major version bump.
 
 BTW, I am drafting details of JobClient(as FLIP-74). Will start a separated
 discussion
 thread on that interface as soon as I finish an early version.
 
 Best,
 tison.
 
 
 Thomas Weise  于2019年9月25日周三 上午1:17写道:
 
> Thanks for the proposal. These changes will make it significantly easier
 to
> programmatically use Flink in downstream frameworks.
> 
> Should the new Executor execute method be defined as asynchronous? It
 could
> return a job handle to interact with the job and the legacy environments
> can still block to retain their semantics.
> 
> (The blocking execution has also made things more difficult in Beam, we
> could simply switch to use Executor directly.)
> 
> Thomas
> 
> 
> On Tue, Sep 24, 2019 at 6:48 AM Kostas Kloudas 
> wrote:
> 
>> Hi all,
>> 
>> In the context of the discussion about introducing the Job Client API
> [1],
>> there was a side-discussion about refactoring the way users submit jobs
> in
>> Flink. There were many different interesting ideas on the topic and 3
>> design documents that were trying to tackle both the issue about code
>> submission and the Job Client API.
>> 
>> This discussion thread aims at the job submission part and proposes the
>> approach of introducing the Executor abstraction which will abstract
 the
>> job submission logic from the Environments and will make it API
 agnostic.
>> 
>> The FLIP can be found at [2].
>> 
>> Please keep the discussion here, in the mailing list.
>> 
>> Looking forward to your opinions,
>> Kostas
>> 
>> [1]
>> 
>> 
> 
 https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>> [2]
>> 
>> 
> 
 https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
>> 
> 
 



Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-25 Thread Kostas Kloudas
Hi Thomas and Zili,

As you both said the Executor is a new addition so there are no
compatibility concerns.
Backwards compatibility comes into play on the
(Stream)ExecutionEnvironment#execute().

This method has to stay and keep having the same (blocking) semantics,
but we can
add a new one, sth along the lines of executeAsync() that will return
the JobClient and
will allow the caller to interact with the job.

Cheers,
Kostas

On Wed, Sep 25, 2019 at 2:44 AM Zili Chen  wrote:
>
> >Since Exceutor is a new interface, why is backward compatibility a concern?
>
> For backward compatibility, it is on (Stream)ExecutionEnvironment#execute.
> You're right that we don't stick to blocking to return a JobExecutionResult in
> Executor aspect but implementing env.execute with a unique
>
> Executor#execute(or with suffix Async): CompletableFuture
>
> what do you think @Kostas Kloudas?
>
> >I could see that become an issue later when replacing Executor execute with
> >executeAsync. Or are both targeted for 1.10?
>
> IIUC both Executors and JobClient are targeted for 1.10.
>
>
> Thomas Weise  于2019年9月25日周三 上午2:39写道:
>>
>> Since Exceutor is a new interface, why is backward compatibility a concern?
>>
>> I could see that become an issue later when replacing Executor execute with
>> executeAsync. Or are both targeted for 1.10?
>>
>>
>> On Tue, Sep 24, 2019 at 10:24 AM Zili Chen  wrote:
>>
>> > Hi Thomas,
>> >
>> > >Should the new Executor execute method be defined as asynchronous? It
>> > could
>> > >return a job handle to interact with the job and the legacy environments
>> > >can still block to retain their semantics.
>> >
>> > During our discussion there will be a method
>> >
>> > executeAsync(...): CompletableFuture
>> >
>> > where JobClient can be regarded as job handle in your context.
>> >
>> > I think we remain
>> >
>> > execute(...): JobExecutionResult
>> >
>> > just for backward compatibility because this effort towards 1.10 which is
>> > not a
>> > major version bump.
>> >
>> > BTW, I am drafting details of JobClient(as FLIP-74). Will start a separated
>> > discussion
>> > thread on that interface as soon as I finish an early version.
>> >
>> > Best,
>> > tison.
>> >
>> >
>> > Thomas Weise  于2019年9月25日周三 上午1:17写道:
>> >
>> > > Thanks for the proposal. These changes will make it significantly easier
>> > to
>> > > programmatically use Flink in downstream frameworks.
>> > >
>> > > Should the new Executor execute method be defined as asynchronous? It
>> > could
>> > > return a job handle to interact with the job and the legacy environments
>> > > can still block to retain their semantics.
>> > >
>> > > (The blocking execution has also made things more difficult in Beam, we
>> > > could simply switch to use Executor directly.)
>> > >
>> > > Thomas
>> > >
>> > >
>> > > On Tue, Sep 24, 2019 at 6:48 AM Kostas Kloudas 
>> > > wrote:
>> > >
>> > > > Hi all,
>> > > >
>> > > > In the context of the discussion about introducing the Job Client API
>> > > [1],
>> > > > there was a side-discussion about refactoring the way users submit jobs
>> > > in
>> > > > Flink. There were many different interesting ideas on the topic and 3
>> > > > design documents that were trying to tackle both the issue about code
>> > > > submission and the Job Client API.
>> > > >
>> > > > This discussion thread aims at the job submission part and proposes the
>> > > > approach of introducing the Executor abstraction which will abstract
>> > the
>> > > > job submission logic from the Environments and will make it API
>> > agnostic.
>> > > >
>> > > > The FLIP can be found at [2].
>> > > >
>> > > > Please keep the discussion here, in the mailing list.
>> > > >
>> > > > Looking forward to your opinions,
>> > > > Kostas
>> > > >
>> > > > [1]
>> > > >
>> > > >
>> > >
>> > https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>> > > > [2]
>> > > >
>> > > >
>> > >
>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
>> > > >
>> > >
>> >


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-24 Thread Zili Chen
>Since Exceutor is a new interface, why is backward compatibility a concern?

For backward compatibility, it is on (Stream)ExecutionEnvironment#execute.
You're right that we don't stick to blocking to return a JobExecutionResult
in
Executor aspect but implementing env.execute with a unique

Executor#execute(or with suffix Async): CompletableFuture

what do you think @Kostas Kloudas ?

>I could see that become an issue later when replacing Executor execute with
>executeAsync. Or are both targeted for 1.10?

IIUC both Executors and JobClient are targeted for 1.10.


Thomas Weise  于2019年9月25日周三 上午2:39写道:

> Since Exceutor is a new interface, why is backward compatibility a concern?
>
> I could see that become an issue later when replacing Executor execute with
> executeAsync. Or are both targeted for 1.10?
>
>
> On Tue, Sep 24, 2019 at 10:24 AM Zili Chen  wrote:
>
> > Hi Thomas,
> >
> > >Should the new Executor execute method be defined as asynchronous? It
> > could
> > >return a job handle to interact with the job and the legacy environments
> > >can still block to retain their semantics.
> >
> > During our discussion there will be a method
> >
> > executeAsync(...): CompletableFuture
> >
> > where JobClient can be regarded as job handle in your context.
> >
> > I think we remain
> >
> > execute(...): JobExecutionResult
> >
> > just for backward compatibility because this effort towards 1.10 which is
> > not a
> > major version bump.
> >
> > BTW, I am drafting details of JobClient(as FLIP-74). Will start a
> separated
> > discussion
> > thread on that interface as soon as I finish an early version.
> >
> > Best,
> > tison.
> >
> >
> > Thomas Weise  于2019年9月25日周三 上午1:17写道:
> >
> > > Thanks for the proposal. These changes will make it significantly
> easier
> > to
> > > programmatically use Flink in downstream frameworks.
> > >
> > > Should the new Executor execute method be defined as asynchronous? It
> > could
> > > return a job handle to interact with the job and the legacy
> environments
> > > can still block to retain their semantics.
> > >
> > > (The blocking execution has also made things more difficult in Beam, we
> > > could simply switch to use Executor directly.)
> > >
> > > Thomas
> > >
> > >
> > > On Tue, Sep 24, 2019 at 6:48 AM Kostas Kloudas 
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > In the context of the discussion about introducing the Job Client API
> > > [1],
> > > > there was a side-discussion about refactoring the way users submit
> jobs
> > > in
> > > > Flink. There were many different interesting ideas on the topic and 3
> > > > design documents that were trying to tackle both the issue about code
> > > > submission and the Job Client API.
> > > >
> > > > This discussion thread aims at the job submission part and proposes
> the
> > > > approach of introducing the Executor abstraction which will abstract
> > the
> > > > job submission logic from the Environments and will make it API
> > agnostic.
> > > >
> > > > The FLIP can be found at [2].
> > > >
> > > > Please keep the discussion here, in the mailing list.
> > > >
> > > > Looking forward to your opinions,
> > > > Kostas
> > > >
> > > > [1]
> > > >
> > > >
> > >
> >
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > > > [2]
> > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
> > > >
> > >
> >
>


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-24 Thread Thomas Weise
Since Exceutor is a new interface, why is backward compatibility a concern?

I could see that become an issue later when replacing Executor execute with
executeAsync. Or are both targeted for 1.10?


On Tue, Sep 24, 2019 at 10:24 AM Zili Chen  wrote:

> Hi Thomas,
>
> >Should the new Executor execute method be defined as asynchronous? It
> could
> >return a job handle to interact with the job and the legacy environments
> >can still block to retain their semantics.
>
> During our discussion there will be a method
>
> executeAsync(...): CompletableFuture
>
> where JobClient can be regarded as job handle in your context.
>
> I think we remain
>
> execute(...): JobExecutionResult
>
> just for backward compatibility because this effort towards 1.10 which is
> not a
> major version bump.
>
> BTW, I am drafting details of JobClient(as FLIP-74). Will start a separated
> discussion
> thread on that interface as soon as I finish an early version.
>
> Best,
> tison.
>
>
> Thomas Weise  于2019年9月25日周三 上午1:17写道:
>
> > Thanks for the proposal. These changes will make it significantly easier
> to
> > programmatically use Flink in downstream frameworks.
> >
> > Should the new Executor execute method be defined as asynchronous? It
> could
> > return a job handle to interact with the job and the legacy environments
> > can still block to retain their semantics.
> >
> > (The blocking execution has also made things more difficult in Beam, we
> > could simply switch to use Executor directly.)
> >
> > Thomas
> >
> >
> > On Tue, Sep 24, 2019 at 6:48 AM Kostas Kloudas 
> > wrote:
> >
> > > Hi all,
> > >
> > > In the context of the discussion about introducing the Job Client API
> > [1],
> > > there was a side-discussion about refactoring the way users submit jobs
> > in
> > > Flink. There were many different interesting ideas on the topic and 3
> > > design documents that were trying to tackle both the issue about code
> > > submission and the Job Client API.
> > >
> > > This discussion thread aims at the job submission part and proposes the
> > > approach of introducing the Executor abstraction which will abstract
> the
> > > job submission logic from the Environments and will make it API
> agnostic.
> > >
> > > The FLIP can be found at [2].
> > >
> > > Please keep the discussion here, in the mailing list.
> > >
> > > Looking forward to your opinions,
> > > Kostas
> > >
> > > [1]
> > >
> > >
> >
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > > [2]
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
> > >
> >
>


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-24 Thread Zili Chen
Hi Thomas,

>Should the new Executor execute method be defined as asynchronous? It could
>return a job handle to interact with the job and the legacy environments
>can still block to retain their semantics.

During our discussion there will be a method

executeAsync(...): CompletableFuture

where JobClient can be regarded as job handle in your context.

I think we remain

execute(...): JobExecutionResult

just for backward compatibility because this effort towards 1.10 which is
not a
major version bump.

BTW, I am drafting details of JobClient(as FLIP-74). Will start a separated
discussion
thread on that interface as soon as I finish an early version.

Best,
tison.


Thomas Weise  于2019年9月25日周三 上午1:17写道:

> Thanks for the proposal. These changes will make it significantly easier to
> programmatically use Flink in downstream frameworks.
>
> Should the new Executor execute method be defined as asynchronous? It could
> return a job handle to interact with the job and the legacy environments
> can still block to retain their semantics.
>
> (The blocking execution has also made things more difficult in Beam, we
> could simply switch to use Executor directly.)
>
> Thomas
>
>
> On Tue, Sep 24, 2019 at 6:48 AM Kostas Kloudas 
> wrote:
>
> > Hi all,
> >
> > In the context of the discussion about introducing the Job Client API
> [1],
> > there was a side-discussion about refactoring the way users submit jobs
> in
> > Flink. There were many different interesting ideas on the topic and 3
> > design documents that were trying to tackle both the issue about code
> > submission and the Job Client API.
> >
> > This discussion thread aims at the job submission part and proposes the
> > approach of introducing the Executor abstraction which will abstract the
> > job submission logic from the Environments and will make it API agnostic.
> >
> > The FLIP can be found at [2].
> >
> > Please keep the discussion here, in the mailing list.
> >
> > Looking forward to your opinions,
> > Kostas
> >
> > [1]
> >
> >
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> > [2]
> >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
> >
>


Re: [DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-24 Thread Thomas Weise
Thanks for the proposal. These changes will make it significantly easier to
programmatically use Flink in downstream frameworks.

Should the new Executor execute method be defined as asynchronous? It could
return a job handle to interact with the job and the legacy environments
can still block to retain their semantics.

(The blocking execution has also made things more difficult in Beam, we
could simply switch to use Executor directly.)

Thomas


On Tue, Sep 24, 2019 at 6:48 AM Kostas Kloudas  wrote:

> Hi all,
>
> In the context of the discussion about introducing the Job Client API [1],
> there was a side-discussion about refactoring the way users submit jobs in
> Flink. There were many different interesting ideas on the topic and 3
> design documents that were trying to tackle both the issue about code
> submission and the Job Client API.
>
> This discussion thread aims at the job submission part and proposes the
> approach of introducing the Executor abstraction which will abstract the
> job submission logic from the Environments and will make it API agnostic.
>
> The FLIP can be found at [2].
>
> Please keep the discussion here, in the mailing list.
>
> Looking forward to your opinions,
> Kostas
>
> [1]
>
> https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
> [2]
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission
>


[DISCUSS] FLIP-73: Introducing Executors for job submission

2019-09-24 Thread Kostas Kloudas
Hi all,

In the context of the discussion about introducing the Job Client API [1],
there was a side-discussion about refactoring the way users submit jobs in
Flink. There were many different interesting ideas on the topic and 3
design documents that were trying to tackle both the issue about code
submission and the Job Client API.

This discussion thread aims at the job submission part and proposes the
approach of introducing the Executor abstraction which will abstract the
job submission logic from the Environments and will make it API agnostic.

The FLIP can be found at [2].

Please keep the discussion here, in the mailing list.

Looking forward to your opinions,
Kostas

[1]
https://lists.apache.org/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-73%3A+Introducing+Executors+for+job+submission