tillrohrmann commented on a change in pull request #17995:
URL: https://github.com/apache/flink/pull/17995#discussion_r761245171



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobStartupFailedException.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Exception signalling that an exception occurred during the execution of the jar's main method.
+ *
+ * <p>The job will transition to FAILED state, and it will not be recovered.
+ */
+@Internal
+public class JobStartupFailedException extends RuntimeException {
+
+    private final List<JobValidationError> errors;
+    private final JobID jobId;
+    private final String jobName;
+
+    public JobStartupFailedException(JobID jobID, String jobName, List<JobValidationError> errors) {
+        this.jobId = checkNotNull(jobID);

Review comment:
       nit: The parameter is named `jobID` but the field is `jobId`. I would use `jobId` for both.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobValidationError.java
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.Internal;
+
+/** Captures errors during the execution of the user jar's main method. */
+@Internal
+public interface JobValidationError {
+    String getErrorMessage();
+}

Review comment:
       Why does it make sense to use an interface here?

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -64,7 +68,31 @@ public StreamContextEnvironment(
             final ClassLoader userCodeClassLoader,
             final boolean enforceSingleJobExecution,
             final boolean suppressSysout) {
-        super(executorServiceLoader, configuration, userCodeClassLoader);
+        this(
+                executorServiceLoader,
+                configuration,
+                userCodeClassLoader,
+                enforceSingleJobExecution,
+                suppressSysout,
+                true,
+                new ArrayList<>());
+    }
+
+    @Internal
+    public StreamContextEnvironment(
+            final PipelineExecutorServiceLoader executorServiceLoader,
+            final Configuration configuration,
+            final ClassLoader userCodeClassLoader,
+            final boolean enforceSingleJobExecution,
+            final boolean suppressSysout,
+            final boolean allowConfigurations,
+            final List<JobValidationError> errors) {

Review comment:
       Why do we pass in the list of errors? Can't this be a private field of 
the `StreamContextEnvironment`?

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -282,6 +292,9 @@ private void runApplicationEntryPoint(
             } else {
                 jobIdsFuture.complete(applicationJobIds);
             }
+        } catch (JobStartupFailedException e) {
+            jobIdsFuture.completeExceptionally(e);

Review comment:
       How does this work when there are multiple `execute` calls and only the 
last one fails with `JobStartupFailedException` while the previous ones 
succeed?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -2477,6 +2511,28 @@ public void registerCachedFile(String filePath, String name, boolean executable)
                         name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
     }
 
+    private List<ConfigurationNotAllowedError> checkNotAllowedConfigurations() {

Review comment:
       Is it important that this is a `List`?

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -2081,6 +2101,20 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
                 configuration.get(DeploymentOptions.TARGET),
                 "No execution.target specified in your configuration file.");
 
+        errors.addAll(checkNotAllowedConfigurations());
+        if (!errors.isEmpty()) {
+            // HACK: We shortcut the StreamGraph to jobgraph translation because we already
+            // know that the job needs to fail and can derive the jobId.
+
+            final JobID jobId =
+                    configuration
+                            .getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID)
+                            .map(JobID::fromHexString)
+                            .orElse(new JobID());

Review comment:
       Can we factor this out into a common utility to avoid duplication?
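
       A minimal sketch of what such a utility could look like, written against the JDK only so that it stands alone. `JobIdResolver` and `fixedOrGenerated` are hypothetical names, not part of this PR or of Flink; the parser and generator are passed in so the sketch does not depend on `JobID`:

```java
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical helper; name and location are assumptions, not part of the PR. */
final class JobIdResolver {

    private JobIdResolver() {}

    /**
     * Returns the parsed fixed ID when one is configured, otherwise a generated one.
     * orElseGet only invokes the generator when no fixed ID is present.
     */
    static <T> T fixedOrGenerated(
            Optional<String> fixedHex, Function<String, T> parser, Supplier<T> generator) {
        return fixedHex.map(parser).orElseGet(generator);
    }
}
```

       With the types from this diff, the call site would presumably read `JobIdResolver.fixedOrGenerated(configuration.getOptional(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID), JobID::fromHexString, JobID::new)`. Using `orElseGet` instead of `orElse(new JobID())` also avoids creating a `JobID` that is discarded when a fixed ID is configured.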

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobStartupFailedException.java
##########
@@ -0,0 +1,70 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Exception signalling that an exception occurred during the execution of the jar's main method.
+ *
+ * <p>The job will transition to FAILED state, and it will not be recovered.
+ */
+@Internal
+public class JobStartupFailedException extends RuntimeException {
+
+    private final List<JobValidationError> errors;

Review comment:
       Nit: Can this be a collection or do we need list semantics?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
##########
@@ -900,6 +901,41 @@ public void testJobDataAreCleanedUpInCorrectOrderOnFailedJob() throws Exception
         testJobDataAreCleanedUpInCorrectOrder(JobStatus.FAILED);
     }
 
+    @Test
+    public void testRetrieveJobResultAfterSubmissionOfFailedJob() throws Exception {
+        dispatcher =
+                createAndStartDispatcher(
+                        heartbeatServices,
+                        haServices,
+                        new ExpectedJobIdJobManagerRunnerFactory(
+                                jobId, createdJobManagerRunnerLatch));
+
+        final DispatcherGateway dispatcherGateway =
+                dispatcher.getSelfGateway(DispatcherGateway.class);
+
+        final JobID failedJobId = new JobID();
+
+        final String jobName = "Failed Streaming Job";
+        dispatcherGateway.submitFailedJob(
+                new JobStartupFailedException(
+                        failedJobId, jobName, Collections.singletonList(() -> "")));
+
+        final ExecutionGraphInfo executionGraphInfo =
+                dispatcherGateway.requestExecutionGraphInfo(failedJobId, TIMEOUT).get();
+        assertEquals(executionGraphInfo.getJobId(), failedJobId);
+        final ArchivedExecutionGraph archivedExecutionGraph =
+                executionGraphInfo.getArchivedExecutionGraph();
+        assertEquals(archivedExecutionGraph.getState(), JobStatus.FAILED);
+        assertEquals(archivedExecutionGraph.getJobName(), jobName);
+        assertThat(
+                archivedExecutionGraph
+                        .getFailureInfo()
+                        .getException()
+                        .deserializeError(ClassLoader.getSystemClassLoader())
+                        .toString(),
+                containsString(JobStartupFailedException.class.getName()));

Review comment:
       Could we use `FlinkMatchers.containsCause` here?
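
       For reference, a JDK-only sketch of the kind of check a cause-matching assertion performs: walk the cause chain and look for an instance of the expected type, instead of searching the `toString()` output for a class name. `CauseChain` is a hypothetical stand-in and is not the `FlinkMatchers` implementation:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical stand-in that illustrates a cause-chain check; not Flink code. */
final class CauseChain {

    private CauseChain() {}

    /** Returns true if the error or any of its causes is an instance of the expected type. */
    static boolean containsCause(Throwable error, Class<? extends Throwable> expected) {
        // Track visited throwables by identity to guard against cyclic cause chains.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = error;
        while (current != null && seen.add(current)) {
            if (expected.isInstance(current)) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }
}
```

       In the test above, the assertion would then presumably become something like `assertThat(error, FlinkMatchers.containsCause(JobStartupFailedException.class))`, where `error` is the deserialized `Throwable`.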

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/JobStartupFailedException.java
##########
@@ -0,0 +1,70 @@
+package org.apache.flink.runtime.dispatcher;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobID;
+
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Exception signalling that an exception occurred during the execution of the jar's main method.
+ *
+ * <p>The job will transition to FAILED state, and it will not be recovered.
+ */
+@Internal
+public class JobStartupFailedException extends RuntimeException {

Review comment:
       Why does this class extend `RuntimeException`? Unchecked exceptions 
should usually not be caught.
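
       A sketch of a checked variant, assuming plain `String` stand-ins for `JobID` and for the validation errors so that it compiles on its own. The class name is hypothetical; in Flink the base class would more likely be `FlinkException` than `Exception`:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

/** Hypothetical checked variant of the exception in this PR; a sketch only. */
class CheckedJobStartupFailedException extends Exception {

    private final String jobId; // stand-in for Flink's JobID
    private final String jobName;
    private final Collection<String> errorMessages; // stand-in for the validation errors

    CheckedJobStartupFailedException(
            String jobId, String jobName, Collection<String> errorMessages) {
        super("Job " + jobName + " (" + jobId + ") failed during startup: " + errorMessages);
        this.jobId = Objects.requireNonNull(jobId);
        this.jobName = Objects.requireNonNull(jobName);
        // Defensive copy so later changes to the caller's collection are not visible here.
        this.errorMessages =
                Collections.unmodifiableCollection(
                        new ArrayList<>(Objects.requireNonNull(errorMessages)));
    }

    String getJobId() {
        return jobId;
    }

    String getJobName() {
        return jobName;
    }

    Collection<String> getErrorMessages() {
        return errorMessages;
    }
}
```

       Because the type is checked, every method that can throw it has to declare it, so a `catch` block such as the one in `ApplicationDispatcherBootstrap` is backed by the compiler rather than by convention.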




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
