wangyang0918 commented on a change in pull request #11903: URL: https://github.com/apache/flink/pull/11903#discussion_r417066142
########## File path: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ########## @@ -165,6 +168,41 @@ public Options getCustomCommandLineOptions() { // Execute Actions // -------------------------------------------------------------------------------------------- + protected void runApplication(String[] args) throws Exception { + LOG.info("Running 'run-application' command."); + + final Options commandOptions = CliFrontendParser.getRunCommandOptions(); + final CommandLine commandLine = getCommandLine(commandOptions, args, true); + + final ProgramOptions programOptions = new ProgramOptions(commandLine); + + if (commandLine.hasOption(HELP_OPTION.getOpt())) { + CliFrontendParser.printHelpForRun(customCommandLines); + return; + } + + final ApplicationDeployer deployer = + new ApplicationClusterDeployer(clusterClientServiceLoader); + + final PackagedProgram program = + getPackagedProgram(programOptions); + + final ApplicationConfiguration applicationConfiguration = + new ApplicationConfiguration(program.getArguments(), program.getMainClassName()); + + try { + final List<URL> jobJars = program.getJobJarAndDependencies(); Review comment: I think the reason we need to build a `PackagedProgram` here is to `getJobJarAndDependencies`. And then apply to the configuration option. I am not sure whether we should do this in the application mode. What will happen if we move the building `PackagedProgram` to cluster side, which means we will extract the jar in the `ApplicationEntrypoint`. We just ship the user jar and all other things will be done in the cluster. Benefit from this, the user jar could be "local" schema(e.g. `local:///opt/flink/examples/streaming/WindowJoin.jar`). ########## File path: flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java ########## @@ -691,11 +716,15 @@ private ApplicationReport startAppMaster( 1)); } - final Set<File> userJarFiles = (jobGraph == null) - // not per-job submission - ? Collections.emptySet() - // add user code jars from the provided JobGraph - : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet()); + final Set<File> userJarFiles = new HashSet<>(); + if (jobGraph != null) { + userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet())); + } + + final List<URI> jarUrls = ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create); + if (jarUrls != null && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) { Review comment: You are right. For perjob mode, we also add the user jars to the `userJarFiles`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org