This is an automated email from the ASF dual-hosted git repository.

zjureel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 99cb5c7addc [FLINK-33212][core] add job status changed listener for 
lineage (#24754)
99cb5c7addc is described below

commit 99cb5c7addc932aba517ea4481934ffe2b162052
Author: Peter Huang <huangzhenqiu0...@gmail.com>
AuthorDate: Mon Jul 1 22:16:16 2024 -0700

    [FLINK-33212][core] add job status changed listener for lineage (#24754)
---
 .../deployment/advanced/job_status_listener.md     |  81 +++++++
 .../generated/deployment_configuration.html        |   6 +
 .../application/executors/EmbeddedExecutor.java    |  33 ++-
 .../executors/EmbeddedExecutorFactory.java         |   1 +
 .../executors/WebSubmissionExecutorFactory.java    |   1 +
 .../executors/AbstractSessionClusterExecutor.java  |  37 ++-
 .../client/deployment/executors/LocalExecutor.java |  31 ++-
 .../executors/PipelineExecutorUtils.java           |  41 ++++
 .../deployment/executors/RemoteExecutor.java       |   5 +-
 .../executors/RemoteExecutorFactory.java           |   2 +-
 .../flink/configuration/DeploymentOptions.java     |   8 +
 .../execution/DefaultJobExecutionStatusEvent.java  |  74 ++++++
 .../core/execution/JobExecutionStatusEvent.java    |  28 ++-
 .../core/execution/JobStatusChangedEvent.java      |  21 +-
 .../core/execution/JobStatusChangedListener.java   |  24 +-
 .../execution/JobStatusChangedListenerFactory.java |  54 +++++
 .../execution/JobStatusChangedListenerUtils.java   |  79 ++++++
 .../KubernetesSessionClusterExecutor.java          |   5 +-
 .../KubernetesSessionClusterExecutorFactory.java   |   2 +-
 .../executiongraph/DefaultExecutionGraph.java      |  22 +-
 .../DefaultExecutionGraphBuilder.java              |   9 +-
 .../flink/streaming/api/graph/StreamGraph.java     |   6 +
 .../runtime/execution/DefaultJobCreatedEvent.java  |  64 +++++
 .../runtime/execution/JobCreatedEvent.java         |  25 +-
 .../MiniClusterPipelineExecutorServiceLoader.java  |  27 ++-
 .../execution/JobStatusChangedListenerITCase.java  | 266 +++++++++++++++++++++
 .../yarn/executors/YarnSessionClusterExecutor.java |   5 +-
 .../YarnSessionClusterExecutorFactory.java         |   2 +-
 28 files changed, 888 insertions(+), 71 deletions(-)

diff --git a/docs/content/docs/deployment/advanced/job_status_listener.md 
b/docs/content/docs/deployment/advanced/job_status_listener.md
new file mode 100644
index 00000000000..723cc862594
--- /dev/null
+++ b/docs/content/docs/deployment/advanced/job_status_listener.md
@@ -0,0 +1,81 @@
+
+---
+title: "Job Status Changed Listener"
+nav-title: job-status-listener
+nav-parent_id: advanced
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Job status changed listener
+Flink provides a pluggable interface for users to register their custom logic 
for handling job status changes, in which lineage info about sources and 
sinks is provided.
+This enables users to implement their own Flink lineage reporter to send 
lineage info to third-party data lineage systems, for example DataHub and 
OpenLineage.
+
+The job status changed listeners are triggered every time a status change 
happens for the application. The data lineage info is included in the 
JobCreatedEvent.
+
+### Implement a plugin for your custom listener
+
+To implement a custom JobStatusChangedListener plugin, you need to:
+
+- Add your own JobStatusChangedListener by implementing the {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java"
 name="JobStatusChangedListener" >}} interface.
+
+- Add your own JobStatusChangedListenerFactory by implementing the {{< gh_link 
file="/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java"
 name="JobStatusChangedListenerFactory" >}} interface.
+
+- Add a service entry. Create a file 
`META-INF/services/org.apache.flink.core.execution.JobStatusChangedListenerFactory`
 which contains the class name of your job status changed listener factory 
class (see [Java Service 
Loader](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/ServiceLoader.html)
 docs for more details).
+
+
+Then, create a jar which includes your `JobStatusChangedListener`, 
`JobStatusChangedListenerFactory`, `META-INF/services/` and all external 
dependencies.
+Make a directory in `plugins/` of your Flink distribution with an arbitrary 
name, e.g. "job-status-changed-listener", and put the jar into this directory.
+See [Flink Plugin]({{< ref "docs/deployment/filesystems/plugins" >}}) for more 
details.
+
+JobStatusChangedListenerFactory example:
+
+``` java
+package org.apache.flink.test.execution;
+
+public static class TestingJobStatusChangedListenerFactory
+        implements JobStatusChangedListenerFactory {
+
+    @Override
+    public JobStatusChangedListener createListener(Context context) {
+        return new TestingJobStatusChangedListener();
+    }
+}
+```
+
+JobStatusChangedListener example:
+
+``` java
+package org.apache.flink.test.execution;
+
+private static class TestingJobStatusChangedListener implements 
JobStatusChangedListener {
+
+    @Override
+    public void onEvent(JobStatusChangedEvent event) {
+        statusChangedEvents.add(event);
+    }
+}
+```
+
+### Configuration
+
+Flink components load JobStatusChangedListener plugins at startup. To make 
sure your JobStatusChangedListeners are loaded, all class names should be 
defined as part of [execution.job-status-changed-listeners]({{< ref 
"docs/deployment/config#execution.job-status-changed-listeners" >}}).
+  If this configuration is empty, NO listeners will be started. Example:
+```
+    execution.job-status-changed-listeners = 
org.apache.flink.test.execution.TestingJobStatusChangedListenerFactory
+```
diff --git a/docs/layouts/shortcodes/generated/deployment_configuration.html 
b/docs/layouts/shortcodes/generated/deployment_configuration.html
index 07daa602a07..f3e08ecd696 100644
--- a/docs/layouts/shortcodes/generated/deployment_configuration.html
+++ b/docs/layouts/shortcodes/generated/deployment_configuration.html
@@ -20,6 +20,12 @@
             <td>List&lt;String&gt;</td>
             <td>Custom JobListeners to be registered with the execution 
environment. The registered listeners cannot have constructors with 
arguments.</td>
         </tr>
+        <tr>
+            <td><h5>execution.job-status-changed-listeners</h5></td>
+            <td style="word-wrap: break-word;">(none)</td>
+            <td>List&lt;String&gt;</td>
+            <td>When job is created or its status is changed, Flink will 
generate job event and notify job status changed listener.</td>
+        </tr>
         <tr>
             <td><h5>execution.program-config.enabled</h5></td>
             <td style="word-wrap: break-word;">true</td>
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index 7094c1cac13..408bc69ad31 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -27,12 +27,15 @@ import 
org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptionsInternal;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.client.ClientUtils;
 import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.FunctionUtils;
 
 import org.slf4j.Logger;
@@ -41,9 +44,12 @@ import org.slf4j.LoggerFactory;
 import java.net.InetSocketAddress;
 import java.net.MalformedURLException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -57,6 +63,10 @@ public class EmbeddedExecutor implements PipelineExecutor {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(EmbeddedExecutor.class);
 
+    private final ExecutorService executorService =
+            Executors.newFixedThreadPool(
+                    1, new 
ExecutorThreadFactory("Flink-EmbeddedClusterExecutor-IO"));
+
     public static final String NAME = "embedded";
 
     private final Collection<JobID> submittedJobIds;
@@ -65,6 +75,8 @@ public class EmbeddedExecutor implements PipelineExecutor {
 
     private final EmbeddedJobClientCreator jobClientCreator;
 
+    private final List<JobStatusChangedListener> jobStatusChangedListeners;
+
     /**
      * Creates a {@link EmbeddedExecutor}.
      *
@@ -73,14 +85,22 @@ public class EmbeddedExecutor implements PipelineExecutor {
      *     caller.
      * @param dispatcherGateway the dispatcher of the cluster which is going 
to be used to submit
      *     jobs.
+     * @param configuration the flink application configuration
+     * @param jobClientCreator the job client creator
      */
     public EmbeddedExecutor(
             final Collection<JobID> submittedJobIds,
             final DispatcherGateway dispatcherGateway,
+            final Configuration configuration,
             final EmbeddedJobClientCreator jobClientCreator) {
         this.submittedJobIds = checkNotNull(submittedJobIds);
         this.dispatcherGateway = checkNotNull(dispatcherGateway);
         this.jobClientCreator = checkNotNull(jobClientCreator);
+        this.jobStatusChangedListeners =
+                JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+                        Thread.currentThread().getContextClassLoader(),
+                        configuration,
+                        executorService);
     }
 
     @Override
@@ -153,7 +173,18 @@ public class EmbeddedExecutor implements PipelineExecutor {
                                     return jobId;
                                 }))
                 .thenApplyAsync(
-                        jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader));
+                        jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader))
+                .whenCompleteAsync(
+                        (jobClient, throwable) -> {
+                            if (throwable == null) {
+                                PipelineExecutorUtils.notifyJobStatusListeners(
+                                        pipeline, jobGraph, 
jobStatusChangedListeners);
+                            } else {
+                                LOG.error(
+                                        "Failed to submit job graph to 
application cluster",
+                                        throwable);
+                            }
+                        });
     }
 
     private static CompletableFuture<JobID> submitJob(
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
index 0810c1e1c06..790fffb0a1f 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutorFactory.java
@@ -79,6 +79,7 @@ public class EmbeddedExecutorFactory implements 
PipelineExecutorFactory {
         return new EmbeddedExecutor(
                 submittedJobIds,
                 dispatcherGateway,
+                configuration,
                 (jobId, userCodeClassloader) -> {
                     final Time timeout =
                             Time.milliseconds(
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/WebSubmissionExecutorFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/WebSubmissionExecutorFactory.java
index f087b5f30db..16e3bf05813 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/WebSubmissionExecutorFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/WebSubmissionExecutorFactory.java
@@ -75,6 +75,7 @@ public class WebSubmissionExecutorFactory implements 
PipelineExecutorFactory {
         return new EmbeddedExecutor(
                 submittedJobIds,
                 dispatcherGateway,
+                configuration,
                 (jobId, userCodeClassloader) -> new 
WebSubmissionJobClient(jobId));
     }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
index 9397e506ccf..7c22131a4d5 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java
@@ -29,16 +29,25 @@ import 
org.apache.flink.client.program.ClusterClientProvider;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.function.FunctionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import javax.annotation.Nonnull;
 
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -55,11 +64,24 @@ import static 
org.apache.flink.util.Preconditions.checkState;
 public class AbstractSessionClusterExecutor<
                 ClusterID, ClientFactory extends 
ClusterClientFactory<ClusterID>>
         implements CacheSupportedPipelineExecutor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractSessionClusterExecutor.class);
+    private final ExecutorService executorService =
+            Executors.newFixedThreadPool(
+                    1, new 
ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
 
     private final ClientFactory clusterClientFactory;
+    private final Configuration configuration;
+    private final List<JobStatusChangedListener> jobStatusChangedListeners;
 
-    public AbstractSessionClusterExecutor(@Nonnull final ClientFactory 
clusterClientFactory) {
+    public AbstractSessionClusterExecutor(
+            @Nonnull final ClientFactory clusterClientFactory, Configuration 
configuration) {
         this.clusterClientFactory = checkNotNull(clusterClientFactory);
+        this.configuration = configuration;
+        this.jobStatusChangedListeners =
+                JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+                        Thread.currentThread().getContextClassLoader(),
+                        configuration,
+                        executorService);
     }
 
     @Override
@@ -97,7 +119,18 @@ public class AbstractSessionClusterExecutor<
                                                     clusterClientProvider,
                                                     jobID,
                                                     userCodeClassloader))
-                    .whenCompleteAsync((ignored1, ignored2) -> 
clusterClient.close());
+                    .whenCompleteAsync(
+                            (jobClient, throwable) -> {
+                                if (throwable == null) {
+                                    
PipelineExecutorUtils.notifyJobStatusListeners(
+                                            pipeline, jobGraph, 
jobStatusChangedListeners);
+                                } else {
+                                    LOG.error(
+                                            "Failed to submit job graph to 
remote session cluster.",
+                                            throwable);
+                                }
+                                clusterClient.close();
+                            });
         }
     }
 
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
index b8b60fe9adf..27e1043a785 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
@@ -26,13 +26,22 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.MalformedURLException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,11 +50,15 @@ import static 
org.apache.flink.util.Preconditions.checkState;
 /** An {@link PipelineExecutor} for executing a {@link Pipeline} locally. */
 @Internal
 public class LocalExecutor implements PipelineExecutor {
+    private static final Logger LOG = 
LoggerFactory.getLogger(LocalExecutor.class);
+    private final ExecutorService executorService =
+            Executors.newFixedThreadPool(1, new 
ExecutorThreadFactory("Flink-LocalExecutor-IO"));
 
     public static final String NAME = "local";
 
     private final Configuration configuration;
     private final Function<MiniClusterConfiguration, MiniCluster> 
miniClusterFactory;
+    private final List<JobStatusChangedListener> jobStatusChangedListeners;
 
     public static LocalExecutor create(Configuration configuration) {
         return new LocalExecutor(configuration, MiniCluster::new);
@@ -62,6 +75,11 @@ public class LocalExecutor implements PipelineExecutor {
             Function<MiniClusterConfiguration, MiniCluster> 
miniClusterFactory) {
         this.configuration = configuration;
         this.miniClusterFactory = miniClusterFactory;
+        this.jobStatusChangedListeners =
+                JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+                        Thread.currentThread().getContextClassLoader(),
+                        configuration,
+                        executorService);
     }
 
     @Override
@@ -81,7 +99,18 @@ public class LocalExecutor implements PipelineExecutor {
         final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, 
userCodeClassloader);
 
         return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, 
miniClusterFactory)
-                .submitJob(jobGraph, userCodeClassloader);
+                .submitJob(jobGraph, userCodeClassloader)
+                .whenComplete(
+                        (ignored, throwable) -> {
+                            if (throwable == null) {
+                                PipelineExecutorUtils.notifyJobStatusListeners(
+                                        pipeline, jobGraph, 
jobStatusChangedListeners);
+                            } else {
+                                LOG.error(
+                                        "Failed to submit job graph to local 
mini cluster.",
+                                        throwable);
+                            }
+                        });
     }
 
     private JobGraph getJobGraph(
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index 2981492a9cd..3617bc01ba5 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -19,23 +19,33 @@
 package org.apache.flink.client.deployment.executors;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.cli.ClientOptions;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.core.execution.JobStatusChangedListener;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
 import java.net.MalformedURLException;
+import java.util.List;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Utility class with method related to job execution. */
 public class PipelineExecutorUtils {
+    private static final Logger LOG = 
LoggerFactory.getLogger(PipelineExecutorUtils.class);
 
     /**
      * Creates the {@link JobGraph} corresponding to the provided {@link 
Pipeline}.
@@ -80,4 +90,35 @@ public class PipelineExecutorUtils {
 
         return jobGraph;
     }
+
+    /**
+     * Notify the {@link DefaultJobCreatedEvent} to job status changed 
listeners.
+     *
+     * @param pipeline the pipeline that contains lineage graph information.
+     * @param jobGraph jobGraph that contains job basic info
+     * @param listeners the list of job status changed listeners
+     */
+    public static void notifyJobStatusListeners(
+            @Nonnull final Pipeline pipeline,
+            @Nonnull final JobGraph jobGraph,
+            List<JobStatusChangedListener> listeners) {
+        RuntimeExecutionMode executionMode =
+                
jobGraph.getJobConfiguration().get(ExecutionOptions.RUNTIME_MODE);
+        listeners.forEach(
+                listener -> {
+                    try {
+                        listener.onEvent(
+                                new DefaultJobCreatedEvent(
+                                        jobGraph.getJobID(),
+                                        jobGraph.getName(),
+                                        ((StreamGraph) 
pipeline).getLineageGraph(),
+                                        executionMode));
+                    } catch (Throwable e) {
+                        LOG.error(
+                                "Fail to notify job status changed listener 
{}",
+                                listener.getClass().getName(),
+                                e);
+                    }
+                });
+    }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
index 761d3cc9141..cbf66cd5edb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
@@ -21,6 +21,7 @@ package org.apache.flink.client.deployment.executors;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.client.deployment.StandaloneClientFactory;
 import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.PipelineExecutor;
 
 /** The {@link PipelineExecutor} to be used when executing a job on an already 
running cluster. */
@@ -30,7 +31,7 @@ public class RemoteExecutor
 
     public static final String NAME = "remote";
 
-    public RemoteExecutor() {
-        super(new StandaloneClientFactory());
+    public RemoteExecutor(Configuration configuration) {
+        super(new StandaloneClientFactory(), configuration);
     }
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java
index 5548156aa0c..bbcb7465e35 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutorFactory.java
@@ -40,6 +40,6 @@ public class RemoteExecutorFactory implements 
PipelineExecutorFactory {
 
     @Override
     public PipelineExecutor getExecutor(final Configuration configuration) {
-        return new RemoteExecutor();
+        return new RemoteExecutor(configuration);
     }
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
index f6ca0d8a345..aeb58a59bc5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/DeploymentOptions.java
@@ -78,6 +78,14 @@ public class DeploymentOptions {
                             "Custom JobListeners to be registered with the 
execution environment."
                                     + " The registered listeners cannot have 
constructors with arguments.");
 
+    public static final ConfigOption<List<String>> 
JOB_STATUS_CHANGED_LISTENERS =
+            key("execution.job-status-changed-listeners")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "When job is created or its status is changed, 
Flink will generate job event and notify job status changed listener.");
+
     public static final ConfigOption<Boolean> SHUTDOWN_ON_APPLICATION_FINISH =
             ConfigOptions.key("execution.shutdown-on-application-finish")
                     .booleanType()
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultJobExecutionStatusEvent.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultJobExecutionStatusEvent.java
new file mode 100644
index 00000000000..d8cb2da4568
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultJobExecutionStatusEvent.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.execution;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+
+import javax.annotation.Nullable;
+
+/** Default implementation for {@link JobExecutionStatusEvent}. */
+@Internal
+public class DefaultJobExecutionStatusEvent implements JobExecutionStatusEvent 
{
+    private final JobID jobId;
+    private final String jobName;
+    private final JobStatus oldStatus;
+    private final JobStatus newStatus;
+    @Nullable private final Throwable cause;
+
+    public DefaultJobExecutionStatusEvent(
+            JobID jobId,
+            String jobName,
+            JobStatus oldStatus,
+            JobStatus newStatus,
+            @Nullable Throwable cause) {
+        this.jobId = jobId;
+        this.jobName = jobName;
+        this.oldStatus = oldStatus;
+        this.newStatus = newStatus;
+        this.cause = cause;
+    }
+
+    @Override
+    public JobStatus oldStatus() {
+        return oldStatus;
+    }
+
+    @Override
+    public JobStatus newStatus() {
+        return newStatus;
+    }
+
+    @Nullable
+    @Override
+    public Throwable exception() {
+        return cause;
+    }
+
+    @Override
+    public JobID jobId() {
+        return jobId;
+    }
+
+    @Override
+    public String jobName() {
+        return jobName;
+    }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobExecutionStatusEvent.java
similarity index 55%
copy from 
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
copy to 
flink-core/src/main/java/org/apache/flink/core/execution/JobExecutionStatusEvent.java
index 761d3cc9141..82aeda5c6df 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobExecutionStatusEvent.java
@@ -16,21 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.core.execution;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.StandaloneClientFactory;
-import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobStatus;
 
-/** The {@link PipelineExecutor} to be used when executing a job on an already 
running cluster. */
-@Internal
-public class RemoteExecutor
-        extends AbstractSessionClusterExecutor<StandaloneClusterId, 
StandaloneClientFactory> {
+import javax.annotation.Nullable;
 
-    public static final String NAME = "remote";
+/** Job execution status event. */
+@PublicEvolving
+public interface JobExecutionStatusEvent extends JobStatusChangedEvent {
+    /** Old status for job. */
+    JobStatus oldStatus();
 
-    public RemoteExecutor() {
-        super(new StandaloneClientFactory());
-    }
+    /** New status for job. */
+    JobStatus newStatus();
+
+    /** Exception for job. */
+    @Nullable
+    Throwable exception();
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedEvent.java
similarity index 55%
copy from 
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
copy to 
flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedEvent.java
index 761d3cc9141..21d0717ed28 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedEvent.java
@@ -16,21 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.core.execution;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.StandaloneClientFactory;
-import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.JobID;
 
-/** The {@link PipelineExecutor} to be used when executing a job on an already 
running cluster. */
-@Internal
-public class RemoteExecutor
-        extends AbstractSessionClusterExecutor<StandaloneClusterId, 
StandaloneClientFactory> {
+/** Basic job status event. */
+@PublicEvolving
+public interface JobStatusChangedEvent {
 
-    public static final String NAME = "remote";
+    JobID jobId();
 
-    public RemoteExecutor() {
-        super(new StandaloneClientFactory());
-    }
+    String jobName();
 }
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java
similarity index 55%
copy from 
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
copy to 
flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java
index 761d3cc9141..3610cd36055 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListener.java
@@ -16,21 +16,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.core.execution;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.StandaloneClientFactory;
-import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.annotation.PublicEvolving;
 
-/** The {@link PipelineExecutor} to be used when executing a job on an already 
running cluster. */
-@Internal
-public class RemoteExecutor
-        extends AbstractSessionClusterExecutor<StandaloneClusterId, 
StandaloneClientFactory> {
-
-    public static final String NAME = "remote";
+/**
+ * When a job is created or its status is changed, Flink will generate a job 
event and notify the
+ * job status changed listener.
+ */
+@PublicEvolving
+public interface JobStatusChangedListener {
 
-    public RemoteExecutor() {
-        super(new StandaloneClientFactory());
-    }
+    /** Event will be fired when job status is changed. */
+    void onEvent(JobStatusChangedEvent event);
 }
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java
new file mode 100644
index 00000000000..03c1d49a0d6
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerFactory.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.execution;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.concurrent.Executor;
+
+/** Factory for job status changed listener. */
+@PublicEvolving
+public interface JobStatusChangedListenerFactory {
+
+    JobStatusChangedListener createListener(Context context);
+
+    @PublicEvolving
+    interface Context {
+        /**
+         * Configuration for the factory to create listener. Users can add 
customized options to Flink and get them here to create the listener. For
+         * example, users can add the REST address of DataHub to the 
configuration, and get it when they need to create an HTTP client for the listener.
+         */
+        Configuration getConfiguration();
+
+        /**
+         * User classloader for the flink application.
+         *
+         * @return the user classloader of the Flink application.
+         */
+        ClassLoader getUserClassLoader();
+
+        /**
+         * Get an Executor pool for the listener to run async operations that 
can potentially be IO-heavy. {@code JobMaster} will provide an independent executor
+         * for io operations and it won't block the main-thread. All tasks 
submitted to the executor will be executed in parallel, and when the job ends,
+         * previously submitted tasks will be executed, but no new tasks will 
be accepted.
+         */
+        Executor getIOExecutor();
+    }
+}
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java
 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java
new file mode 100644
index 00000000000..a73641d7ffd
--- /dev/null
+++ 
b/flink-core/src/main/java/org/apache/flink/core/execution/JobStatusChangedListenerUtils.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.execution;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.InstantiationUtil;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
+
+/** Util class for {@link JobStatusChangedListener}. */
+@Internal
+public final class JobStatusChangedListenerUtils {
+    /**
+     * Create job status changed listeners from configuration for job.
+     *
+     * @param configuration The job configuration.
+     * @return the job status changed listeners.
+     */
+    public static List<JobStatusChangedListener> 
createJobStatusChangedListeners(
+            ClassLoader userClassLoader, Configuration configuration, Executor 
ioExecutor) {
+        List<String> jobStatusChangedListeners = 
configuration.get(JOB_STATUS_CHANGED_LISTENERS);
+        if (jobStatusChangedListeners == null || 
jobStatusChangedListeners.isEmpty()) {
+            return Collections.emptyList();
+        }
+        return jobStatusChangedListeners.stream()
+                .map(
+                        fac -> {
+                            try {
+                                return InstantiationUtil.instantiate(
+                                                fac,
+                                                
JobStatusChangedListenerFactory.class,
+                                                userClassLoader)
+                                        .createListener(
+                                                new 
JobStatusChangedListenerFactory.Context() {
+                                                    @Override
+                                                    public Configuration 
getConfiguration() {
+                                                        return configuration;
+                                                    }
+
+                                                    @Override
+                                                    public ClassLoader 
getUserClassLoader() {
+                                                        return userClassLoader;
+                                                    }
+
+                                                    @Override
+                                                    public Executor 
getIOExecutor() {
+                                                        return ioExecutor;
+                                                    }
+                                                });
+                            } catch (FlinkException e) {
+                                throw new RuntimeException(e);
+                            }
+                        })
+                .collect(Collectors.toList());
+    }
+}
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutor.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutor.java
index c9edef90777..84eba0e1565 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutor.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.kubernetes.executors;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.kubernetes.KubernetesClusterClientFactory;
 import org.apache.flink.kubernetes.configuration.KubernetesDeploymentTarget;
@@ -31,7 +32,7 @@ public class KubernetesSessionClusterExecutor
 
     public static final String NAME = 
KubernetesDeploymentTarget.SESSION.getName();
 
-    public KubernetesSessionClusterExecutor() {
-        super(new KubernetesClusterClientFactory());
+    public KubernetesSessionClusterExecutor(Configuration configuration) {
+        super(new KubernetesClusterClientFactory(), configuration);
     }
 }
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java
index 90c6a62adf5..196bc201039 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/executors/KubernetesSessionClusterExecutorFactory.java
@@ -44,6 +44,6 @@ public class KubernetesSessionClusterExecutorFactory 
implements PipelineExecutor
 
     @Override
     public PipelineExecutor getExecutor(@Nonnull final Configuration 
configuration) {
-        return new KubernetesSessionClusterExecutor();
+        return new KubernetesSessionClusterExecutor(configuration);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
index c53e227ec1e..afd36a2951f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
@@ -27,6 +27,8 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobStatusChangedListener;
 import org.apache.flink.core.execution.JobStatusHook;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
@@ -302,6 +304,8 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
 
     private final TaskDeploymentDescriptorFactory 
taskDeploymentDescriptorFactory;
 
+    private final List<JobStatusChangedListener> jobStatusChangedListeners;
+
     // 
--------------------------------------------------------------------------------------------
     //   Constructors
     // 
--------------------------------------------------------------------------------------------
@@ -327,7 +331,8 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
             ExecutionJobVertex.Factory executionJobVertexFactory,
             List<JobStatusHook> jobStatusHooks,
             MarkPartitionFinishedStrategy markPartitionFinishedStrategy,
-            TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory) {
+            TaskDeploymentDescriptorFactory taskDeploymentDescriptorFactory,
+            List<JobStatusChangedListener> jobStatusChangedListeners) {
 
         this.jobType = jobType;
         this.executionGraphId = new ExecutionGraphID();
@@ -398,6 +403,8 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
 
         this.taskDeploymentDescriptorFactory = 
checkNotNull(taskDeploymentDescriptorFactory);
 
+        this.jobStatusChangedListeners = 
checkNotNull(jobStatusChangedListeners);
+
         LOG.info(
                 "Created execution graph {} for job {}.",
                 executionGraphId,
@@ -1164,7 +1171,7 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
                     error);
 
             stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
-            notifyJobStatusChange(newState);
+            notifyJobStatusChange(current, newState, error);
             notifyJobStatusHooks(newState, error);
             return true;
         } else {
@@ -1600,7 +1607,8 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
         }
     }
 
-    private void notifyJobStatusChange(JobStatus newState) {
+    private void notifyJobStatusChange(
+            JobStatus oldState, JobStatus newState, @Nullable Throwable cause) 
{
         if (jobStatusListeners.size() > 0) {
             final long timestamp = System.currentTimeMillis();
 
@@ -1612,6 +1620,14 @@ public class DefaultExecutionGraph implements 
ExecutionGraph, InternalExecutionG
                 }
             }
         }
+
+        if (jobStatusChangedListeners.size() > 0) {
+            jobStatusChangedListeners.forEach(
+                    listener ->
+                            listener.onEvent(
+                                    new DefaultJobExecutionStatusEvent(
+                                            getJobID(), getJobName(), 
oldState, newState, cause)));
+        }
     }
 
     private void notifyJobStatusHooks(JobStatus newState, @Nullable Throwable 
cause) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
index 57be7f49abf..30b5af6e3f7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphBuilder.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -141,6 +143,10 @@ public class DefaultExecutionGraphBuilder {
             throw new JobException("Could not create the 
TaskDeploymentDescriptorFactory.", e);
         }
 
+        final List<JobStatusChangedListener> jobStatusChangedListeners =
+                JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+                        classLoader, jobManagerConfig, ioExecutor);
+
         // create a new execution graph, if none exists so far
         final DefaultExecutionGraph executionGraph =
                 new DefaultExecutionGraph(
@@ -164,7 +170,8 @@ public class DefaultExecutionGraphBuilder {
                         executionJobVertexFactory,
                         jobGraph.getJobStatusHooks(),
                         markPartitionFinishedStrategy,
-                        taskDeploymentDescriptorFactory);
+                        taskDeploymentDescriptorFactory,
+                        jobStatusChangedListeners);
 
         // set the basic properties
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index fb1deadff6e..408bf00fd11 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -46,6 +46,7 @@ import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.lineage.LineageGraph;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
@@ -134,6 +135,7 @@ public class StreamGraph implements Pipeline {
     private boolean dynamic;
 
     private boolean autoParallelismEnabled;
+    private LineageGraph lineageGraph;
 
     public StreamGraph(
             Configuration jobConfiguration,
@@ -228,6 +230,10 @@ public class StreamGraph implements Pipeline {
         this.timeCharacteristic = timeCharacteristic;
     }
 
+    public LineageGraph getLineageGraph() {
+        return lineageGraph;
+    }
+
     public GlobalStreamExchangeMode getGlobalStreamExchangeMode() {
         return globalExchangeMode;
     }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java
new file mode 100644
index 00000000000..34ff064137c
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/DefaultJobCreatedEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.execution;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.lineage.LineageGraph;
+
+/** Default implementation for {@link JobCreatedEvent}. */
+@Internal
+public class DefaultJobCreatedEvent implements JobCreatedEvent {
+    private final JobID jobId;
+    private final String jobName;
+    private final LineageGraph lineageGraph;
+    private final RuntimeExecutionMode executionMode;
+
+    public DefaultJobCreatedEvent(
+            JobID jobId,
+            String jobName,
+            LineageGraph lineageGraph,
+            RuntimeExecutionMode executionMode) {
+        this.jobId = jobId;
+        this.jobName = jobName;
+        this.lineageGraph = lineageGraph;
+        this.executionMode = executionMode;
+    }
+
+    @Override
+    public JobID jobId() {
+        return jobId;
+    }
+
+    @Override
+    public String jobName() {
+        return jobName;
+    }
+
+    @Override
+    public LineageGraph lineageGraph() {
+        return lineageGraph;
+    }
+
+    @Override
+    public RuntimeExecutionMode executionMode() {
+        return executionMode;
+    }
+}
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java
similarity index 55%
copy from 
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
copy to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java
index 761d3cc9141..1d20dcd2108 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/RemoteExecutor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/execution/JobCreatedEvent.java
@@ -16,21 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.client.deployment.executors;
+package org.apache.flink.streaming.runtime.execution;
 
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.client.deployment.StandaloneClientFactory;
-import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.core.execution.PipelineExecutor;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobStatusChangedEvent;
+import org.apache.flink.streaming.api.lineage.LineageGraph;
 
-/** The {@link PipelineExecutor} to be used when executing a job on an already 
running cluster. */
-@Internal
-public class RemoteExecutor
-        extends AbstractSessionClusterExecutor<StandaloneClusterId, 
StandaloneClientFactory> {
+/** Basic job created event. */
+@PublicEvolving
+public interface JobCreatedEvent extends JobStatusChangedEvent {
 
-    public static final String NAME = "remote";
+    /** Lineage for the current job. */
+    LineageGraph lineageGraph();
 
-    public RemoteExecutor() {
-        super(new StandaloneClientFactory());
-    }
+    /** Runtime execution mode for the job, STREAMING/BATCH/AUTOMATIC. */
+    RuntimeExecutionMode executionMode();
 }
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
index c86639f76b3..ddfe8a1a8f4 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterPipelineExecutorServiceLoader.java
@@ -27,6 +27,8 @@ import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.core.execution.CacheSupportedPipelineExecutor;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerUtils;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.core.execution.PipelineExecutorFactory;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
@@ -38,6 +40,7 @@ import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterJobClient;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,8 +49,11 @@ import java.io.IOException;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.util.Collection;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Stream;
 
 /**
@@ -145,11 +151,19 @@ public class MiniClusterPipelineExecutorServiceLoader 
implements PipelineExecuto
     }
 
     private static class MiniClusterExecutor implements 
CacheSupportedPipelineExecutor {
-
+        private final ExecutorService executorService =
+                Executors.newFixedThreadPool(
+                        1, new 
ExecutorThreadFactory("Flink-MiniClusterExecutor-IO"));
         private final MiniCluster miniCluster;
+        private final List<JobStatusChangedListener> jobStatusChangedListeners;
 
         public MiniClusterExecutor(MiniCluster miniCluster) {
             this.miniCluster = miniCluster;
+            this.jobStatusChangedListeners =
+                    
JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+                            Thread.currentThread().getContextClassLoader(),
+                            miniCluster.getConfiguration(),
+                            executorService);
         }
 
         @Override
@@ -165,6 +179,17 @@ public class MiniClusterPipelineExecutorServiceLoader 
implements PipelineExecuto
             }
             return miniCluster
                     .submitJob(jobGraph)
+                    .whenComplete(
+                            (ignored, throwable) -> {
+                                if (throwable == null) {
+                                    
PipelineExecutorUtils.notifyJobStatusListeners(
+                                            pipeline, jobGraph, 
jobStatusChangedListeners);
+                                } else {
+                                    LOG.error(
+                                            "Failed to submit job graph to 
mini cluster.",
+                                            throwable);
+                                }
+                            })
                     .thenApply(
                             result ->
                                     new MiniClusterJobClient(
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
new file mode 100644
index 00000000000..dd58ef184dc
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.execution;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobStatusChangedEvent;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerFactory;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runners.MethodSorters;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForAllTaskRunning;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for job status changed listener. */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class JobStatusChangedListenerITCase extends TestLogger {
+    private static final int PARALLELISM = 4;
+    @ClassRule public static final TemporaryFolder TMP_FOLDER = new 
TemporaryFolder();
+
+    @ClassRule
+    public static final MiniClusterWithClientResource MINI_CLUSTER =
+            new MiniClusterWithClientResource(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setConfiguration(createConfiguration())
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(PARALLELISM)
+                            .build());
+
+    /**
+     * Events recorded by {@link TestingJobStatusChangedListener}, shared by all tests of this
+     * class and cleared before each one.
+     *
+     * <p>The listener appends to this list while the test thread reads it, and the callbacks are
+     * not guaranteed to run on the test thread, so the list is wrapped in a synchronized view.
+     */
+    private static List<JobStatusChangedEvent> statusChangedEvents =
+            Collections.synchronizedList(new ArrayList<>());
+
+    /** Empties the static event list before each test so events of earlier tests do not leak. */
+    @Before
+    public void setup() {
+        statusChangedEvents.clear();
+    }
+
+    /**
+     * Runs a bounded three-element job through {@code executeAndCollect()} and checks the
+     * recorded events: status events must be CREATED to RUNNING or RUNNING to FINISHED, and any
+     * other event must be a job created event in STREAMING execution mode.
+     */
+    @Test
+    public void testJobStatusChangedForSucceededApplication() throws Exception {
+        Configuration configuration = createConfiguration();
+        try (StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
+            List<String> sourceValues = Arrays.asList("a", "b", "c");
+            List<String> resultValues = new ArrayList<>();
+            try (CloseableIterator<String> iterator =
+                    env.fromCollection(sourceValues).executeAndCollect()) {
+                // Drain the job output; the iterator is closed by the try-with-resources.
+                iterator.forEachRemaining(resultValues::add);
+            }
+            assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new String[0]));
+        }
+
+        verifyEventMetaData();
+        statusChangedEvents.forEach(
+                event -> {
+                    if (!(event instanceof DefaultJobExecutionStatusEvent)) {
+                        // Anything that is not a status transition must be the job created event.
+                        DefaultJobCreatedEvent created = (DefaultJobCreatedEvent) event;
+                        assertThat(created.executionMode())
+                                .isEqualTo(RuntimeExecutionMode.STREAMING);
+                        return;
+                    }
+
+                    JobExecutionStatusEvent transition = (JobExecutionStatusEvent) event;
+                    JobStatus from = transition.oldStatus();
+                    JobStatus to = transition.newStatus();
+                    boolean started = from == JobStatus.CREATED && to == JobStatus.RUNNING;
+                    boolean finished = from == JobStatus.RUNNING && to == JobStatus.FINISHED;
+                    assertThat(started || finished).isTrue();
+                });
+    }
+
+    /**
+     * Submits a job whose source throws immediately, with restarts disabled, waits until the
+     * cluster reports it as FAILED and then checks that every recorded event is one of the
+     * transitions CREATED to RUNNING, RUNNING to FAILING or FAILING to FAILED.
+     */
+    @Test
+    public void testJobStatusChangedForFailedApplication() throws Exception {
+        Configuration configuration = createConfiguration();
+
+        try (StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
+            env.setParallelism(PARALLELISM);
+            env.getConfig().setRestartStrategy(RestartStrategies.noRestart());
+            env.addSource(new FastFailureSourceFunction()).addSink(new SleepingSink());
+
+            StreamGraph streamGraph = env.getStreamGraph();
+            JobGraph jobGraph = streamGraph.getJobGraph();
+
+            ClusterClient<?> client = MINI_CLUSTER.getClusterClient();
+            JobID jobID = client.submitJob(jobGraph).get();
+            // Pause between polls instead of spinning, so the wait neither burns a core nor
+            // floods the cluster with status requests.
+            while (client.getJobStatus(jobID).get() != JobStatus.FAILED) {
+                Thread.sleep(50);
+            }
+        } catch (Exception e) {
+            // Expected failure due to exception.
+        }
+
+        verifyEventMetaData();
+        statusChangedEvents.forEach(
+                event -> {
+                    JobExecutionStatusEvent status = (JobExecutionStatusEvent) event;
+                    assertThat(
+                                    (status.oldStatus() == JobStatus.CREATED
+                                                    && status.newStatus() == JobStatus.RUNNING)
+                                            || (status.oldStatus() == JobStatus.RUNNING
+                                                    && status.newStatus() == JobStatus.FAILING)
+                                            || (status.oldStatus() == JobStatus.FAILING
+                                                    && status.newStatus() == JobStatus.FAILED))
+                            .as(
+                                    "unexpected job status transition %s -> %s",
+                                    status.oldStatus(), status.newStatus())
+                            .isTrue();
+                });
+    }
+
+    /**
+     * Submits an unbounded job, waits until all tasks are running, cancels it and waits until the
+     * cluster reports it as CANCELED. Every recorded event must then be one of the transitions
+     * CREATED to RUNNING, RUNNING to CANCELLING or CANCELLING to CANCELED.
+     */
+    @Test
+    public void testJobStatusChangedForCancelledApplication() throws Exception {
+        Configuration configuration = createConfiguration();
+
+        try (StreamExecutionEnvironment env =
+                StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
+            final DataStream<Long> source = env.addSource(new InfiniteLongSourceFunction());
+            source.addSink(new SleepingSink());
+
+            StreamGraph streamGraph = env.getStreamGraph();
+            JobGraph jobGraph = streamGraph.getJobGraph();
+
+            ClusterClient<?> client = MINI_CLUSTER.getClusterClient();
+            JobID jobID = client.submitJob(jobGraph).get();
+            waitForAllTaskRunning(MINI_CLUSTER.getMiniCluster(), jobID, false);
+
+            Thread.sleep(100);
+            client.cancel(jobID).get();
+            // Pause between polls instead of spinning, so the wait neither burns a core nor
+            // floods the cluster with status requests.
+            while (client.getJobStatus(jobID).get() != JobStatus.CANCELED) {
+                Thread.sleep(50);
+            }
+        }
+
+        verifyEventMetaData();
+        statusChangedEvents.forEach(
+                event -> {
+                    JobExecutionStatusEvent status = (JobExecutionStatusEvent) event;
+                    assertThat(
+                                    (status.oldStatus() == JobStatus.CREATED
+                                                    && status.newStatus() == JobStatus.RUNNING)
+                                            || (status.oldStatus() == JobStatus.RUNNING
+                                                    && status.newStatus() == JobStatus.CANCELLING)
+                                            || (status.oldStatus() == JobStatus.CANCELLING
+                                                    && status.newStatus() == JobStatus.CANCELED))
+                            .as(
+                                    "unexpected job status transition %s -> %s",
+                                    status.oldStatus(), status.newStatus())
+                            .isTrue();
+                });
+    }
+
+    /**
+     * Asserts that exactly three events were recorded and that each pair of consecutive events
+     * carries the same job id and job name.
+     */
+    void verifyEventMetaData() {
+        assertThat(statusChangedEvents.size()).isEqualTo(3);
+
+        // Compare (0, 1) and then (1, 2): id first, name second, for each pair.
+        for (int i = 0; i < 2; i++) {
+            JobStatusChangedEvent current = statusChangedEvents.get(i);
+            JobStatusChangedEvent following = statusChangedEvents.get(i + 1);
+            assertThat(current.jobId()).isEqualTo(following.jobId());
+            assertThat(current.jobName()).isEqualTo(following.jobName());
+        }
+    }
+
+    /**
+     * Testing job status changed listener factory.
+     *
+     * <p>{@code createConfiguration()} registers this class by its class name; every call to
+     * {@link #createListener} returns a new {@link TestingJobStatusChangedListener}.
+     */
+    public static class TestingJobStatusChangedListenerFactory
+            implements JobStatusChangedListenerFactory {
+
+        @Override
+        public JobStatusChangedListener createListener(Context context) {
+            // The context is not consulted; the listener only records events.
+            return new TestingJobStatusChangedListener();
+        }
+    }
+
+    /**
+     * Testing job status changed listener that appends every received event to the static
+     * {@code statusChangedEvents} list of the enclosing test class.
+     */
+    private static class TestingJobStatusChangedListener implements 
JobStatusChangedListener {
+
+        @Override
+        public void onEvent(JobStatusChangedEvent event) {
+            statusChangedEvents.add(event);
+        }
+    }
+
+    /**
+     * Builds a configuration whose {@code JOB_STATUS_CHANGED_LISTENERS} option holds the class
+     * name of {@link TestingJobStatusChangedListenerFactory} as its only entry.
+     */
+    private static Configuration createConfiguration() {
+        final List<String> listenerFactories =
+                Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName());
+
+        final Configuration conf = new Configuration();
+        conf.set(JOB_STATUS_CHANGED_LISTENERS, listenerFactories);
+        return conf;
+    }
+
+    /** Source whose {@code run} method throws immediately, without emitting any record. */
+    private static class FastFailureSourceFunction implements 
SourceFunction<Long> {
+
+        @Override
+        public void run(SourceContext<Long> ctx) throws Exception {
+            throw new RuntimeException("Failed to execute.");
+        }
+
+        // Nothing to stop: run() never enters a loop.
+        @Override
+        public void cancel() {}
+    }
+
+    /**
+     * Source that emits an increasing sequence of longs, starting at zero, until {@link #cancel()}
+     * is called. Each record is emitted while holding the checkpoint lock.
+     */
+    private static class InfiniteLongSourceFunction implements SourceFunction<Long> {
+        /** Cleared by {@link #cancel()}; volatile so the emitting loop observes the change. */
+        private volatile boolean running = true;
+
+        @Override
+        public void run(SourceContext<Long> ctx) throws Exception {
+            for (long value = 0; running; value++) {
+                synchronized (ctx.getCheckpointLock()) {
+                    ctx.collect(value);
+                }
+            }
+        }
+
+        @Override
+        public void cancel() {
+            running = false;
+        }
+    }
+
+    /** Sink that sleeps for one second on every record it receives and ignores the value. */
+    private static class SleepingSink implements SinkFunction<Long> {
+        @Override
+        public void invoke(Long value, Context context) throws Exception {
+            // Thread.sleep takes milliseconds, so each invocation blocks for one second.
+            Thread.sleep(1_000);
+        }
+    }
+}
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
index b14618e0515..065772daee8 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -20,6 +20,7 @@ package org.apache.flink.yarn.executors;
 
 import org.apache.flink.annotation.Internal;
 import 
org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.PipelineExecutor;
 import org.apache.flink.yarn.YarnClusterClientFactory;
 import org.apache.flink.yarn.configuration.YarnDeploymentTarget;
@@ -33,7 +34,7 @@ public class YarnSessionClusterExecutor
 
     public static final String NAME = YarnDeploymentTarget.SESSION.getName();
 
-    public YarnSessionClusterExecutor() {
-        super(new YarnClusterClientFactory());
+    /**
+     * Creates the executor with a new {@link YarnClusterClientFactory} and passes the given
+     * configuration on to the superclass constructor.
+     *
+     * @param configuration configuration forwarded to the superclass constructor
+     */
+    public YarnSessionClusterExecutor(Configuration configuration) {
+        super(new YarnClusterClientFactory(), configuration);
     }
 }
diff --git 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
index e94a230b4cd..3c7cb0f9044 100644
--- 
a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
+++ 
b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -45,7 +45,7 @@ public class YarnSessionClusterExecutorFactory implements 
PipelineExecutorFactor
     @Override
     public PipelineExecutor getExecutor(@Nonnull final Configuration 
configuration) {
         try {
-            return new YarnSessionClusterExecutor();
+            return new YarnSessionClusterExecutor(configuration);
         } catch (NoClassDefFoundError e) {
             throw new 
IllegalStateException(YarnDeploymentTarget.ERROR_MESSAGE);
         }

Reply via email to