schevalley2 commented on code in PR #24065: URL: https://github.com/apache/flink/pull/24065#discussion_r1457276451
########## flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java: ########## @@ -111,14 +126,45 @@ private static PackagedProgramRetriever getPackagedProgramRetriever( // No need to do pipelineJars validation if it is a PyFlink job. if (!(PackagedProgramUtils.isPython(jobClassName) || PackagedProgramUtils.isPython(programArguments))) { - final List<File> pipelineJars = - KubernetesUtils.checkJarFileForApplicationMode(configuration); - Preconditions.checkArgument(pipelineJars.size() == 1, "Should only have one jar"); + final ArtifactFetchManager.Result fetchRes = fetchArtifacts(configuration); + return DefaultPackagedProgramRetriever.create( - userLibDir, pipelineJars.get(0), jobClassName, programArguments, configuration); + userLibDir, + fetchRes.getUserArtifactDir(), + fetchRes.getJobJar(), + jobClassName, + programArguments, + configuration); } return DefaultPackagedProgramRetriever.create( userLibDir, jobClassName, programArguments, configuration); } + + private static ArtifactFetchManager.Result fetchArtifacts(Configuration configuration) { + try { + String targetDir = generateJarDir(configuration); + ArtifactFetchManager fetchMgr = new ArtifactFetchManager(configuration, targetDir); + + List<String> uris = configuration.get(PipelineOptions.JARS); + checkArgument(uris.size() == 1, "Should only have one jar"); + List<String> additionalUris = + configuration + .getOptional(ArtifactFetchOptions.ARTIFACT_LIST) + .orElse(Collections.emptyList()); + + return fetchMgr.fetchArtifacts(uris.get(0), additionalUris); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + static String generateJarDir(Configuration configuration) { + return String.join( + File.separator, + new File(configuration.get(ArtifactFetchOptions.ARTIFACT_BASE_DIR)) + .getAbsolutePath(), + configuration.get(KubernetesConfigOptions.NAMESPACE), + configuration.get(KubernetesConfigOptions.CLUSTER_ID)); + } Review Comment: Oh I 
did not notice you did not write this part, sorry. Thank you for the explanation, that makes complete sense. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org