JingGe commented on a change in pull request #17995:
URL: https://github.com/apache/flink/pull/17995#discussion_r762194790



##########
File path: flink-clients/src/main/java/org/apache/flink/client/deployment/application/ApplicationDispatcherBootstrap.java
##########
@@ -159,6 +160,15 @@ public void stop() {
                         })
                 .exceptionally(
                         t -> {
+                            final Optional<JobStartupFailedException> jobStartupFailedOpt =
+                                    ExceptionUtils.findThrowable(
+                                            t, JobStartupFailedException.class);
+                            if (jobStartupFailedOpt.isPresent()) {
+                                final JobStartupFailedException error = jobStartupFailedOpt.get();
+                                dispatcherGateway.submitFailedJob(error);
+                                LOG.info("Application failed: ", error);

Review comment:
       ```suggestion
                                   LOG.info("Application failed expectedly: ", error);
   ```

##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
##########
@@ -2477,6 +2511,28 @@ public void registerCachedFile(String filePath, String name, boolean executable)
                         name, new DistributedCache.DistributedCacheEntry(filePath, executable)));
     }
 
+    private List<ConfigurationNotAllowedError> checkNotAllowedConfigurations() {

Review comment:
       Since the returned errors are only checked for emptiness, how about 
letting this method return a boolean instead?
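
       A minimal sketch of the boolean variant, in case it helps. Everything 
here is a hypothetical stand-in: `ConfigCheckSketch`, 
`hasNotAllowedConfigurations`, and the map/set parameters are made up for 
illustration, and I am assuming the check boils down to "was any option set 
that is not allowed".

```java
import java.util.Map;
import java.util.Set;

public class ConfigCheckSketch {

    // Hypothetical replacement for checkNotAllowedConfigurations(): rather than
    // building a list with one error per offending option, report only whether
    // at least one option outside the allowed set was provided.
    public static boolean hasNotAllowedConfigurations(
            Map<String, String> clientConfig, Set<String> allowedKeys) {
        return clientConfig.keySet().stream().anyMatch(key -> !allowedKeys.contains(key));
    }

    public static void main(String[] args) {
        Map<String, String> clientConfig = Map.of("parallelism.default", "4");

        // No keys are allowed, so the single client option is a violation.
        System.out.println(hasNotAllowedConfigurations(clientConfig, Set.of()));

        // The same option is explicitly allowed, so nothing is reported.
        System.out.println(
                hasNotAllowedConfigurations(clientConfig, Set.of("parallelism.default")));
    }
}
```

       The caller would then branch on the boolean directly. If the error 
list is meant to be shown to the user later, keeping the list is of course 
the better choice.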

##########
File path: flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
##########
@@ -161,6 +189,16 @@ public static void setAsContext(
             final boolean suppressSysout) {
         StreamExecutionEnvironmentFactory factory =
                 conf -> {
+                    final List<JobValidationError> errors = new ArrayList<>();
+                    final boolean allowConfigurations =
+                            configuration.getBoolean(
+                                    DeploymentOptions.ALLOW_CLIENT_JOB_CONFIGURATIONS);
+                    if (!allowConfigurations && !conf.toMap().isEmpty()) {
+                        conf.toMap()
+                                .forEach(
+                                        (k, v) ->
+                                                errors.add(new ConfigurationNotAllowedError(k, v)));

Review comment:
       Do we really need to create a new ConfigurationNotAllowedError instance 
for each key-value config entry? A single ConfigurationNotAllowedError would 
already ensure that the errors list is not empty.
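
       Roughly what I have in mind, as a sketch only. `SingleErrorSketch`, 
`NotAllowedError`, and `validate` are hypothetical names; the 
ConfigurationNotAllowedError in this PR takes a single key and value, so the 
aggregated constructor below is an assumption about how it could be reshaped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SingleErrorSketch {

    // Hypothetical aggregated error: carries all offending entries at once
    // instead of one instance per key-value pair.
    public static final class NotAllowedError {
        private final Map<String, String> offending;

        public NotAllowedError(Map<String, String> offending) {
            // Copy into a TreeMap so the message lists keys in a stable order.
            this.offending = new TreeMap<>(offending);
        }

        @Override
        public String toString() {
            return "Configuration not allowed: " + offending;
        }
    }

    // Adds at most one error, which is enough for a later isEmpty() check.
    public static List<NotAllowedError> validate(
            boolean allowConfigurations, Map<String, String> clientConfig) {
        List<NotAllowedError> errors = new ArrayList<>();
        if (!allowConfigurations && !clientConfig.isEmpty()) {
            errors.add(new NotAllowedError(clientConfig));
        }
        return errors;
    }

    public static void main(String[] args) {
        Map<String, String> clientConfig = Map.of("a", "1", "b", "2");
        System.out.println(validate(false, clientConfig).size());
        System.out.println(validate(false, clientConfig).get(0));
    }
}
```

       The single instance still names every offending option, so no 
information is lost in the message compared with one error per entry.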




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


