dmvk commented on a change in pull request #17000:
URL: https://github.com/apache/flink/pull/17000#discussion_r698686629
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -147,35 +146,34 @@ public void stop() {
      */
     private CompletableFuture<Acknowledge> runApplicationAndShutdownClusterAsync(
             final DispatcherGateway dispatcherGateway) {
-        return applicationCompletionFuture
-                .handle(
-                        (ignored, t) -> {
-                            if (t == null) {
-                                LOG.info("Application completed SUCCESSFULLY");
-                                return dispatcherGateway.shutDownCluster(
-                                        ApplicationStatus.SUCCEEDED);
-                            }
-
-                            final Optional<UnsuccessfulExecutionException> maybeException =
-                                    ExceptionUtils.findThrowable(
-                                            t, UnsuccessfulExecutionException.class);
-                            if (maybeException.isPresent()) {
-                                final ApplicationStatus applicationStatus =
-                                        maybeException.get().getStatus();
-                                if (applicationStatus == ApplicationStatus.CANCELED
-                                        || applicationStatus == ApplicationStatus.FAILED) {
-                                    LOG.info("Application {}: ", applicationStatus, t);
-                                    return dispatcherGateway.shutDownCluster(applicationStatus);
-                                }
-                            }
-
-                            LOG.warn("Application failed unexpectedly: ", t);
-                            this.errorHandler.onFatalError(
-                                    new FlinkException("Application failed unexpectedly.", t));
-
-                            return FutureUtils.<Acknowledge>completedExceptionally(t);
-                        })
-                .thenCompose(Function.identity());
+        final CompletableFuture<Acknowledge> shutdownFuture =
+                applicationCompletionFuture
+                        .handle(
+                                (ignored, t) -> {
+                                    if (t == null) {
+                                        LOG.info("Application completed SUCCESSFULLY");
+                                        return dispatcherGateway.shutDownCluster(
+                                                ApplicationStatus.SUCCEEDED);
+                                    }
+                                    final Optional<UnsuccessfulExecutionException> maybeException =
+                                            ExceptionUtils.findThrowable(
+                                                    t, UnsuccessfulExecutionException.class);
+                                    if (maybeException.isPresent()) {
+                                        final ApplicationStatus applicationStatus =
+                                                maybeException.get().getStatus();
+                                        if (applicationStatus == ApplicationStatus.CANCELED
+                                                || applicationStatus == ApplicationStatus.FAILED) {
+                                            LOG.info("Application {}: ", applicationStatus, t);
+                                            return dispatcherGateway.shutDownCluster(
+                                                    applicationStatus);
+                                        }
+                                    }
+                                    LOG.warn("Application failed unexpectedly: ", t);
+                                    return dispatcherGateway.shutDownClusterExceptionally(t);
Review comment:
You're right. Another approach could be giving up leadership or restarting
the dispatcher, but that could be tricky to do 🤔 I'm not sure it would
actually improve anything, so it's probably not worth the effort.
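For readers following the hunk above, the `handle(...)` plus
`thenCompose(Function.identity())` shape can be tried out with plain
`java.util.concurrent`. This is only a minimal sketch: `shutDown(String)` is a
hypothetical stand-in for `dispatcherGateway.shutDownCluster(...)`, and none of
the names below are Flink API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class HandleThenComposeSketch {

    // Hypothetical stand-in for dispatcherGateway.shutDownCluster(status); not Flink API.
    static CompletableFuture<String> shutDown(String status) {
        return CompletableFuture.completedFuture("shut down with " + status);
    }

    // Maps the outcome of the application future to a shutdown future.
    // handle() produces a CompletableFuture<CompletableFuture<String>>,
    // which thenCompose(Function.identity()) flattens into one future.
    static CompletableFuture<String> runAndShutDown(CompletableFuture<Void> application) {
        return application
                .handle((ignored, t) -> t == null ? shutDown("SUCCEEDED") : shutDown("FAILED"))
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> ok = CompletableFuture.completedFuture(null);
        System.out.println(runAndShutDown(ok).join());

        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(runAndShutDown(failed).join());
    }
}
```

Because the failure branch also returns a shutdown future, the resulting future
completes normally in both cases; whether the real code should instead
propagate the error is exactly the question discussed in this thread.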
##########
File path:
flink-clients/src/test/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrapTest.java
##########
@@ -779,6 +816,22 @@ public void testDuplicateJobSubmissionWithRunningJobId() throws Throwable {
         assertFalse(maybeDuplicate.get().isGloballyTerminated());
     }
+    @Test
+    public void testClusterShutsDownGracefullyOnJobDriverError() throws Exception {
+        final MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder().build();
+        try (final MiniCluster miniCluster = new ApplicationMiniCluster(cfg, getFailingProgram())) {
+            miniCluster.start();
+            final CompletableFuture<ApplicationStatus> shutdownFuture =
+                    Iterables.getOnlyElement(miniCluster.getDispatcherShutdownFutures());
+            try {
+                shutdownFuture.get();
+                fail("Exception expected.");
+            } catch (Exception e) {
+                assertThat(e, FlinkMatchers.containsCause(FailingJob.FailingJobException.class));
+            }
+        }
+    }
Review comment:
👍 Also, I'll change the test to reflect the described scenario (the
dispatcher loses leadership).
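The test in the hunk above asserts that a failed future carries a specific
cause somewhere in its exception chain. A plain-Java approximation of that
check might look like the sketch below; `findCause` is a hypothetical helper
written for illustration and is not the implementation of
`FlinkMatchers.containsCause` or `ExceptionUtils.findThrowable`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CauseChainSketch {

    // Hypothetical helper: walks the cause chain looking for an instance of the given type.
    static <T extends Throwable> Optional<T> findCause(Throwable root, Class<T> type) {
        Throwable current = root;
        while (current != null) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the dispatcher shutdown future completing exceptionally.
        CompletableFuture<String> shutdownFuture = new CompletableFuture<>();
        shutdownFuture.completeExceptionally(
                new RuntimeException("wrapper", new IllegalStateException("job driver failed")));
        try {
            shutdownFuture.get();
            throw new AssertionError("Exception expected.");
        } catch (ExecutionException e) {
            // get() wraps the failure in ExecutionException, so the chain must be searched.
            System.out.println(findCause(e, IllegalStateException.class).isPresent());
        }
    }
}
```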
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationMiniCluster.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.application;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.runtime.dispatcher.SessionDispatcherFactory;
+import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherRunnerFactory;
+import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
+import org.apache.flink.runtime.rest.JobRestEndpointFactory;
+
+import java.util.concurrent.CompletableFuture;
+
+/** An extension to {@link MiniCluster} that runs packaged program in application mode. */
+public class ApplicationMiniCluster extends MiniCluster {
Review comment:
Agreed, but this is hard to do without breaking the mini cluster public API 🤔
I did attempt it with an AbstractMiniCluster and it didn't look right. 😢 It
is a huge change-set that breaks the public API (e.g. the enum for HAService
would have to move to the abstract class, configurations, ...). I think this
could be done properly, but it would definitely take some time (~2 MD?); there
should be an easier way.
What I can do is allow passing a custom
DispatcherResourceManagerComponentFactory into `TestingMiniCluster` (very
similar to what we already do with HighAvailabilityServices). The user would
still be able to access methods like `submitJob`, but that would be no
different from the real-world scenario, where these methods are accessible via
RPC.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
##########
@@ -247,6 +247,14 @@ public ClusterInformation getClusterInformation() {
         }
     }
+    public List<CompletableFuture<ApplicationStatus>> getDispatcherShutdownFutures() {
+        synchronized (this) {
+            return dispatcherResourceManagerComponents.stream()
+                    .map(DispatcherResourceManagerComponent::getShutDownFuture)
+                    .collect(Collectors.toList());
+        }
+    }
Review comment:
removed
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -710,6 +710,12 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTi
         return CompletableFuture.completedFuture(Acknowledge.get());
     }
+    @Override
+    public CompletableFuture<Acknowledge> shutDownClusterExceptionally(Throwable throwable) {
Review comment:
removed