HuangZhenQiu commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1606207458


##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -55,11 +65,21 @@
 public class AbstractSessionClusterExecutor<
                 ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
         implements CacheSupportedPipelineExecutor {
+    private final ExecutorService executorService =
+            Executors.newFixedThreadPool(
+                    4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
 
     private final ClientFactory clusterClientFactory;
+    private final Configuration configuration;
+    private final List<JobStatusChangedListener> jobStatusChangedListeners;
 
-    public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+    public AbstractSessionClusterExecutor(
+            @Nonnull final ClientFactory clusterClientFactory, Configuration configuration) {
         this.clusterClientFactory = checkNotNull(clusterClientFactory);
+        this.configuration = configuration;
+        this.jobStatusChangedListeners =
+                JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+                        this.getClass().getClassLoader(), configuration, executorService);

Review Comment:
   We basically need to load the job status changed listeners from Flink libs or plugins here. Yes, the thread context class loader makes more sense.
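A minimal sketch of discovering listeners through the thread context class loader, assuming plain `java.util.ServiceLoader` discovery. `StatusListener` and `ContextClassLoaderDiscovery` are hypothetical stand-ins; the actual `JobStatusChangedListenerUtils.createJobStatusChangedListeners` may discover and instantiate listeners differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ContextClassLoaderDiscovery {

    /** Hypothetical listener SPI standing in for JobStatusChangedListener. */
    public interface StatusListener {
        void onEvent(String event);
    }

    /**
     * Prefers the thread context class loader, which the caller can point at a loader that
     * sees additional jars, and falls back to the loader of this class if none is set.
     */
    public static ClassLoader resolveClassLoader() {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        return contextLoader != null
                ? contextLoader
                : ContextClassLoaderDiscovery.class.getClassLoader();
    }

    /** Instantiates every StatusListener registered under META-INF/services. */
    public static List<StatusListener> loadListeners() {
        List<StatusListener> listeners = new ArrayList<>();
        for (StatusListener listener :
                ServiceLoader.load(StatusListener.class, resolveClassLoader())) {
            listeners.add(listener);
        }
        return listeners;
    }

    public static void main(String[] args) {
        // This standalone sketch registers no providers, so the list is empty.
        System.out.println("Discovered listeners: " + loadListeners().size());
    }
}
```

Using the context class loader rather than `this.getClass().getClassLoader()` lets whoever sets up the client thread decide which jars are visible, instead of being limited to the loader that happened to load the executor class.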



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##########
@@ -153,7 +173,14 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
                                     return jobId;
                                 }))
                 .thenApplyAsync(
-                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader));
+                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader))
+                .whenCompleteAsync(
+                        (jobClient, throwable) -> {
+                            if (throwable == null) {

Review Comment:
   Discussed with @davidradl offline. The throwable cannot be rethrown from whenCompleteAsync, so we log the exception rather than silently swallowing it.
   
https://stackoverflow.com/questions/71668871/completablefuture-whencompleteasync-does-not-let-me-re-throw-an-exception
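A minimal sketch of the log-instead-of-rethrow pattern with a plain `CompletableFuture`; `withNotification` and `onSuccess` are hypothetical names, and `java.util.logging` stands in for Flink's SLF4J logger. The callback's `BiConsumer` cannot throw checked exceptions, and when the upstream stage has already failed, the stage returned by `whenCompleteAsync` still completes with that original failure, so logging is the useful thing the callback can do with it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LogInWhenComplete {
    private static final Logger LOG = Logger.getLogger(LogInWhenComplete.class.getName());

    /** Runs onSuccess after a successful submission; on failure it only logs the throwable. */
    public static <T> CompletableFuture<T> withNotification(
            CompletableFuture<T> submission, Runnable onSuccess) {
        return submission.whenCompleteAsync(
                (result, throwable) -> {
                    if (throwable == null) {
                        onSuccess.run();
                    } else {
                        // Throwing here would not replace the upstream failure, and the
                        // BiConsumer cannot throw checked exceptions, so we only log it.
                        LOG.log(Level.WARNING, "Job submission failed", throwable);
                    }
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("submission rejected"));
        try {
            withNotification(failed, () -> System.out.println("notified")).join();
        } catch (CompletionException e) {
            // The caller still observes the original cause.
            System.out.println("Caller sees: " + e.getCause());
        }
    }
}
```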



##########
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:
##########
@@ -355,7 +368,7 @@ public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) {
     }
 
     @Override
-    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {

Review Comment:
   Following Gyula's suggestion, we don't need to change the API now.



##########
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:
##########
@@ -454,6 +467,24 @@ public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
                                                     receiver,
                                                     error);
                                         } else {
+                                            RuntimeExecutionMode executionMode =
+                                                    jobGraph.getJobConfiguration()
+                                                            .get(ExecutionOptions.RUNTIME_MODE);
+                                            if (jobStatusChangedListeners.size() > 0) {
+                                                jobStatusChangedListeners.forEach(
+                                                        listener ->
+                                                                listener.onEvent(
+                                                                        new DefaultJobCreatedEvent(
+                                                                                jobGraph.getJobID(),
+                                                                                jobGraph.getName(),
+                                                                                pipeline == null
+                                                                                        ? null
+                                                                                        : ((StreamGraph)
+                                                                                                        pipeline)
+                                                                                                .getLineageGraph(),
+                                                                                executionMode)));
+                                            }
+
                                             LOG.info(
                                                     "Successfully submitted job '{}' ({}) to '{}'.",
                                                     jobGraph.getName(),

Review Comment:
   Pipeline is an empty interface. StreamGraph only has the job name info, which is the same as in the JobGraph.
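Because `Pipeline` is an empty interface, a cast to `StreamGraph` guarded only by a null check would throw a `ClassCastException` for any other `Pipeline` implementation. A minimal sketch of an `instanceof` guard instead, using hypothetical stub types in place of Flink's `Pipeline`, `StreamGraph`, and `LineageGraph` (the lineage graph is modeled as a `String`):

```java
public class LineageExtraction {

    /** Hypothetical stand-in for the empty Pipeline marker interface. */
    public interface PipelineStub {}

    /** Hypothetical stand-in for StreamGraph; the lineage graph is modeled as a String. */
    public static class StreamGraphStub implements PipelineStub {
        private final String lineageGraph;

        public StreamGraphStub(String lineageGraph) {
            this.lineageGraph = lineageGraph;
        }

        public String getLineageGraph() {
            return lineageGraph;
        }
    }

    /** Returns the lineage graph for stream graphs; null for null or any other pipeline. */
    public static String lineageOf(PipelineStub pipeline) {
        // instanceof is false for null, so one check covers both cases without a cast failure.
        return pipeline instanceof StreamGraphStub
                ? ((StreamGraphStub) pipeline).getLineageGraph()
                : null;
    }

    public static void main(String[] args) {
        PipelineStub other = new PipelineStub() {};
        System.out.println(lineageOf(new StreamGraphStub("kafka-source -> jdbc-sink")));
        System.out.println(lineageOf(other));
        System.out.println(lineageOf(null));
    }
}
```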



##########
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java:
##########
@@ -94,7 +95,18 @@ public interface ClusterClient<T> extends AutoCloseable {
      * @param jobGraph to submit
      * @return {@link JobID} of the submitted job
      */
-    CompletableFuture<JobID> submitJob(JobGraph jobGraph);
+    default CompletableFuture<JobID> submitJob(JobGraph jobGraph) {
+        return submitJob(jobGraph, null);

Review Comment:
   Would you please elaborate a little bit more?



##########
flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java:
##########
@@ -74,7 +75,7 @@ public Configuration getFlinkConfiguration() {
     }
 
     @Override
-    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
+    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph, Pipeline pipeline) {

Review Comment:
   Yes, it is a reasonable way to publish the initial JobCreatedEvent.



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -55,11 +65,21 @@
 public class AbstractSessionClusterExecutor<
                 ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
         implements CacheSupportedPipelineExecutor {
+    private final ExecutorService executorService =
+            Executors.newFixedThreadPool(
+                    4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));

Review Comment:
   As the submission happens in the Flink client, there are no parallel requests to handle. I think 1 thread will be enough.
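A minimal sketch of the single-thread variant using only the JDK: `namedFactory` is a hypothetical stand-in for Flink's `ExecutorThreadFactory`, and the `-thread-N` naming scheme is an assumption. Marking the thread as a daemon is a design choice here so that an idle I/O thread does not keep the client JVM alive; the pool should still be shut down explicitly once it is no longer needed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class SingleIoThreadExecutor {

    /** Creates daemon threads named prefix + "-thread-" + n (hypothetical naming scheme). */
    public static ThreadFactory namedFactory(String prefix) {
        AtomicInteger counter = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, prefix + "-thread-" + counter.incrementAndGet());
            thread.setDaemon(true);
            return thread;
        };
    }

    /** One thread is enough when the client issues submissions sequentially. */
    public static ExecutorService create() {
        return Executors.newSingleThreadExecutor(namedFactory("Flink-SessionClusterExecutor-IO"));
    }

    public static void main(String[] args) {
        ExecutorService ioExecutor = create();
        try {
            String threadName =
                    CompletableFuture.supplyAsync(
                                    () -> Thread.currentThread().getName(), ioExecutor)
                            .join();
            System.out.println("Listener notification ran on " + threadName);
        } finally {
            ioExecutor.shutdown();
        }
    }
}
```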



##########
docs/content/docs/deployment/advanced/job_status_listener.md:
##########
@@ -0,0 +1,81 @@
+
+---
+title: "Job Status Changed Listener"
+nav-title: job-status-listener
+nav-parent_id: advanced
+nav-pos: 3
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+## Job status changed listener
+Flink provides a pluggable interface for users to register their custom logic for handling with the job status changes in which lineage info about source/sink is provided.

Review Comment:
   Yes, you are right. It is for the implementation of FLIP-314, following the initial PR for the interfaces. Thanks for the suggestion. I have updated the PR description and the Jira info. For the Markdown change, I would prefer to get the code review into good shape first; then I will compile all reviewers' suggestions and change it accordingly.



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute(
                                                     clusterClientProvider,
                                                     jobID,
                                                     userCodeClassloader))
-                    .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
+                    .whenCompleteAsync(
+                            (jobClient, throwable) -> {
+                                if (throwable == null) {
+                                    RuntimeExecutionMode executionMode =
+                                            jobGraph.getJobConfiguration()
+                                                    .get(ExecutionOptions.RUNTIME_MODE);
+                                    if (jobStatusChangedListeners.size() > 0) {
+                                        jobStatusChangedListeners.forEach(
+                                                listener ->
+                                                        listener.onEvent(
+                                                                new DefaultJobCreatedEvent(
+                                                                        jobGraph.getJobID(),
+                                                                        jobGraph.getName(),
+                                                                        ((StreamGraph) pipeline)
+                                                                                .getLineageGraph(),
+                                                                        executionMode)));
+                                    }
+                                }
+                                clusterClient.close();

Review Comment:
   Thanks. The throwable should be caught for each of the listeners.
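A minimal sketch of catching failures per listener, so that one misbehaving listener neither stops the remaining listeners from being notified nor skips `clusterClient.close()`. `StatusListener`, `notifyListeners`, and the `cleanup` callback are hypothetical; the sketch catches `Exception` rather than `Throwable` as a design choice, leaving JVM errors such as `OutOfMemoryError` to propagate.

```java
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

public class IsolatedListenerNotification {
    private static final Logger LOG =
            Logger.getLogger(IsolatedListenerNotification.class.getName());

    /** Hypothetical stand-in for JobStatusChangedListener. */
    public interface StatusListener {
        void onEvent(String event);
    }

    /**
     * Notifies each listener inside its own try/catch and always runs cleanup afterwards.
     * Returns how many listeners failed.
     */
    public static int notifyListeners(
            List<StatusListener> listeners, String event, Runnable cleanup) {
        int failures = 0;
        try {
            for (StatusListener listener : listeners) {
                try {
                    listener.onEvent(event);
                } catch (Exception e) {
                    failures++;
                    LOG.log(Level.WARNING, "Listener failed to handle " + event, e);
                }
            }
        } finally {
            // Corresponds to clusterClient.close() in the reviewed diff.
            cleanup.run();
        }
        return failures;
    }

    public static void main(String[] args) {
        StatusListener failing = event -> {
            throw new IllegalStateException("listener bug");
        };
        StatusListener printing = event -> System.out.println("received " + event);
        int failed =
                notifyListeners(
                        Arrays.asList(failing, printing),
                        "JobCreatedEvent",
                        () -> System.out.println("client closed"));
        System.out.println(failed + " listener(s) failed");
    }
}
```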



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##########
@@ -228,6 +230,14 @@ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
         this.timeCharacteristic = timeCharacteristic;
     }
 
+    public void setLineageGraph(LineageGraph lineageGraph) {

Review Comment:
   The lineage graph setter is used in a different PR, https://github.com/apache/flink/pull/24618. @gyfora, would you please provide some suggestions about how to link them together? Or should I simply remove it here?



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##########
@@ -153,7 +173,14 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
                                     return jobId;
                                 }))
                 .thenApplyAsync(
-                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader));
+                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader))
+                .whenCompleteAsync(
+                        (jobClient, throwable) -> {
+                            if (throwable == null) {

Review Comment:
   The throwable is also wrapped in a FlinkRuntimeException and rethrown further up.
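A minimal sketch of the wrap-and-rethrow behavior described here: the caller blocks on the submission future, unwraps the `ExecutionException`, and rethrows the original cause inside an unchecked exception. `SubmissionFailedException` is a hypothetical stand-in for `FlinkRuntimeException`, and `awaitSubmission` is a hypothetical helper; the exact exception type and message used by Flink's execution environment may differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class RethrowSubmissionFailure {

    /** Hypothetical stand-in for FlinkRuntimeException. */
    public static class SubmissionFailedException extends RuntimeException {
        public SubmissionFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Waits for the submission and rethrows any failure as an unchecked exception. */
    public static <T> T awaitSubmission(CompletableFuture<T> submission) {
        try {
            return submission.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new SubmissionFailedException("Interrupted while waiting for submission", e);
        } catch (ExecutionException e) {
            // get() wraps the original failure; unwrap it so the cause chain stays readable.
            throw new SubmissionFailedException("Failed to execute job", e.getCause());
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException("cluster rejected the job"));
        try {
            awaitSubmission(rejected);
        } catch (SubmissionFailedException e) {
            System.out.println(e.getMessage() + ", caused by " + e.getCause());
        }
    }
}
```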



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##########
@@ -153,7 +173,14 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
                                     return jobId;
                                 }))
                 .thenApplyAsync(
-                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader));
+                        jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader))
+                .whenCompleteAsync(
+                        (jobClient, throwable) -> {
+                            if (throwable == null) {

Review Comment:
   @davidradl Thanks for providing the suggestion. If the throwable is not null, it will be caught in the execution environment: https://github.com/apache/flink/blob/master/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java#L309. I feel that is enough for users to find the logs and submission errors.
   
   On the other hand, if a job is not submitted successfully, it never reaches a valid job status. In this case, we probably don't need to notify the listeners at all. What do you think?
   



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##########
@@ -97,7 +116,14 @@ public CompletableFuture<JobClient> execute(
                                                     clusterClientProvider,
                                                     jobID,
                                                     userCodeClassloader))
-                    .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
+                    .whenCompleteAsync(
+                            (jobClient, throwable) -> {
+                                if (throwable == null) {

Review Comment:
   Replied above.



##########
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java:
##########
@@ -81,7 +95,14 @@ public CompletableFuture<JobClient> execute(
         final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader);
 
         return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
-                .submitJob(jobGraph, userCodeClassloader);
+                .submitJob(jobGraph, userCodeClassloader)
+                .whenComplete(
+                        (ignored, throwable) -> {
+                            if (throwable == null) {

Review Comment:
   Replied above.



-- 
This is an automated message from the Apache Git Service.