HuangZhenQiu commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1606207458
########## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ########## @@ -55,11 +65,21 @@ public class AbstractSessionClusterExecutor< ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements CacheSupportedPipelineExecutor { + private final ExecutorService executorService = + Executors.newFixedThreadPool( + 4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); private final ClientFactory clusterClientFactory; + private final Configuration configuration; + private final List<JobStatusChangedListener> jobStatusChangedListeners; - public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { + public AbstractSessionClusterExecutor( + @Nonnull final ClientFactory clusterClientFactory, Configuration configuration) { this.clusterClientFactory = checkNotNull(clusterClientFactory); + this.configuration = configuration; + this.jobStatusChangedListeners = + JobStatusChangedListenerUtils.createJobStatusChangedListeners( + this.getClass().getClassLoader(), configuration, executorService); Review Comment: We basically need to load the job status changed listeners from Flink libs or plugins here. Yes, the thread context class loader makes more sense. ########## flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java: ########## @@ -153,7 +173,14 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture( return jobId; })) .thenApplyAsync( - jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)); + jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)) + .whenCompleteAsync( + (jobClient, throwable) -> { + if (throwable == null) { Review Comment: Discussed with @davidradl offline. The throwable cannot be rethrown inside whenCompleteAsync, so we log the exception rather than silently swallowing it.
https://stackoverflow.com/questions/71668871/completablefuture-whencompleteasync-does-not-let-me-re-throw-an-exception ########## flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java: ########## @@ -355,7 +368,7 @@ public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) { } @Override - public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) { Review Comment: Following Gyula's suggestion, we don't need to change the API for now. ########## flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java: ########## @@ -454,6 +467,24 @@ public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) { receiver, error); } else { + RuntimeExecutionMode executionMode = + jobGraph.getJobConfiguration() + .get(ExecutionOptions.RUNTIME_MODE); + if (jobStatusChangedListeners.size() > 0) { + jobStatusChangedListeners.forEach( + listener -> + listener.onEvent( + new DefaultJobCreatedEvent( + jobGraph.getJobID(), + jobGraph.getName(), + pipeline == null + ? null + : ((StreamGraph) + pipeline) + .getLineageGraph(), + executionMode))); + } + LOG.info( "Successfully submitted job '{}' ({}) to '{}'.", jobGraph.getName(), Review Comment: Pipeline is an empty interface, and StreamGraph only carries the job name, which is the same as the one in the job graph. ########## flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java: ########## @@ -94,7 +95,18 @@ public interface ClusterClient<T> extends AutoCloseable { * @param jobGraph to submit * @return {@link JobID} of the submitted job */ - CompletableFuture<JobID> submitJob(JobGraph jobGraph); + default CompletableFuture<JobID> submitJob(JobGraph jobGraph) { + return submitJob(jobGraph, null); Review Comment: Would you please elaborate a little more?
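The whenCompleteAsync point from the EmbeddedExecutor discussion (and the linked StackOverflow question) can be illustrated with plain JDK code. This is a minimal sketch, not Flink code: `submit`, `submitAndLog` and `outcome` are hypothetical helpers standing in for the real submission path. It shows that the callback can only log the throwable, because `BiConsumer.accept` cannot throw a checked `Throwable`, while the future returned by `whenCompleteAsync` still completes exceptionally with the original failure, so the caller does not lose it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.logging.Level;
import java.util.logging.Logger;

public class WhenCompleteLogging {
    private static final Logger LOG = Logger.getLogger(WhenCompleteLogging.class.getName());

    // Hypothetical stand-in for the real submission step: fails on demand.
    static CompletableFuture<String> submit(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("submission failed");
            }
            return "job-1";
        });
    }

    // Attaches a whenCompleteAsync callback that logs instead of rethrowing.
    static CompletableFuture<String> submitAndLog(boolean fail) {
        return submit(fail).whenCompleteAsync((jobId, throwable) -> {
            if (throwable != null) {
                // A BiConsumer cannot rethrow a checked Throwable, so log it here.
                // The returned future still completes exceptionally with the
                // original failure, so the caller still observes it.
                LOG.log(Level.WARNING, "Job submission failed", throwable);
                return;
            }
            LOG.info("Submitted " + jobId);
        });
    }

    // Summarizes what a caller joining on the future observes.
    static String outcome(boolean fail) {
        try {
            return submitAndLog(fail).join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(false)); // job-1
        System.out.println(outcome(true)); // failed: submission failed
    }
}
```

Running `main` prints `job-1` and then `failed: submission failed` (plus the log records on stderr), which is why logging inside the callback does not hide the error from whoever joins on the future.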
########## flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java: ########## @@ -74,7 +75,7 @@ public Configuration getFlinkConfiguration() { } @Override - public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) { + public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph, Pipeline pipeline) { Review Comment: Yes, it is a reasonable way to publish the initial JobCreatedEvent. ########## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ########## @@ -55,11 +65,21 @@ public class AbstractSessionClusterExecutor< ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements CacheSupportedPipelineExecutor { + private final ExecutorService executorService = + Executors.newFixedThreadPool( + 4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); Review Comment: As the submission happens in the Flink client, there are no parallel requests to handle. I think one thread will be enough. ########## docs/content/docs/deployment/advanced/job_status_listener.md: ########## @@ -0,0 +1,81 @@ + +--- +title: "Job Status Changed Listener" +nav-title: job-status-listener +nav-parent_id: advanced +nav-pos: 3 +--- +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied.
See the License for the +specific language governing permissions and limitations +under the License. +--> + +## Job status changed listener +Flink provides a pluggable interface for users to register their custom logic for handling with the job status changes in which lineage info about source/sink is provided. Review Comment: Yes, you are right. It is for the implementation of FLIP-314, following the initial PR for the interfaces. Thanks for the suggestion. I have updated the PR description and the Jira info. For the MD change, I would prefer to get the code review into good shape first; then I will compile all reviewers' suggestions and update it accordingly. ########## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ########## @@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute( clusterClientProvider, jobID, userCodeClassloader)) - .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close()); + .whenCompleteAsync( + (jobClient, throwable) -> { + if (throwable == null) { + RuntimeExecutionMode executionMode = + jobGraph.getJobConfiguration() + .get(ExecutionOptions.RUNTIME_MODE); + if (jobStatusChangedListeners.size() > 0) { + jobStatusChangedListeners.forEach( + listener -> + listener.onEvent( + new DefaultJobCreatedEvent( + jobGraph.getJobID(), + jobGraph.getName(), + ((StreamGraph) pipeline) + .getLineageGraph(), + executionMode))); + } + } + clusterClient.close(); Review Comment: Thanks. The throwable should be caught separately for each of the listeners. ########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ########## @@ -228,6 +230,14 @@ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { this.timeCharacteristic = timeCharacteristic; } + public void setLineageGraph(LineageGraph lineageGraph) { Review Comment: The lineage graph setter is used in a different PR: https://github.com/apache/flink/pull/24618.
@gyfora Would you please provide some suggestions on how to link them together? Or should I simply remove it here? ########## flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java: ########## @@ -153,7 +173,14 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture( return jobId; })) .thenApplyAsync( - jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)); + jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)) + .whenCompleteAsync( + (jobClient, throwable) -> { + if (throwable == null) { Review Comment: The throwable is also wrapped in a FlinkRuntimeException and rethrown. ########## flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java: ########## @@ -153,7 +173,14 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture( return jobId; })) .thenApplyAsync( - jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)); + jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)) + .whenCompleteAsync( + (jobClient, throwable) -> { + if (throwable == null) { Review Comment: @davidradl Thanks for the suggestion. If the throwable is not null, it will be caught in the execution environment: https://github.com/apache/flink/blob/master/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java#L309. I feel that is enough for users to find the logs and submission errors. On the other hand, if a job is not submitted successfully, it never reaches a valid job status, so we probably don't need to notify the listeners at all. What do you think?
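The behavior agreed on in these threads (notify listeners only after a successful submission, catch failures per listener, and run the callback on a single thread because client-side submission has no parallel requests) can be sketched together in plain JDK code. This is a hedged illustration under stated assumptions: `JobListener`, `submitAndNotify` and `recordedEvents` are hypothetical names, not Flink's `JobStatusChangedListener` API, and the job ID is a plain `String` rather than a `DefaultJobCreatedEvent`.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ListenerNotifier {
    private static final Logger LOG = Logger.getLogger(ListenerNotifier.class.getName());

    // Hypothetical listener type; NOT Flink's JobStatusChangedListener.
    interface JobListener {
        void onEvent(String jobId);
    }

    // Notifies listeners only when the submission succeeded, and isolates
    // each listener so one failure does not skip the remaining ones.
    static CompletableFuture<String> submitAndNotify(
            CompletableFuture<String> submission,
            List<JobListener> listeners,
            ExecutorService executor) {
        return submission.whenCompleteAsync(
                (jobId, throwable) -> {
                    if (throwable != null) {
                        // The job never reached a valid status: nothing to announce.
                        return;
                    }
                    for (JobListener listener : listeners) {
                        try {
                            listener.onEvent(jobId);
                        } catch (Throwable t) {
                            LOG.log(Level.WARNING, "Listener failed for job " + jobId, t);
                        }
                    }
                },
                executor);
    }

    // Runs one submission against a broken listener and a recording listener,
    // returning the job IDs the recording listener received.
    static List<String> recordedEvents(boolean submissionFails) {
        // One thread: client-side submission has no parallel requests.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            List<String> seen = new CopyOnWriteArrayList<>();
            List<JobListener> listeners =
                    List.of(
                            jobId -> {
                                throw new RuntimeException("broken listener");
                            },
                            seen::add);
            CompletableFuture<String> submission =
                    submissionFails
                            ? CompletableFuture.<String>failedFuture(
                                    new IllegalStateException("rejected"))
                            : CompletableFuture.completedFuture("job-1");
            // Swallow the submission failure only so the demo can inspect `seen`.
            submitAndNotify(submission, listeners, executor).handle((r, t) -> r).join();
            return seen;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(recordedEvents(false)); // [job-1]
        System.out.println(recordedEvents(true)); // []
    }
}
```

Running `main` prints `[job-1]` and then `[]`: the broken listener is logged and skipped without blocking the recording one, and nothing is emitted when the submission itself fails.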
########## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ########## @@ -97,7 +116,14 @@ public CompletableFuture<JobClient> execute( clusterClientProvider, jobID, userCodeClassloader)) - .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close()); + .whenCompleteAsync( + (jobClient, throwable) -> { + if (throwable == null) { Review Comment: Replied above. ########## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java: ########## @@ -81,7 +95,14 @@ public CompletableFuture<JobClient> execute( final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader); return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory) - .submitJob(jobGraph, userCodeClassloader); + .submitJob(jobGraph, userCodeClassloader) + .whenComplete( + (ignored, throwable) -> { + if (throwable == null) { Review Comment: Replied above.