This issue is another case where we have problems figuring out the boundaries and responsibilities between the ExecutionEnvironments and the ClusterClient.

I believe we should figure this out first, and decide whether the ClusterClient (or anything based on it) should be made public to accommodate use-cases such as this.

Personally, I believe the environments are overloaded as is, and I would very much prefer that no more features be added to them.

On 09/05/2019 10:13, Flavio Pompermaier wrote:
Hi everybody,
any news on this? For us it would be VERY helpful to have such a feature
because we need to execute a call to a REST service once a job ends.
Right now we do this after the env.execute(), but this works only if the job
is submitted via the CLI client; the REST client doesn't execute anything
after env.execute().

Best,
Flavio
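
A minimal sketch of the use case described above, assuming the environment itself invokes a registered listener when the job ends, so the follow-up call no longer depends on code placed after env.execute(). All names here (JobEndListener, StubEnvironment, RestNotifyingListener) are hypothetical stand-ins and not existing Flink classes; the REST call is represented by a Consumer so the sketch stays runnable without a network.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the proposed listener; not an existing Flink interface.
interface JobEndListener {
    void onJobExecuted(String jobId);
}

// Hypothetical environment stub: it notifies listeners itself once the job is done.
class StubEnvironment {
    private final List<JobEndListener> listeners = new ArrayList<>();

    void addJobListener(JobEndListener listener) {
        listeners.add(listener);
    }

    void execute(String jobId) {
        // ... the real job would run here ...
        for (JobEndListener listener : listeners) {
            listener.onJobExecuted(jobId);
        }
    }
}

// Hands the finished job id to a notifier, e.g. code that calls a REST service.
class RestNotifyingListener implements JobEndListener {
    private final Consumer<String> notifier;

    RestNotifyingListener(Consumer<String> notifier) {
        this.notifier = notifier;
    }

    @Override
    public void onJobExecuted(String jobId) {
        notifier.accept(jobId);
    }
}
```

Whether such a listener would actually fire for jobs submitted through the REST client depends on where the listener runs, which is the open question in this thread.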




On Thu, Apr 25, 2019 at 3:12 PM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Becket,

Thanks for your feedback. See my comments inline.

* How do users specify the listener? *
What I propose is to register the JobListener in the ExecutionEnvironment. I
don't think we should make ClusterClient a public API.

* Where should the listener run? *
I don't think it is proper to run the listener in the JobMaster. The listener is
user code, and it usually depends on the user's other components, so running
it on the client side makes more sense to me.

* What should be reported to the Listener? *
I am open to adding other methods to this JobListener. But for now, I am afraid
the ExecutionEnvironment is not aware of failover, so it is not possible to
report failover events.

* What can the listeners do on notifications? *
Do you mean passing the JobGraph to these methods, like the following? (I am
afraid JobGraph is not a public and stable API, so we should not expose it to
users.)

public interface JobListener {

    void onJobSubmitted(JobGraph graph, JobID jobId);

    void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);

    void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
}


On Thu, Apr 25, 2019 at 7:40 PM Becket Qin <becket....@gmail.com> wrote:

Thanks for the proposal, Jeff. Adding a listener to allow users to handle
events during the job lifecycle makes a lot of sense to me.

Here are my two cents.

* How do users specify the listener? *
It is not quite clear to me whether we consider ClusterClient a public
interface. From what I understand, ClusterClient is not a public interface
right now. In contrast, ExecutionEnvironment is the de facto interface for
administrative work. After job submission, it is essentially bound to a job
as an administrative handle. Given this current state, personally I find it
acceptable to have the listener registered on the ExecutionEnvironment.

* Where should the listener run? *
If the listener runs on the client side, the client has to stay
connected to the Flink cluster. This does not quite work if the job is a
streaming job. Should we provide the option to run the listener in the
JobMaster as well?

* What should be reported to the Listener? *
Besides the proposed APIs, does it make sense to also report events such
as failover?

* What can the listeners do on notifications? *
If the listeners are expected to do anything on the job, should some
helper class to manipulate the jobs be passed to the listener method?
Otherwise users may not be able to easily take action.
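
One way to picture the helper class mentioned above is a narrow handle passed to the listener instead of the JobGraph. This is only a sketch under assumptions: JobHandle, HandleAwareJobListener, InMemoryJobHandle and DenyListListener are hypothetical names, and a String stands in for JobID.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical narrow handle: job control without exposing JobGraph.
interface JobHandle {
    String jobId();
    void cancel();
}

// Hypothetical listener variant whose callback receives the handle.
interface HandleAwareJobListener {
    void onJobSubmitted(JobHandle job);
}

// In-memory handle, only here to make the sketch runnable.
class InMemoryJobHandle implements JobHandle {
    private final String jobId;
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    InMemoryJobHandle(String jobId) {
        this.jobId = jobId;
    }

    @Override
    public String jobId() {
        return jobId;
    }

    @Override
    public void cancel() {
        cancelled.set(true);
    }

    boolean isCancelled() {
        return cancelled.get();
    }
}

// Example policy: cancel any submitted job whose id is on a deny list.
class DenyListListener implements HandleAwareJobListener {
    private final Set<String> deniedJobIds;

    DenyListListener(Set<String> deniedJobIds) {
        this.deniedJobIds = deniedJobIds;
    }

    @Override
    public void onJobSubmitted(JobHandle job) {
        if (deniedJobIds.contains(job.jobId())) {
            job.cancel();
        }
    }
}
```

The listener can act on the job through the handle alone, so nothing about the internal job representation has to become public.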

Thanks,

Jiangjie (Becket) Qin




On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang <zjf...@gmail.com> wrote:

Hi Till,

IMHO, allowing hooks to be added involves 2 steps.
1. Provide the hook interface, and call these hooks in Flink (ClusterClient)
at the right place. This should be done by the framework (Flink).
2. Implement a new hook and add/register it in the
framework (Flink).

What I am doing is step 1, which should be done by Flink; step 2 is done
by users. But IIUC, your suggestion of using a custom ClusterClient seems
to mix these 2 steps together. Say I'd like to add new hooks: I would have to
implement a new custom ClusterClient, add the new hooks, and call them in the
custom ClusterClient at the right place.
This doesn't make sense to me. A user who wants to add hooks is
not supposed to understand the mechanism of ClusterClient, and should not
touch ClusterClient. What do you think?
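
The split between the two steps can be sketched as follows. This is an illustration under assumptions, not Flink code: SubmissionHook, FrameworkSubmitter and RecordingHook are hypothetical names, and a String stands in for JobID.

```java
import java.util.ArrayList;
import java.util.List;

// Step 1 (framework side): the hook interface.
interface SubmissionHook {
    void onJobSubmitted(String jobId);
    void onJobExecuted(String jobId);
}

// Step 1 (framework side): the framework owns the call sites of the hooks.
class FrameworkSubmitter {
    private final List<SubmissionHook> hooks = new ArrayList<>();

    void register(SubmissionHook hook) {
        hooks.add(hook);
    }

    void submitAndWait(String jobId) {
        hooks.forEach(hook -> hook.onJobSubmitted(jobId));
        // ... the job would run here ...
        hooks.forEach(hook -> hook.onJobExecuted(jobId));
    }
}

// Step 2 (user side): implement a hook and register it; no submission logic needed.
class RecordingHook implements SubmissionHook {
    final List<String> events = new ArrayList<>();

    @Override
    public void onJobSubmitted(String jobId) {
        events.add("submitted:" + jobId);
    }

    @Override
    public void onJobExecuted(String jobId) {
        events.add("executed:" + jobId);
    }
}
```

With this split, the user-side class never has to know where in the submission path the hooks are called.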




On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann <trohrm...@apache.org> wrote:

I think we should not expose the ClusterClient configuration via the
ExecutionEnvironment (env.getClusterClient().addJobListener) because this
is effectively the same as exposing the JobListener interface directly on
the ExecutionEnvironment. Instead I think it could be possible to provide a
ClusterClient factory which is picked up from the Configuration or some
other mechanism for example. That way it would not need to be exposed via
the ExecutionEnvironment at all.
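
A rough sketch of what a factory picked up from the Configuration could look like, assuming the factory class name is stored under a configuration key and instantiated reflectively. The key "client.factory.class" and the types Client, ClientFactory, DefaultClientFactory, ListeningClientFactory and ClientFactoryLoader are all hypothetical; a plain Map stands in for Flink's Configuration.

```java
import java.util.Map;

// Hypothetical client abstraction; stands in for ClusterClient.
interface Client {
    String name();
}

// Hypothetical factory interface that a deployment could implement.
interface ClientFactory {
    Client create();
}

class DefaultClientFactory implements ClientFactory {
    @Override
    public Client create() {
        return () -> "default";
    }
}

// A custom factory could return a client that also notifies listeners.
class ListeningClientFactory implements ClientFactory {
    @Override
    public Client create() {
        return () -> "listening";
    }
}

class ClientFactoryLoader {
    // Hypothetical configuration key; not an existing Flink option.
    static final String FACTORY_KEY = "client.factory.class";

    static ClientFactory load(Map<String, String> config) {
        String className =
                config.getOrDefault(FACTORY_KEY, DefaultClientFactory.class.getName());
        try {
            // Instantiate the configured factory through its no-argument constructor.
            return Class.forName(className)
                    .asSubclass(ClientFactory.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create client factory " + className, e);
        }
    }
}
```

In this arrangement the choice of client never appears on the ExecutionEnvironment; it is made entirely through configuration.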

Cheers,
Till

On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang <zjf...@gmail.com> wrote:

> The ExecutionEnvironment is usually used by the user who writes
> the code and this person (I assume) would not be really interested in these
> callbacks.

Usually the ExecutionEnvironment is used by the user who writes the code,
but it doesn't need to be created and configured by this person. E.g. in a
Zeppelin notebook, the ExecutionEnvironment is created by Zeppelin, and the user just
uses the ExecutionEnvironment to write a Flink program. You are right that the
end user would not be interested in these callbacks, but a third-party
library that integrates with Zeppelin would be interested in them.

> In your case, it could be sufficient to offer some hooks for the
> ClusterClient or being able to provide a custom ClusterClient.

Actually, in my initial PR (https://github.com/apache/flink/pull/8190),
I do pass the JobListener to the ClusterClient and invoke it there.
But IMHO, ClusterClient is not supposed to be a public API for users.
Instead, JobClient is the public API that users should use to control a job. So
adding hooks to ClusterClient directly and providing a custom ClusterClient
doesn't make sense to me. IIUC, you are suggesting the following approach:
      env.getClusterClient().addJobListener(jobListener)
but I don't see its benefit compared to this:
      env.addJobListener(jobListener)

Overall, I think adding hooks is orthogonal to fine-grained job
control. And I agree that we should refactor the Flink client component,
but I don't think it would affect the JobListener interface. What do you
think?




On Thu, Apr 18, 2019 at 8:57 PM Till Rohrmann <trohrm...@apache.org> wrote:

Thanks for starting this discussion Jeff. I can see the need for
additional hooks for third party integrations.

The thing I'm wondering is whether we really need/want to expose a
JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
usually used by the user who writes the code, and this person (I assume)
would not really be interested in these callbacks. If they were, then one
should rather think about better programmatic job control, where the
`ExecutionEnvironment#execute` call returns a `JobClient` instance.
Moreover, we would effectively make this part of the public API and every
implementation would need to offer it.
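
To make the idea of `execute` returning a job handle more concrete, here is a minimal sketch. The interface name JobClientSketch, its methods and SketchEnvironment are assumptions for illustration only; the job is simulated by a future that completes with a status string.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical handle returned by execute(); method names are assumptions.
interface JobClientSketch {
    String getJobId();
    CompletableFuture<String> getResult();
    void cancel();
}

// Hypothetical environment stub; a real one would submit to a cluster.
class SketchEnvironment {
    JobClientSketch execute(String jobName) {
        String jobId = "job-" + jobName;
        // Simulate the job running asynchronously and finishing with a status string.
        CompletableFuture<String> result = CompletableFuture.supplyAsync(() -> "FINISHED");
        return new JobClientSketch() {
            @Override
            public String getJobId() {
                return jobId;
            }

            @Override
            public CompletableFuture<String> getResult() {
                return result;
            }

            @Override
            public void cancel() {
                // Completes the future with a CancellationException if it is still pending.
                result.cancel(true);
            }
        };
    }
}
```

A caller interested in completion could then attach its own callback, for example `client.getResult().thenAccept(status -> ...)`, without any listener being registered on the environment.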

In your case, it could be sufficient to offer some hooks for the
ClusterClient or being able to provide a custom ClusterClient. The
ClusterClient is the component responsible for the job submission and
retrieval of the job result and, hence, would be able to signal when a job
has been submitted or completed.

Cheers,
Till

On Thu, Apr 18, 2019 at 8:57 AM vino yang <yanghua1...@gmail.com> wrote:

Hi Jeff,

I personally like this proposal. From the perspective of
programmability, the JobListener can make third-party programs more
appreciable.

The scenario where I need the listener is the Flink cube engine for
Apache Kylin. In that case, the Flink job program is embedded into
Kylin's executable context.

If we could have this listener, it would be easier to integrate with
Kylin.

Best,
Vino

On Thu, Apr 18, 2019 at 1:30 PM Jeff Zhang <zjf...@gmail.com> wrote:

Hi All,

I created FLINK-12214
<https://issues.apache.org/jira/browse/FLINK-12214> for adding a
JobListener (hook) to the Flink job lifecycle. Since this is a new public API
for Flink, I'd like to discuss it more widely in the community to get more
feedback.

The background and motivation is that I am integrating Flink into Apache
Zeppelin <http://zeppelin.apache.org/> (which is a notebook, in case
you don't know). I'd like to capture some job context (like the jobId) during
the lifecycle of a Flink job (submission, execution, cancellation) so that I can
control the job at a finer granularity (e.g. I can capture the jobId
when the job is submitted and associate it with one paragraph; when the
user clicks the cancel button, I can call the Flink cancel API to cancel
this job).

I believe other projects that integrate Flink would need a similar
mechanism. I plan to add an addJobListener API to
ExecutionEnvironment/StreamExecutionEnvironment so that users can add
customized hooks to the Flink job lifecycle.

Here's the draft JobListener interface.

public interface JobListener {

    void onJobSubmitted(JobID jobId);

    void onJobExecuted(JobExecutionResult jobResult);

    void onJobCanceled(JobID jobId, String savepointPath);
}

Let me know your comments and concerns, thanks.
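
As an illustration of the notebook use case, a listener along these lines could map a paragraph to its running job so that a cancel button knows which job to cancel. This is a sketch under assumptions: DraftJobListener mirrors the draft interface but uses String in place of JobID and JobExecutionResult so that it compiles without Flink, and ParagraphJobTracker is a hypothetical name.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Stand-in for the draft interface; String replaces JobID and JobExecutionResult.
interface DraftJobListener {
    void onJobSubmitted(String jobId);
    void onJobExecuted(String jobId);
    void onJobCanceled(String jobId, String savepointPath);
}

// Hypothetical notebook-side tracker: one instance per paragraph run.
class ParagraphJobTracker implements DraftJobListener {
    private final String paragraphId;
    private final Map<String, String> runningJobByParagraph;

    ParagraphJobTracker(String paragraphId, Map<String, String> runningJobByParagraph) {
        this.paragraphId = paragraphId;
        this.runningJobByParagraph = runningJobByParagraph;
    }

    @Override
    public void onJobSubmitted(String jobId) {
        // Remember which job belongs to this paragraph so it can be cancelled later.
        runningJobByParagraph.put(paragraphId, jobId);
    }

    @Override
    public void onJobExecuted(String jobId) {
        runningJobByParagraph.remove(paragraphId, jobId);
    }

    @Override
    public void onJobCanceled(String jobId, String savepointPath) {
        runningJobByParagraph.remove(paragraphId, jobId);
    }
}
```

When the user clicks cancel on a paragraph, the notebook would look up the job id in the shared map and pass it to whatever cancel call is available.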


--
Best Regards

Jeff Zhang
