Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1618086605 ## flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.execution; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent; +import org.apache.flink.core.execution.JobExecutionStatusEvent; +import org.apache.flink.core.execution.JobStatusChangedEvent; +import org.apache.flink.core.execution.JobStatusChangedListener; +import org.apache.flink.core.execution.JobStatusChangedListenerFactory; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for job status changed listener. 
*/ +public class JobStatusChangedListenerITCase { +private static List<JobStatusChangedEvent> statusChangedEvents = new ArrayList<>(); + +@Test +void testJobStatusChanged() throws Exception { +Configuration configuration = new Configuration(); +configuration.set( +JOB_STATUS_CHANGED_LISTENERS, + Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName())); +try (StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration)) { +List<String> sourceValues = Arrays.asList("a", "b", "c"); +List<String> resultValues = new ArrayList<>(); +try (CloseableIterator<String> iterator = +env.fromCollection(sourceValues).executeAndCollect()) { +while (iterator.hasNext()) { +resultValues.add(iterator.next()); +} +} + assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new String[0])); +} +assertThat(statusChangedEvents.size()).isEqualTo(3); +assertThat(statusChangedEvents.get(0).jobId()) +.isEqualTo(statusChangedEvents.get(1).jobId()); +assertThat(statusChangedEvents.get(0).jobName()) +.isEqualTo(statusChangedEvents.get(1).jobName()); + +assertThat(statusChangedEvents.get(1).jobId()) +.isEqualTo(statusChangedEvents.get(2).jobId()); +assertThat(statusChangedEvents.get(1).jobName()) +.isEqualTo(statusChangedEvents.get(2).jobName()); + +statusChangedEvents.forEach( +event -> { +if (event instanceof DefaultJobExecutionStatusEvent) { +JobExecutionStatusEvent status = (JobExecutionStatusEvent) event; +assertThat( +(status.oldStatus() == JobStatus.CREATED +&& status.newStatus() == JobStatus.RUNNING) Review Comment: Good suggestion. Added test cases accordingly. It actually helped me find out that the job status changed listener should be added for the MiniClusterExecutor as well. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1617923919 ## flink-tests/src/test/java/org/apache/flink/test/execution/JobStatusChangedListenerITCase.java: ## @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.test.execution; + +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.execution.DefaultJobExecutionStatusEvent; +import org.apache.flink.core.execution.JobExecutionStatusEvent; +import org.apache.flink.core.execution.JobStatusChangedEvent; +import org.apache.flink.core.execution.JobStatusChangedListener; +import org.apache.flink.core.execution.JobStatusChangedListenerFactory; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.runtime.execution.DefaultJobCreatedEvent; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.configuration.DeploymentOptions.JOB_STATUS_CHANGED_LISTENERS; +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for job status changed listener. 
*/ +public class JobStatusChangedListenerITCase { +private static List<JobStatusChangedEvent> statusChangedEvents = new ArrayList<>(); + +@Test +void testJobStatusChanged() throws Exception { +Configuration configuration = new Configuration(); +configuration.set( +JOB_STATUS_CHANGED_LISTENERS, + Collections.singletonList(TestingJobStatusChangedListenerFactory.class.getName())); +try (StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(configuration)) { +List<String> sourceValues = Arrays.asList("a", "b", "c"); +List<String> resultValues = new ArrayList<>(); +try (CloseableIterator<String> iterator = +env.fromCollection(sourceValues).executeAndCollect()) { +while (iterator.hasNext()) { +resultValues.add(iterator.next()); +} +} + assertThat(resultValues).containsExactlyInAnyOrder(sourceValues.toArray(new String[0])); +} +assertThat(statusChangedEvents.size()).isEqualTo(3); +assertThat(statusChangedEvents.get(0).jobId()) +.isEqualTo(statusChangedEvents.get(1).jobId()); +assertThat(statusChangedEvents.get(0).jobName()) +.isEqualTo(statusChangedEvents.get(1).jobName()); + +assertThat(statusChangedEvents.get(1).jobId()) +.isEqualTo(statusChangedEvents.get(2).jobId()); +assertThat(statusChangedEvents.get(1).jobName()) +.isEqualTo(statusChangedEvents.get(2).jobName()); + +statusChangedEvents.forEach( +event -> { +if (event instanceof DefaultJobExecutionStatusEvent) { +JobExecutionStatusEvent status = (JobExecutionStatusEvent) event; +assertThat( +(status.oldStatus() == JobStatus.CREATED +&& status.newStatus() == JobStatus.RUNNING) Review Comment: Can we add tests that check for `FAILING`/`FAILED` and for `CANCELLING`/`CANCELED`?
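One way to cover the failure and cancellation paths requested here is to have the testing listener record every (old, new) transition it receives and then assert on the expected pairs. The sketch below is self-contained and hypothetical: `TransitionRecorder` and its nested `State` enum are simplified stand-ins for a testing listener and Flink's `JobStatus`, not Flink API, and whether every intermediate transition actually reaches client-side listeners should be checked against the implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical test helper that records (old, new) status transitions seen by a listener. */
final class TransitionRecorder {

    /** Simplified stand-in for org.apache.flink.api.common.JobStatus; not Flink's enum. */
    enum State { CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED }

    // Listener callbacks may arrive on another thread, so the list is synchronized.
    private final List<State[]> transitions = Collections.synchronizedList(new ArrayList<>());

    /** Called by the (hypothetical) testing listener for every status change event. */
    void onStatusChange(State oldStatus, State newStatus) {
        transitions.add(new State[] {oldStatus, newStatus});
    }

    /** Returns true if the transition {@code from -> to} was recorded at least once. */
    boolean saw(State from, State to) {
        // Iterating a synchronizedList requires holding its lock.
        synchronized (transitions) {
            for (State[] t : transitions) {
                if (t[0] == from && t[1] == to) {
                    return true;
                }
            }
            return false;
        }
    }
}
```

In an ITCase, the testing listener would forward each `JobExecutionStatusEvent`'s `oldStatus()`/`newStatus()` to such a recorder. A failure test could run a job whose operator throws (presumably with restarts disabled, so no `RESTARTING` cycle intervenes) and assert `saw(FAILING, FAILED)`; a cancellation test could call `JobClient#cancel()` and assert `saw(CANCELLING, CANCELED)`.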
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1617917890 ## flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java: ## @@ -153,7 +173,18 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture( return jobId; })) .thenApplyAsync( -jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)); +jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)) +.whenCompleteAsync( +(jobClient, throwable) -> { +if (throwable == null) { +PipelineExecutorUtils.notifyJobStatusListeners( +pipeline, jobGraph, jobStatusChangedListeners); +} else { +LOG.error( +"Fail to submit job graph to application cluster", Review Comment: nit: Fail => Failed
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1606207458 ## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ## @@ -55,11 +65,21 @@ public class AbstractSessionClusterExecutor< ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements CacheSupportedPipelineExecutor { +private final ExecutorService executorService = +Executors.newFixedThreadPool( +4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); private final ClientFactory clusterClientFactory; +private final Configuration configuration; +private final List<JobStatusChangedListener> jobStatusChangedListeners; -public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { +public AbstractSessionClusterExecutor( +@Nonnull final ClientFactory clusterClientFactory, Configuration configuration) { this.clusterClientFactory = checkNotNull(clusterClientFactory); +this.configuration = configuration; +this.jobStatusChangedListeners = +JobStatusChangedListenerUtils.createJobStatusChangedListeners( +this.getClass().getClassLoader(), configuration, executorService); Review Comment: We basically need to load the job status changed listeners from Flink libs or plugins here, so yes, the thread context class loader makes more sense. ## flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java: ## @@ -153,7 +173,14 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture( return jobId; })) .thenApplyAsync( -jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)); +jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)) +.whenCompleteAsync( +(jobClient, throwable) -> { +if (throwable == null) { Review Comment: Discussed with @davidradl offline. The throwable cannot be rethrown from whenCompleteAsync, so we log the exception rather than silently swallow it.
https://stackoverflow.com/questions/71668871/completablefuture-whencompleteasync-does-not-let-me-re-throw-an-exception ## flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java: ## @@ -355,7 +368,7 @@ public CompletableFuture<JobResult> requestJobResult(@Nonnull JobID jobId) { } @Override -public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) { Review Comment: With Gyula's suggestion, we don't need to change the API now. ## flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java: ## @@ -454,6 +467,24 @@ public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) { receiver, error); } else { +RuntimeExecutionMode executionMode = + jobGraph.getJobConfiguration() + .get(ExecutionOptions.RUNTIME_MODE); +if (jobStatusChangedListeners.size() > 0) { + jobStatusChangedListeners.forEach( +listener -> + listener.onEvent( +new DefaultJobCreatedEvent( + jobGraph.getJobID(), + jobGraph.getName(), + pipeline == null + ? null + : ((StreamGraph) + pipeline) + .getLineageGraph(), + executionMode))); +} + LOG.info( "Successfully submitted job '{}' ({}) to '{}'.",
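The class-loader point above (load listener factories through the thread context class loader rather than `this.getClass().getClassLoader()`) can be sketched as follows. This is a minimal illustration, not `JobStatusChangedListenerUtils.createJobStatusChangedListeners`: `ListenerLoading`, `pickClassLoader`, and `instantiate` are hypothetical names, and it assumes factories are referenced by fully qualified class name, as `JOB_STATUS_CHANGED_LISTENERS` is set in the ITCase. The underlying idea is that launchers commonly set the context class loader to one that can see user and plugin jars, while the loader that defined `flink-clients` classes may not.

```java
/** Hypothetical sketch: choosing the class loader used to load listener factories by name. */
final class ListenerLoading {

    private ListenerLoading() {}

    /** Prefers the thread context class loader; falls back to the loader that defined anchor. */
    static ClassLoader pickClassLoader(Class<?> anchor) {
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        return context != null ? context : anchor.getClassLoader();
    }

    /**
     * Loads className through loader and calls its no-arg constructor. A null loader means the
     * bootstrap loader (Class.forName semantics); a type mismatch throws ClassCastException.
     */
    static <T> T instantiate(String className, Class<T> expectedType, ClassLoader loader) {
        try {
            Class<? extends T> clazz =
                    Class.forName(className, true, loader).asSubclass(expectedType);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

Falling back to the defining loader keeps the sketch usable on threads where no context class loader has been set.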
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu commented on PR #24754: URL: https://github.com/apache/flink/pull/24754#issuecomment-2125397607 @davidradl Throwables in the executors are already caught in the execution environment. If there is a better way to provide extra information to users, I am glad to adopt it.
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1609844360 ## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java: ## @@ -81,7 +95,14 @@ public CompletableFuture<JobClient> execute( final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig, userCodeClassloader); return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory) -.submitJob(jobGraph, userCodeClassloader); +.submitJob(jobGraph, userCodeClassloader) +.whenComplete( +(ignored, throwable) -> { +if (throwable == null) { Review Comment: Same comment as above.
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1609843669 ## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ## @@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute( clusterClientProvider, jobID, userCodeClassloader)) -.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close()); +.whenCompleteAsync( +(jobClient, throwable) -> { +if (throwable == null) { +RuntimeExecutionMode executionMode = +jobGraph.getJobConfiguration() + .get(ExecutionOptions.RUNTIME_MODE); +if (jobStatusChangedListeners.size() > 0) { +jobStatusChangedListeners.forEach( +listener -> +listener.onEvent( +new DefaultJobCreatedEvent( + jobGraph.getJobID(), + jobGraph.getName(), + ((StreamGraph) pipeline) + .getLineageGraph(), + executionMode))); +} +} +clusterClient.close(); Review Comment: I see this conversation is resolved, but I did not see an answer to what looks like a valid concern.
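The concern being referred to (whether `clusterClient.close()` should sit in a `finally` block in case a listener throws) can be illustrated with a small, self-contained sketch. The names are hypothetical: `Consumer<E>` stands in for `JobStatusChangedListener`, `AutoCloseable` for the cluster client, and `System.err` for the executor's logger. Each listener is isolated so one failure does not skip the others, and `close()` runs in `finally` so even an `Error` escaping the loop does not leak the client.

```java
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: notify listeners, then always release the client. */
final class NotifyThenClose {

    private NotifyThenClose() {}

    static <E> void notifyAndClose(List<Consumer<E>> listeners, E event, AutoCloseable client) {
        try {
            for (Consumer<E> listener : listeners) {
                try {
                    listener.accept(event);
                } catch (RuntimeException e) {
                    // Isolate listeners: one failure should not prevent the others from running.
                    System.err.println("Job status listener failed: " + e);
                }
            }
        } finally {
            // Runs even if an Error escapes the loop above, so the client is never leaked.
            try {
                client.close();
            } catch (Exception e) {
                System.err.println("Failed to close cluster client: " + e);
            }
        }
    }
}
```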
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1609838609 ## flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java: ## @@ -153,7 +173,14 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture( return jobId; })) .thenApplyAsync( -jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)); +jobID -> jobClientCreator.getJobClient(actualJobId, userCodeClassloader)) +.whenCompleteAsync( +(jobClient, throwable) -> { +if (throwable == null) { Review Comment: Should we do something if there is a throwable? Maybe notify the job status changed listeners that there was an error, and log the error. If there is a good reason to swallow the throwable here, a comment explaining it would be good.
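On the question of the throwable: with `whenComplete`/`whenCompleteAsync` the failure is not swallowed as long as the future returned by that call is the one handed back to the caller. The returned stage completes with the same value or exception as the source (and if the action itself throws after a successful completion, it completes with that exception). A BiConsumer cannot rethrow a checked `Throwable`, but it does not need to. The sketch below is hypothetical: `SubmitObserver.observe` and the two lists stand in for notifying listeners and `LOG.error(...)`. The async variant has the same pass-through semantics; it only runs the action on another executor.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: notify on success, log on failure, and let the caller still see it. */
final class SubmitObserver {

    private SubmitObserver() {}

    static <T> CompletableFuture<T> observe(
            CompletableFuture<T> submission, List<String> notified, List<Throwable> logged) {
        // whenComplete returns a new stage carrying the same result or exception as submission,
        // so callers of the returned future still observe a failed submission.
        return submission.whenComplete(
                (result, throwable) -> {
                    if (throwable == null) {
                        notified.add("created:" + result); // stand-in for notifying listeners
                    } else {
                        logged.add(throwable); // stand-in for LOG.error(...)
                    }
                });
    }
}
```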
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
JingGe commented on PR #24754: URL: https://github.com/apache/flink/pull/24754#issuecomment-2122930606 @flinkbot run azure
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu commented on PR #24754: URL: https://github.com/apache/flink/pull/24754#issuecomment-2121563878 @flinkbot run azure
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu commented on PR #24754: URL: https://github.com/apache/flink/pull/24754#issuecomment-2119645723 @gyfora @JingGe Thanks for your review. I have partially resolved your comments. The lineage graph info is set into the StreamGraph in this related PR: https://github.com/apache/flink/pull/24618/files. I would also like to get more suggestions on how to make PRs more self-contained.
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
JingGe commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1600605397 ## docs/content/docs/deployment/advanced/job_status_listener.md: ## @@ -0,0 +1,81 @@ + +--- +title: "Job Status Changed Listener" +nav-title: job-status-listener +nav-parent_id: advanced +nav-pos: 3 +--- + + +## Job status changed listener +Flink provides a pluggable interface for users to register their custom logic for handling with the job status changes in which lineage info about source/sink is provided. Review Comment: If I am not mistaken, you are implementing the second part of FLIP-314. Does it make sense to update the Jira ticket and the PR description with this information and the content of this .md file? That would make it easier for others to quickly understand the context and join the discussion/review.
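To make the docs paragraph concrete, here is a sketch of the shape such user logic might take: a single `onEvent` callback that reports lineage when the job is created and then tracks status transitions. All types below are simplified, hypothetical mirrors. In the PR the real ones are `JobStatusChangedListener`, `JobStatusChangedEvent`, `DefaultJobCreatedEvent` (built from job ID, job name, lineage graph, and execution mode), and `DefaultJobExecutionStatusEvent` (with `oldStatus()`/`newStatus()`); here lineage is reduced to lists of source and sink names.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of a user-defined job status listener; all types are simplified, hypothetical mirrors. */
final class LineageListenerSketch {

    private LineageListenerSketch() {}

    /** Mirrors the role of JobStatusChangedEvent: every event identifies its job. */
    interface Event {
        String jobName();
    }

    /** Mirrors the role of a job-created event that carries lineage (sources and sinks). */
    static final class JobCreated implements Event {
        private final String jobName;
        final List<String> sources;
        final List<String> sinks;

        JobCreated(String jobName, List<String> sources, List<String> sinks) {
            this.jobName = jobName;
            this.sources = sources;
            this.sinks = sinks;
        }

        @Override
        public String jobName() {
            return jobName;
        }
    }

    /** Mirrors the role of a job execution status event (old and new status). */
    static final class StatusChanged implements Event {
        private final String jobName;
        final String oldStatus;
        final String newStatus;

        StatusChanged(String jobName, String oldStatus, String newStatus) {
            this.jobName = jobName;
            this.oldStatus = oldStatus;
            this.newStatus = newStatus;
        }

        @Override
        public String jobName() {
            return jobName;
        }
    }

    /** Mirrors the single-callback listener interface. */
    interface Listener {
        void onEvent(Event event);
    }

    /** Example user logic: report lineage once, then record status transitions. */
    static final class RecordingListener implements Listener {
        final List<String> lines = new ArrayList<>();

        @Override
        public void onEvent(Event event) {
            if (event instanceof JobCreated) {
                JobCreated created = (JobCreated) event;
                lines.add(created.jobName() + ": " + created.sources + " -> " + created.sinks);
            } else if (event instanceof StatusChanged) {
                StatusChanged change = (StatusChanged) event;
                lines.add(change.jobName() + ": " + change.oldStatus + " -> " + change.newStatus);
            }
        }
    }
}
```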
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
gyfora commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1598242464 ## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java: ## @@ -228,6 +230,14 @@ public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { this.timeCharacteristic = timeCharacteristic; } +public void setLineageGraph(LineageGraph lineageGraph) { Review Comment: This method is not called anywhere; how did we test this functionality? ## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ## @@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute( clusterClientProvider, jobID, userCodeClassloader)) -.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close()); +.whenCompleteAsync( +(jobClient, throwable) -> { +if (throwable == null) { +RuntimeExecutionMode executionMode = +jobGraph.getJobConfiguration() + .get(ExecutionOptions.RUNTIME_MODE); +if (jobStatusChangedListeners.size() > 0) { +jobStatusChangedListeners.forEach( +listener -> +listener.onEvent( +new DefaultJobCreatedEvent( + jobGraph.getJobID(), + jobGraph.getName(), + ((StreamGraph) pipeline) + .getLineageGraph(), + executionMode))); +} +} +clusterClient.close(); Review Comment: Should this be in a finally block in case a listener throws an error?
## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ## @@ -55,11 +65,21 @@ public class AbstractSessionClusterExecutor< ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements CacheSupportedPipelineExecutor { +private final ExecutorService executorService = +Executors.newFixedThreadPool( +4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); private final ClientFactory clusterClientFactory; +private final Configuration configuration; +private final List<JobStatusChangedListener> jobStatusChangedListeners; -public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) { +public AbstractSessionClusterExecutor( +@Nonnull final ClientFactory clusterClientFactory, Configuration configuration) { this.clusterClientFactory = checkNotNull(clusterClientFactory); +this.configuration = configuration; +this.jobStatusChangedListeners = +JobStatusChangedListenerUtils.createJobStatusChangedListeners( +this.getClass().getClassLoader(), configuration, executorService); Review Comment: Should we use the class's class loader or the thread context class loader here? ## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ## @@ -55,11 +65,21 @@ public class AbstractSessionClusterExecutor< ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements CacheSupportedPipelineExecutor { +private final ExecutorService executorService = +Executors.newFixedThreadPool( +4, new ExecutorThreadFactory("Flink-SessionClusterExecutor-IO")); Review Comment: Why do we need 4 threads? Do we expect concurrent calls here? ## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ## @@ -97,7 +117,26 @@ public CompletableFuture<JobClient> execute( clusterClientProvider, jobID, userCodeClassloader)) -.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close()); +.whenCompleteAsync( +
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
gyfora commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1594287283 ## flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java: ## @@ -74,7 +75,7 @@ public Configuration getFlinkConfiguration() { } @Override -public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) { +public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph, Pipeline pipeline) { Review Comment: We discussed this with @HuangZhenQiu offline: we should remove this API change and apply the lineage logic on the returned future instead.
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1591686363 ## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ## @@ -80,7 +80,7 @@ public CompletableFuture<JobClient> execute( clusterDescriptor.retrieve(clusterID); ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient(); return clusterClient -.submitJob(jobGraph) +.submitJob(jobGraph, pipeline) Review Comment: Yes, but there is a concern about the size of the lineage info if it is put into the job graph.
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1588942354 ## flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java: ## @@ -281,7 +281,7 @@ void testJobSubmitCancel() throws Exception { try (RestClusterClient restClusterClient = createRestClusterClient(restServerEndpoint.getServerAddress().getPort())) { assertThat(submitHandler.jobSubmitted).isFalse(); -restClusterClient.submitJob(jobGraph).get(); Review Comment: I would leave this as `restClusterClient.submitJob(jobGraph).get();`. Same for the other instances.
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1588941192 ## flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java: ## @@ -454,6 +467,24 @@ public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) { receiver, error); } else { +RuntimeExecutionMode executionMode = + jobGraph.getJobConfiguration() + .get(ExecutionOptions.RUNTIME_MODE); +if (jobStatusChangedListeners.size() > 0) { + jobStatusChangedListeners.forEach( +listener -> + listener.onEvent( +new DefaultJobCreatedEvent( + jobGraph.getJobID(), + jobGraph.getName(), + pipeline == null + ? null + : ((StreamGraph) + pipeline) + .getLineageGraph(), + executionMode))); +} + LOG.info( "Successfully submitted job '{}' ({}) to '{}'.", jobGraph.getName(), Review Comment: Add the name/ID of the pipeline here for debugging.
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1588938587 ## flink-clients/src/main/java/org/apache/flink/client/deployment/executors/AbstractSessionClusterExecutor.java: ## @@ -80,7 +80,7 @@ public CompletableFuture<JobClient> execute( clusterDescriptor.retrieve(clusterID); ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient(); return clusterClient -.submitJob(jobGraph) +.submitJob(jobGraph, pipeline) Review Comment: I think you are saying that the original one-parameter submitJob still works, as well as the new method.
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
davidradl commented on code in PR #24754: URL: https://github.com/apache/flink/pull/24754#discussion_r1588939108 ## flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java: ## @@ -94,7 +95,18 @@ public interface ClusterClient<T> extends AutoCloseable { * @param jobGraph to submit * @return {@link JobID} of the submitted job */ -CompletableFuture<JobID> submitJob(JobGraph jobGraph); +default CompletableFuture<JobID> submitJob(JobGraph jobGraph) { +return submitJob(jobGraph, null); Review Comment: Can't we leave this as return submitJob(jobGraph(jobGraph);?
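For comparison, a common way to add an overload to an interface without breaking existing implementers runs in the opposite direction from the diff above: keep the existing one-argument method abstract and make the new two-argument overload a `default` that delegates to it. The diff instead turns the one-argument method into the default, which presumably leaves the two-argument form abstract and forces every implementer to provide it. This is one reading of the suggestion, not a description of what was merged (the thread later drops the API change in favour of applying the logic on the returned future). `OverloadSketch`, `Graph`, `Extra`, `Client`, and `LegacyClient` are hypothetical stand-ins for `ClusterClient`, `JobGraph`, and `Pipeline`.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch: adding an overload without breaking existing implementations. */
final class OverloadSketch {

    /** Stand-in for JobGraph. */
    static final class Graph {
        final String id;

        Graph(String id) {
            this.id = id;
        }
    }

    /** Stand-in for the extra argument (e.g. a Pipeline). */
    static final class Extra {}

    interface Client {
        // The existing abstract method stays as it is, so current implementations still compile.
        CompletableFuture<String> submit(Graph graph);

        // The new overload is a default that delegates to the existing method;
        // implementations that care about the extra argument can override it.
        default CompletableFuture<String> submit(Graph graph, Extra extra) {
            return submit(graph);
        }
    }

    /** An implementation written before the overload existed. */
    static final class LegacyClient implements Client {
        @Override
        public CompletableFuture<String> submit(Graph graph) {
            return CompletableFuture.completedFuture(graph.id);
        }
    }
}
```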
Re: [PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
flinkbot commented on PR #24754: URL: https://github.com/apache/flink/pull/24754#issuecomment-2088785395 ## CI report: * 6b5b8889d2417c2510b1372a32c332bc3962cb99 UNKNOWN Bot commands The @flinkbot bot supports the following commands: - `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-33212][runtime] add job status changed listener for lineage [flink]
HuangZhenQiu opened a new pull request, #24754: URL: https://github.com/apache/flink/pull/24754 ## What is the purpose of the change Add a job status changed listener for lineage. This PR is also meant to discuss with the community whether to add the lineage graph to the JobGraph for the web submission and failure recovery cases. ## Brief change log - Add interfaces for the job status changed listener - Add events and configs for the job status changed listener. ## Verifying this change This change added tests and can be verified as follows: - The end-to-end test is covered by JobStatusListenerITCase ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (yes) - If yes, how is the feature documented? (not documented)