schevalley2 commented on code in PR #24065:
URL: https://github.com/apache/flink/pull/24065#discussion_r1457276451


##########
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypoint.java:
##########
@@ -111,14 +126,45 @@ private static PackagedProgramRetriever 
getPackagedProgramRetriever(
         // No need to do pipelineJars validation if it is a PyFlink job.
         if (!(PackagedProgramUtils.isPython(jobClassName)
                 || PackagedProgramUtils.isPython(programArguments))) {
-            final List<File> pipelineJars =
-                    
KubernetesUtils.checkJarFileForApplicationMode(configuration);
-            Preconditions.checkArgument(pipelineJars.size() == 1, "Should only 
have one jar");
+            final ArtifactFetchManager.Result fetchRes = 
fetchArtifacts(configuration);
+
             return DefaultPackagedProgramRetriever.create(
-                    userLibDir, pipelineJars.get(0), jobClassName, 
programArguments, configuration);
+                    userLibDir,
+                    fetchRes.getUserArtifactDir(),
+                    fetchRes.getJobJar(),
+                    jobClassName,
+                    programArguments,
+                    configuration);
         }
 
         return DefaultPackagedProgramRetriever.create(
                 userLibDir, jobClassName, programArguments, configuration);
     }
+
+    private static ArtifactFetchManager.Result fetchArtifacts(Configuration 
configuration) {
+        try {
+            String targetDir = generateJarDir(configuration);
+            ArtifactFetchManager fetchMgr = new 
ArtifactFetchManager(configuration, targetDir);
+
+            List<String> uris = configuration.get(PipelineOptions.JARS);
+            checkArgument(uris.size() == 1, "Should only have one jar");
+            List<String> additionalUris =
+                    configuration
+                            .getOptional(ArtifactFetchOptions.ARTIFACT_LIST)
+                            .orElse(Collections.emptyList());
+
+            return fetchMgr.fetchArtifacts(uris.get(0), additionalUris);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    static String generateJarDir(Configuration configuration) {
+        return String.join(
+                File.separator,
+                new 
File(configuration.get(ArtifactFetchOptions.ARTIFACT_BASE_DIR))
+                        .getAbsolutePath(),
+                configuration.get(KubernetesConfigOptions.NAMESPACE),
+                configuration.get(KubernetesConfigOptions.CLUSTER_ID));
+    }

Review Comment:
   Oh, I did not notice that you did not write this part, sorry. Thank you for the 
explanation; that makes complete sense.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to