wangyang0918 commented on a change in pull request #11903:
URL: https://github.com/apache/flink/pull/11903#discussion_r417066142



##########
File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
##########
@@ -165,6 +168,41 @@ public Options getCustomCommandLineOptions() {
        //  Execute Actions
        // --------------------------------------------------------------------------------------------
 
+       protected void runApplication(String[] args) throws Exception {
+               LOG.info("Running 'run-application' command.");
+
+               final Options commandOptions = CliFrontendParser.getRunCommandOptions();
+               final CommandLine commandLine = getCommandLine(commandOptions, args, true);
+
+               final ProgramOptions programOptions = new ProgramOptions(commandLine);
+
+               if (commandLine.hasOption(HELP_OPTION.getOpt())) {
+                       CliFrontendParser.printHelpForRun(customCommandLines);
+                       return;
+               }
+
+               final ApplicationDeployer deployer =
+                               new ApplicationClusterDeployer(clusterClientServiceLoader);
+
+               final PackagedProgram program =
+                               getPackagedProgram(programOptions);
+
+               final ApplicationConfiguration applicationConfiguration =
+                               new ApplicationConfiguration(program.getArguments(), program.getMainClassName());
+
+               try {
+                       final List<URL> jobJars = program.getJobJarAndDependencies();

Review comment:
       I think the reason we need to build a `PackagedProgram` here is to call `getJobJarAndDependencies` and then write the result into the configuration option. I am not sure we should do this in application mode. What would happen if we moved building the `PackagedProgram` to the cluster side, i.e. extracted the jar in the `ApplicationEntrypoint`? We would just ship the user jar, and everything else would be done on the cluster.
   
   A benefit of this is that the user jar could use the "local" scheme (e.g. `local:///opt/flink/examples/streaming/WindowJoin.jar`).
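   
   To make the idea concrete, here is a rough, hypothetical sketch (the class and method names are made up for illustration; `PipelineOptions.JARS`, `ConfigUtils`, and `PackagedProgram` are the real Flink APIs): the client only encodes the user-jar URI into the configuration, and the `PackagedProgram` is built on the cluster side, so a `local://` URI never needs to resolve on the client machine.

```java
// Hypothetical sketch only -- not the code in this PR.
import java.io.File;
import java.net.URI;
import java.util.Collections;
import java.util.List;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;

public class ClusterSideProgramSketch {

	// Client side: only record the user-jar URI; no PackagedProgram is built,
	// so the jar does not have to exist where the client runs.
	static void recordUserJar(Configuration configuration, URI userJar) {
		ConfigUtils.encodeCollectionToConfig(
				configuration, PipelineOptions.JARS,
				Collections.singletonList(userJar), URI::toString);
	}

	// Cluster side (conceptually in the ApplicationEntrypoint): the URI, which
	// may point into the image, e.g. local:///opt/flink/examples/streaming/WindowJoin.jar,
	// is resolved there and the PackagedProgram is built on the cluster.
	static PackagedProgram buildOnCluster(Configuration configuration, String[] args) throws Exception {
		final List<URI> jars = ConfigUtils.decodeListFromConfig(
				configuration, PipelineOptions.JARS, URI::create);
		final File userJar = new File(jars.get(0).getPath());
		return PackagedProgram.newBuilder()
				.setJarFile(userJar)
				.setArguments(args)
				.build();
	}
}
```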

##########
File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java
##########
@@ -691,11 +716,15 @@ private ApplicationReport startAppMaster(
                                                        1));
                }
 
-               final Set<File> userJarFiles = (jobGraph == null)
-                               // not per-job submission
-                               ? Collections.emptySet()
-                               // add user code jars from the provided JobGraph
-                               : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());
+               final Set<File> userJarFiles = new HashSet<>();
+               if (jobGraph != null) {
+                       userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet()));
+               }
+
+               final List<URI> jarUrls = ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
+               if (jarUrls != null && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {

Review comment:
       You are right. In per-job mode we also add the user jars to `userJarFiles`.
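   
   For clarity, a minimal sketch (an assumed helper, not the PR's actual code; names mirror the diff above) of how both submission modes can feed the same set:

```java
// Assumed helper for illustration; names mirror the diff above.
import java.io.File;
import java.net.URI;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;

final class UserJarCollector {

	// jobGraph is non-null only for per-job submissions; in application mode
	// the jars arrive via PipelineOptions.JARS in the configuration.
	static Set<File> collectUserJars(JobGraph jobGraph, Configuration configuration) {
		final Set<File> userJarFiles = new HashSet<>();

		// Per-job mode: the user jars travel inside the JobGraph.
		if (jobGraph != null) {
			jobGraph.getUserJars().stream()
					.map(path -> path.toUri())
					.map(File::new)
					.forEach(userJarFiles::add);
		}

		// Application mode: the user jars were encoded into the configuration
		// by the client (assuming file:// URIs at this point).
		final List<URI> jarUrls =
				ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
		if (jarUrls != null) {
			jarUrls.stream().map(File::new).forEach(userJarFiles::add);
		}
		return userJarFiles;
	}
}
```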



