wangyang0918 commented on a change in pull request #14629:
URL: https://github.com/apache/flink/pull/14629#discussion_r584124388



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
##########
@@ -396,6 +405,74 @@ public static String getCommonStartCommand(
                 .collect(Collectors.toList());
     }
 
+    public static FlinkPod loadPodFromTemplateFile(
+            FlinkKubeClient kubeClient, File podTemplateFile, String 
mainContainerName) {
+        final KubernetesPod pod = 
kubeClient.loadPodFromTemplateFile(podTemplateFile);
+        final List<Container> otherContainers = new ArrayList<>();
+        Container mainContainer = null;
+
+        for (Container container : 
pod.getInternalResource().getSpec().getContainers()) {
+            if (mainContainerName.equals(container.getName())) {
+                mainContainer = container;
+            } else {
+                otherContainers.add(container);
+            }
+        }
+
+        if (mainContainer == null) {
+            LOG.info(
+                    "Could not find main container {} in pod template, using 
empty one to initialize.",
+                    mainContainerName);
+            mainContainer = new ContainerBuilder().build();
+        }
+
+        pod.getInternalResource().getSpec().setContainers(otherContainers);
+        return new FlinkPod(pod.getInternalResource(), mainContainer);
+    }
+
+    public static File getTaskManagerPodTemplateFileInPod() {
+        return new File(
+                Constants.POD_TEMPLATE_DIR_IN_POD, 
Constants.TASK_MANAGER_POD_TEMPLATE_FILE_NAME);
+    }
+
+    /**
+     * Resolve the user defined value with the precedence. First an explicit 
config option value is
+     * taken, then the value in pod template and at last the default value of 
a config option if
+     * nothing is specified.
+     *
+     * @param flinkConfig flink configuration
+     * @param configOption the config option to define the Kubernetes fields
+     * @param valueOfConfigOptionOrDefault the value defined by explicit 
config option or default
+     * @param valueOfPodTemplate the value defined in the pod template
+     * @param fieldDescription Kubernetes fields description
+     * @param <T> The type of value associated with the configuration option.
+     * @return the resolved value
+     */
+    public static <T> String resolveUserDefinedValue(
+            Configuration flinkConfig,
+            ConfigOption<T> configOption,
+            String valueOfConfigOptionOrDefault,

Review comment:
       The reason I do not extract this value from 
`flinkConfig.get(configOption)` because I want to extract them from 
`KubernetesParameters`. Then we could reuse the pre condition check(e.g. empty 
string, length, maybe more in the future) and fallback logic(e.g. 
`getServiceAccount`).




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to