Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-28 Thread via GitHub


HuangZhenQiu commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1618086605


##
flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.execution;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobStatusChangedEvent;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for job status changed listener. */
+public class JobStatusChangedListenerITCase {
+private static List statusChangedEvents = new 
ArrayList<>();
+
+@Test
+void testJobStatusChanged() throws Exception {
+Configuration configuration = new Configuration();
+configuration.set(
+JOB_STATUS_CHANGED_LISTENERS,
+
Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName()));
+try (StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
+List sourceValues = Arrays.asList("a", "b", "c");
+List resultValues = new ArrayList<>();
+try (CloseableIterator iterator =
+env.fromCollection(sourceValues).executeAndCollect()) {
+while (iterator.hasNext()) {
+resultValues.add(iterator.next());
+}
+}
+
assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new 
String[0]));
+}
+assertThat(statusChangedEvents.size()).isEqualTo(3);
+assertThat(statusChangedEvents.get(0).jobId())
+.isEqualTo(statusChangedEvents.get(1).jobId());
+assertThat(statusChangedEvents.get(0).jobName())
+.isEqualTo(statusChangedEvents.get(1).jobName());
+
+assertThat(statusChangedEvents.get(1).jobId())
+.isEqualTo(statusChangedEvents.get(2).jobId());
+assertThat(statusChangedEvents.get(1).jobName())
+.isEqualTo(statusChangedEvents.get(2).jobName());
+
+statusChangedEvents.forEach(
+event -> {
+if (event instanceof DefaultJobExecutionStatusEvent) {
+JobExecutionStatusEvent status = 
(JobExecutionStatusEvent) event;
+assertThat(
+(status.oldStatus() == 
JobStatus.CREATED
+&& status.newStatus() 
== JobStatus.RUNNING)

Review Comment:
   Good suggestion. I added test cases accordingly. Doing so actually helped me
find out that the job status changed listener should be added for
MiniClusterExecutor.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-28 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1617923919


##
flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java:
##
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.execution;
+
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobExecutionStatusEvent;
+import org.apache.flink.core.execution.JobStatusChangedEvent;
+import org.apache.flink.core.execution.JobStatusChangedListener;
+import org.apache.flink.core.execution.JobStatusChangedListenerFactory;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for job status changed listener. */
+public class JobStatusChangedListenerITCase {
+private static List statusChangedEvents = new 
ArrayList<>();
+
+@Test
+void testJobStatusChanged() throws Exception {
+Configuration configuration = new Configuration();
+configuration.set(
+JOB_STATUS_CHANGED_LISTENERS,
+
Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName()));
+try (StreamExecutionEnvironment env =
+
StreamExecutionEnvironment.getExecutionEnvironment(configuration)) {
+List sourceValues = Arrays.asList("a", "b", "c");
+List resultValues = new ArrayList<>();
+try (CloseableIterator iterator =
+env.fromCollection(sourceValues).executeAndCollect()) {
+while (iterator.hasNext()) {
+resultValues.add(iterator.next());
+}
+}
+
assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new 
String[0]));
+}
+assertThat(statusChangedEvents.size()).isEqualTo(3);
+assertThat(statusChangedEvents.get(0).jobId())
+.isEqualTo(statusChangedEvents.get(1).jobId());
+assertThat(statusChangedEvents.get(0).jobName())
+.isEqualTo(statusChangedEvents.get(1).jobName());
+
+assertThat(statusChangedEvents.get(1).jobId())
+.isEqualTo(statusChangedEvents.get(2).jobId());
+assertThat(statusChangedEvents.get(1).jobName())
+.isEqualTo(statusChangedEvents.get(2).jobName());
+
+statusChangedEvents.forEach(
+event -> {
+if (event instanceof DefaultJobExecutionStatusEvent) {
+JobExecutionStatusEvent status = 
(JobExecutionStatusEvent) event;
+assertThat(
+(status.oldStatus() == 
JobStatus.CREATED
+&& status.newStatus() 
== JobStatus.RUNNING)

Review Comment:
   Can we add a test that checks for `FAILING` / `FAILED` and for
`CANCELLING` / `CANCELED`?
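
One possible shape for such a check is sketched here. It is only an illustration: `Status`, `Transition`, and `sawTransition` are hypothetical stand-ins, not Flink's `JobStatus` or the event classes from this PR, and the recorded list is filled by hand rather than by a real listener.

```java
import java.util.ArrayList;
import java.util.List;

final class TransitionCheckDemo {

    /** Hypothetical subset of job states; this is not Flink's JobStatus enum. */
    enum Status { CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED }

    /** Hypothetical event that carries a single old-to-new status transition. */
    static final class Transition {
        final Status oldStatus;
        final Status newStatus;

        Transition(Status oldStatus, Status newStatus) {
            this.oldStatus = oldStatus;
            this.newStatus = newStatus;
        }
    }

    /** Returns true if any recorded event matches the given transition. */
    static boolean sawTransition(List<Transition> events, Status from, Status to) {
        return events.stream().anyMatch(e -> e.oldStatus == from && e.newStatus == to);
    }

    public static void main(String[] args) {
        // In a real test the listener would append to this list.
        List<Transition> recorded = new ArrayList<>();
        recorded.add(new Transition(Status.CREATED, Status.RUNNING));
        recorded.add(new Transition(Status.RUNNING, Status.FAILING));
        recorded.add(new Transition(Status.FAILING, Status.FAILED));

        System.out.println(sawTransition(recorded, Status.FAILING, Status.FAILED));
        System.out.println(sawTransition(recorded, Status.CANCELLING, Status.CANCELED));
    }
}
```

Matching on the (old, new) pair rather than on list position keeps the assertion independent of how many intermediate events a given executor emits.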






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-28 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1617917890


##
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##
@@ -153,7 +173,18 @@ private CompletableFuture 
submitAndGetJobClientFuture(
 return jobId;
 }))
 .thenApplyAsync(
-jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader));
+jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader))
+.whenCompleteAsync(
+(jobClient, throwable) -> {
+if (throwable == null) {
+PipelineExecutorUtils.notifyJobStatusListeners(
+pipeline, jobGraph, 
jobStatusChangedListeners);
+} else {
+LOG.error(
+"Fail to submit job graph to 
application cluster",

Review Comment:
   nit: Fail => Failed






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-28 Thread via GitHub


HuangZhenQiu commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1606207458


##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -55,11 +65,21 @@
 public class AbstractSessionClusterExecutor<
 ClusterID, ClientFactory extends 
ClusterClientFactory>
 implements CacheSupportedPipelineExecutor {
+private final ExecutorService executorService =
+Executors.newFixedThreadPool(
+4, new 
ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
 
 private final ClientFactory clusterClientFactory;
+private final Configuration configuration;
+private final List jobStatusChangedListeners;
 
-public AbstractSessionClusterExecutor(@Nonnull final ClientFactory 
clusterClientFactory) {
+public AbstractSessionClusterExecutor(
+@Nonnull final ClientFactory clusterClientFactory, Configuration 
configuration) {
 this.clusterClientFactory = checkNotNull(clusterClientFactory);
+this.configuration = configuration;
+this.jobStatusChangedListeners =
+JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+this.getClass().getClassLoader(), configuration, 
executorService);

Review Comment:
   We basically need to load the job status changed listeners from the Flink libs or
plugins here. Yes, the thread context class loader makes more sense.
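
To make the distinction concrete, here is a small JDK-only sketch. It does not use Flink's plugin loading; the empty `URLClassLoader` is a hypothetical stand-in for a plugin class loader, and `ClassLoaderChoiceDemo` is an illustrative name.

```java
import java.net.URL;
import java.net.URLClassLoader;

final class ClassLoaderChoiceDemo {

    /** The loader that defined this class, i.e. what getClass().getClassLoader() yields. */
    static ClassLoader definingLoader() {
        return ClassLoaderChoiceDemo.class.getClassLoader();
    }

    /** Temporarily installs a context class loader, reads it back, then restores the old one. */
    static ClassLoader contextLoaderWhile(ClassLoader temporary) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(temporary);
        try {
            return Thread.currentThread().getContextClassLoader();
        } finally {
            current.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for a plugin loader: no URLs, parent is the defining loader.
        try (URLClassLoader pluginLike = new URLClassLoader(new URL[0], definingLoader())) {
            ClassLoader seen = contextLoaderWhile(pluginLike);
            System.out.println(seen == pluginLike);
            System.out.println(seen == definingLoader());
        }
    }
}
```

The defining loader is fixed by where the executor class itself was loaded from, while the context loader is whatever the calling thread was given, which is why the latter can see classes that the former cannot.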



##
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##
@@ -153,7 +173,14 @@ private CompletableFuture 
submitAndGetJobClientFuture(
 return jobId;
 }))
 .thenApplyAsync(
-jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader));
+jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader))
+.whenCompleteAsync(
+(jobClient, throwable) -> {
+if (throwable == null) {

Review Comment:
   Discussed with @davidradl offline. The throwable cannot be rethrown from
whenCompleteAsync, so we log the exception rather than swallow the throwable.
   
https://stackoverflow.com/questions/71668871/completablefuture-whencompleteasync-does-not-let-me-re-throw-an-exception
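
For context, a minimal JDK-only sketch of what happens to a failure that is only logged inside the callback. It uses `whenComplete` instead of `whenCompleteAsync` so the run is deterministic; the propagation rule is the same for both. `WhenCompleteDemo` and the `AtomicReference` used as a log stand-in are illustrative names, not code from this PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicReference;

final class WhenCompleteDemo {

    /** Returns the message of the failure that the downstream stage ends up with. */
    static String failureSeenDownstream(AtomicReference<Throwable> logged) {
        CompletableFuture<String> submission = new CompletableFuture<>();
        CompletableFuture<String> downstream =
                submission.whenComplete(
                        (jobId, throwable) -> {
                            if (throwable != null) {
                                // Stand-in for LOG.error(...): record it, do not rethrow.
                                logged.set(throwable);
                            }
                        });

        submission.completeExceptionally(new IllegalStateException("submit failed"));

        try {
            downstream.join();
            return null;
        } catch (CompletionException e) {
            // The original failure is still delivered to callers of the returned stage.
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> logged = new AtomicReference<>();
        System.out.println(failureSeenDownstream(logged));
        System.out.println(logged.get() != null);
    }
}
```

In other words, the stage returned by `whenComplete` still completes exceptionally with the original failure, so logging in the callback adds information without hiding the error from the caller.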



##
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:
##
@@ -355,7 +368,7 @@ public CompletableFuture 
requestJobResult(@Nonnull JobID jobId) {
 }
 
 @Override
-public CompletableFuture submitJob(@Nonnull JobGraph jobGraph) {

Review Comment:
   With Gyula's suggestion, we don't need to change the API now.



##
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:
##
@@ -454,6 +467,24 @@ public CompletableFuture submitJob(@Nonnull 
JobGraph jobGraph) {
 receiver,
 error);
 } else {
+RuntimeExecutionMode executionMode 
=
+
jobGraph.getJobConfiguration()
+
.get(ExecutionOptions.RUNTIME_MODE);
+if 
(jobStatusChangedListeners.size() > 0) {
+
jobStatusChangedListeners.forEach(
+listener ->
+
listener.onEvent(
+new 
DefaultJobCreatedEvent(
+   
 jobGraph.getJobID(),
+   
 jobGraph.getName(),
+   
 pipeline == null
+   
 ? null
+   
 : ((StreamGraph)
+   
 pipeline)
+   
 .getLineageGraph(),
+   
 executionMode)));
+}
+
 LOG.info(
 "Successfully submitted 
job '{}' ({}) to '{}'.",

Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-22 Thread via GitHub


HuangZhenQiu commented on PR #24754:
URL: https://github.com/apache/flink/pull/24754#issuecomment-2125397607

   @davidradl 
   The throwables in the executors are already caught in the execution environment. If
there is a better idea for providing extra info to customers, I am glad to adopt it.





Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-22 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1609844360


##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java:
##
@@ -81,7 +95,14 @@ public CompletableFuture execute(
 final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, 
userCodeClassloader);
 
 return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, 
miniClusterFactory)
-.submitJob(jobGraph, userCodeClassloader);
+.submitJob(jobGraph, userCodeClassloader)
+.whenComplete(
+(ignored, throwable) -> {
+if (throwable == null) {

Review Comment:
   same comment as above






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-22 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1609843669


##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -97,7 +117,26 @@ public CompletableFuture execute(
 clusterClientProvider,
 jobID,
 userCodeClassloader))
-.whenCompleteAsync((ignored1, ignored2) -> 
clusterClient.close());
+.whenCompleteAsync(
+(jobClient, throwable) -> {
+if (throwable == null) {
+RuntimeExecutionMode executionMode =
+jobGraph.getJobConfiguration()
+
.get(ExecutionOptions.RUNTIME_MODE);
+if (jobStatusChangedListeners.size() > 0) {
+jobStatusChangedListeners.forEach(
+listener ->
+listener.onEvent(
+new 
DefaultJobCreatedEvent(
+
jobGraph.getJobID(),
+
jobGraph.getName(),
+
((StreamGraph) pipeline)
+   
 .getLineageGraph(),
+
executionMode)));
+}
+}
+clusterClient.close();

Review Comment:
   I see this conversation is resolved, but I did not see an answer to what
looks like a valid concern.






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-22 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1609838609


##
flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java:
##
@@ -153,7 +173,14 @@ private CompletableFuture 
submitAndGetJobClientFuture(
 return jobId;
 }))
 .thenApplyAsync(
-jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader));
+jobID -> jobClientCreator.getJobClient(actualJobId, 
userCodeClassloader))
+.whenCompleteAsync(
+(jobClient, throwable) -> {
+if (throwable == null) {

Review Comment:
   Should we do something if there is a throwable? Maybe notify the job
status changed listeners that there was an error and log the error. If there is
a good reason to swallow the throwable here, then a comment explaining it would be
good.






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-21 Thread via GitHub


JingGe commented on PR #24754:
URL: https://github.com/apache/flink/pull/24754#issuecomment-2122930606

   @flinkbot run azure





Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-20 Thread via GitHub


HuangZhenQiu commented on PR #24754:
URL: https://github.com/apache/flink/pull/24754#issuecomment-2121563878

   @flinkbot run azure





Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-19 Thread via GitHub


HuangZhenQiu commented on PR #24754:
URL: https://github.com/apache/flink/pull/24754#issuecomment-2119645723

   @gyfora @JingGe 
   Thanks for your review. I have partially resolved your comments. The lineage
graph info is set into StreamGraph in this correlated PR
https://github.com/apache/flink/pull/24618/files. I would also like to get more
suggestions about how to make PRs more self-contained.





Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-14 Thread via GitHub


JingGe commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1600605397


##
docs/content/docs/deployment/advanced/job_status_listener.md:
##
@@ -0,0 +1,81 @@
+
+---
+title: "Job Status Changed Listener"
+nav-title: job-status-listener
+nav-parent_id: advanced
+nav-pos: 3
+---
+
+
+## Job status changed listener
+Flink provides a pluggable interface for users to register their custom logic 
for handling with the job status changes in which lineage info about 
source/sink is provided.

Review Comment:
   If I am not mistaken, you are implementing the second part of FLIP-314. Does 
it make sense to update the Jira ticket and the PR description with the info 
and the content of this md? It will be easier for others to quickly understand 
the context and join the discussion/review. 






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-13 Thread via GitHub


gyfora commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1598242464


##
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java:
##
@@ -228,6 +230,14 @@ public void setTimeCharacteristic(TimeCharacteristic 
timeCharacteristic) {
 this.timeCharacteristic = timeCharacteristic;
 }
 
+public void setLineageGraph(LineageGraph lineageGraph) {

Review Comment:
   This method is not called anywhere, how did we test this functionality?



##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -97,7 +117,26 @@ public CompletableFuture execute(
 clusterClientProvider,
 jobID,
 userCodeClassloader))
-.whenCompleteAsync((ignored1, ignored2) -> 
clusterClient.close());
+.whenCompleteAsync(
+(jobClient, throwable) -> {
+if (throwable == null) {
+RuntimeExecutionMode executionMode =
+jobGraph.getJobConfiguration()
+
.get(ExecutionOptions.RUNTIME_MODE);
+if (jobStatusChangedListeners.size() > 0) {
+jobStatusChangedListeners.forEach(
+listener ->
+listener.onEvent(
+new 
DefaultJobCreatedEvent(
+
jobGraph.getJobID(),
+
jobGraph.getName(),
+
((StreamGraph) pipeline)
+   
 .getLineageGraph(),
+
executionMode)));
+}
+}
+clusterClient.close();

Review Comment:
   Should this be in a finally block in case a listener throws an error?
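
A minimal sketch of that suggestion, using only the JDK. `FakeClient` and the `Consumer<String>` listeners are hypothetical stand-ins for the cluster client and the job status changed listeners; none of this is the PR's actual code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class CloseInFinallyDemo {

    /** Hypothetical stand-in for a cluster client; only close() matters here. */
    static final class FakeClient implements AutoCloseable {
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public void close() {
            closed.set(true);
        }
    }

    static CompletableFuture<String> submitAndNotify(
            CompletableFuture<String> submission,
            List<Consumer<String>> listeners,
            FakeClient client) {
        return submission.whenComplete(
                (jobId, throwable) -> {
                    try {
                        if (throwable == null) {
                            listeners.forEach(listener -> listener.accept(jobId));
                        }
                    } finally {
                        // Runs even if a listener throws, so the client is never leaked.
                        client.close();
                    }
                });
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        List<Consumer<String>> listeners = new ArrayList<>();
        listeners.add(id -> { throw new RuntimeException("listener bug"); });

        CompletableFuture<String> result =
                submitAndNotify(CompletableFuture.completedFuture("job-1"), listeners, client);

        System.out.println("closed=" + client.closed.get()
                + ", failed=" + result.isCompletedExceptionally());
    }
}
```

Note that the listener's exception still fails the returned future; the `finally` block only guarantees the close. Whether a listener failure should fail the submission at all is a separate design question.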



##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -55,11 +65,21 @@
 public class AbstractSessionClusterExecutor<
 ClusterID, ClientFactory extends 
ClusterClientFactory>
 implements CacheSupportedPipelineExecutor {
+private final ExecutorService executorService =
+Executors.newFixedThreadPool(
+4, new 
ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));
 
 private final ClientFactory clusterClientFactory;
+private final Configuration configuration;
+private final List jobStatusChangedListeners;
 
-public AbstractSessionClusterExecutor(@Nonnull final ClientFactory 
clusterClientFactory) {
+public AbstractSessionClusterExecutor(
+@Nonnull final ClientFactory clusterClientFactory, Configuration 
configuration) {
 this.clusterClientFactory = checkNotNull(clusterClientFactory);
+this.configuration = configuration;
+this.jobStatusChangedListeners =
+JobStatusChangedListenerUtils.createJobStatusChangedListeners(
+this.getClass().getClassLoader(), configuration, 
executorService);

Review Comment:
   Should we use the class's class loader or the thread context class loader here?



##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -55,11 +65,21 @@
 public class AbstractSessionClusterExecutor<
 ClusterID, ClientFactory extends 
ClusterClientFactory>
 implements CacheSupportedPipelineExecutor {
+private final ExecutorService executorService =
+Executors.newFixedThreadPool(
+4, new 
ExecutorThreadFactory("Flink-SessionClusterExecutor-IO"));

Review Comment:
   Why do we need 4 threads? Do we expect concurrent calls here?



##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -97,7 +117,26 @@ public CompletableFuture execute(
 clusterClientProvider,
 jobID,
 userCodeClassloader))
-.whenCompleteAsync((ignored1, ignored2) -> 
clusterClient.close());
+.whenCompleteAsync(
+   

Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-08 Thread via GitHub


gyfora commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1594287283


##
flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java:
##
@@ -74,7 +75,7 @@ public Configuration getFlinkConfiguration() {
 }
 
 @Override
-public CompletableFuture submitJob(@Nonnull JobGraph jobGraph) {
+public CompletableFuture submitJob(@Nonnull JobGraph jobGraph, 
Pipeline pipeline) {

Review Comment:
   We discussed this with @HuangZhenQiu offline: we should remove this API
change, and the lineage logic will be applied on the future instead.
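
Roughly, "applied on the future" could look like the following sketch. The `Client` and `Listener` interfaces and the `String` job graph and lineage values are hypothetical simplifications, not the Flink types; the point is only that the caller, which still holds the pipeline, attaches the notification to the future returned by the unchanged one-argument `submitJob`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class NotifyOnFutureDemo {

    /** Hypothetical client whose single-argument submit method stays unchanged. */
    interface Client {
        CompletableFuture<String> submitJob(String jobGraph);
    }

    /** Hypothetical listener; lineage is known to the caller, not to the client. */
    interface Listener {
        void onJobCreated(String jobId, String lineage);
    }

    static CompletableFuture<String> execute(
            Client client, String jobGraph, String lineage, List<Listener> listeners) {
        return client.submitJob(jobGraph)
                .whenComplete(
                        (jobId, throwable) -> {
                            // Notify only after a successful submission.
                            if (throwable == null) {
                                listeners.forEach(l -> l.onJobCreated(jobId, lineage));
                            }
                        });
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Client client = graph -> CompletableFuture.completedFuture("id-of-" + graph);
        Listener listener = (id, lineage) -> seen.add(id + ":" + lineage);

        execute(client, "wordcount", "kafka->hdfs", Collections.singletonList(listener)).join();
        System.out.println(seen);
    }
}
```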






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-06 Thread via GitHub


HuangZhenQiu commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1591686363


##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -80,7 +80,7 @@ public CompletableFuture execute(
 clusterDescriptor.retrieve(clusterID);
 ClusterClient clusterClient = 
clusterClientProvider.getClusterClient();
 return clusterClient
-.submitJob(jobGraph)
+.submitJob(jobGraph, pipeline)

Review Comment:
   Yes, but there is a concern about the size of the lineage info if it is put into the job
graph.






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-03 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1588942354


##
flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java:
##
@@ -281,7 +281,7 @@ void testJobSubmitCancel() throws Exception {
 try (RestClusterClient restClusterClient =
 
createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) {
 assertThat(submitHandler.jobSubmitted).isFalse();
-restClusterClient.submitJob(jobGraph).get();

Review Comment:
   I would leave this as `restClusterClient.submitJob(jobGraph).get();`,
   and the same for the other instances.






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-03 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1588941192


##
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java:
##
@@ -454,6 +467,24 @@ public CompletableFuture submitJob(@Nonnull 
JobGraph jobGraph) {
 receiver,
 error);
 } else {
+RuntimeExecutionMode executionMode 
=
+
jobGraph.getJobConfiguration()
+
.get(ExecutionOptions.RUNTIME_MODE);
+if 
(jobStatusChangedListeners.size() > 0) {
+
jobStatusChangedListeners.forEach(
+listener ->
+
listener.onEvent(
+new 
DefaultJobCreatedEvent(
+   
 jobGraph.getJobID(),
+   
 jobGraph.getName(),
+   
 pipeline == null
+   
 ? null
+   
 : ((StreamGraph)
+   
 pipeline)
+   
 .getLineageGraph(),
+   
 executionMode)));
+}
+
 LOG.info(
 "Successfully submitted 
job '{}' ({}) to '{}'.",
 jobGraph.getName(),

Review Comment:
   Add the name/ID of the pipeline here for debugging.






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-03 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1588938587


##
flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java:
##
@@ -80,7 +80,7 @@ public CompletableFuture execute(
 clusterDescriptor.retrieve(clusterID);
 ClusterClient clusterClient = 
clusterClientProvider.getClusterClient();
 return clusterClient
-.submitJob(jobGraph)
+.submitJob(jobGraph, pipeline)

Review Comment:
   I think you are saying that the original submitJob with one parameter
still works, as well as the new method.
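
The pattern under discussion, a default one-argument method that delegates to a new two-argument one, can be sketched as follows. The `Client` interface and the `String` parameters are hypothetical simplifications of `ClusterClient`, `JobGraph`, and `Pipeline`.

```java
import java.util.concurrent.CompletableFuture;

final class DefaultOverloadDemo {

    /** Hypothetical interface mirroring the shape of the proposed change. */
    interface Client {
        /** Existing one-argument entry point, kept as a default that delegates. */
        default CompletableFuture<String> submitJob(String jobGraph) {
            return submitJob(jobGraph, null);
        }

        /** New two-argument method that implementations must provide. */
        CompletableFuture<String> submitJob(String jobGraph, String pipeline);
    }

    static final class EchoClient implements Client {
        @Override
        public CompletableFuture<String> submitJob(String jobGraph, String pipeline) {
            return CompletableFuture.completedFuture(jobGraph + "/" + pipeline);
        }
    }

    public static void main(String[] args) {
        Client client = new EchoClient();
        System.out.println(client.submitJob("graph").join());
        System.out.println(client.submitJob("graph", "pipeline").join());
    }
}
```

Existing callers of the one-argument method keep compiling, but every implementation now has to provide the two-argument method and handle a `null` pipeline, which is part of why the thread later drops the API change.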






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-03 Thread via GitHub


davidradl commented on code in PR #24754:
URL: https://github.com/apache/flink/pull/24754#discussion_r1588939108


##
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java:
##
@@ -94,7 +95,18 @@ public interface ClusterClient extends AutoCloseable {
  * @param jobGraph to submit
  * @return {@link JobID} of the submitted job
  */
-CompletableFuture submitJob(JobGraph jobGraph);
+default CompletableFuture submitJob(JobGraph jobGraph) {
+return submitJob(jobGraph, null);

Review Comment:
   Can't we leave this as `return submitJob(jobGraph);`?






Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-01 Thread via GitHub


flinkbot commented on PR #24754:
URL: https://github.com/apache/flink/pull/24754#issuecomment-2088785395

   
   ## CI report:
   
   * 6b5b8889d2417c2510b1372a32c332bc3962cb99 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]

2024-05-01 Thread via GitHub


HuangZhenQiu opened a new pull request, #24754:
URL: https://github.com/apache/flink/pull/24754

   ## What is the purpose of the change
   Add a job status changed listener for lineage. This PR is also meant for discussing
with the community whether to add the lineage graph to JobGraph for web
submission and failure recovery cases.
   
   ## Brief change log
 -  Add interfaces for job status change listener  
 -  Add Events and configs for job status listener.
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
 - The end to end test is covered by JobStatusListenerITCase 
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)
 - The serializers: (no)
 - The runtime per-record code paths (performance sensitive): (no)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
 - The S3 file system connector: (no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes)
 - If yes, how is the feature documented? (not documented)
   

