wangyang0918 commented on a change in pull request #14629:
URL: https://github.com/apache/flink/pull/14629#discussion_r584124798



##########
File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
##########
@@ -49,64 +50,104 @@
 public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
 
     private final KubernetesJobManagerParameters 
kubernetesJobManagerParameters;
+    private final Configuration flinkConfig;
 
     public InitJobManagerDecorator(KubernetesJobManagerParameters 
kubernetesJobManagerParameters) {
         this.kubernetesJobManagerParameters = 
checkNotNull(kubernetesJobManagerParameters);
+        this.flinkConfig = 
checkNotNull(kubernetesJobManagerParameters.getFlinkConfiguration());
     }
 
     @Override
     public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
-        final Pod basicPod =
-                new PodBuilder(flinkPod.getPod())
-                        .withApiVersion(API_VERSION)
-                        .editOrNewMetadata()
-                        .withLabels(kubernetesJobManagerParameters.getLabels())
-                        
.withAnnotations(kubernetesJobManagerParameters.getAnnotations())
-                        .endMetadata()
-                        .editOrNewSpec()
-                        
.withServiceAccountName(kubernetesJobManagerParameters.getServiceAccount())
-                        
.withImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets())
-                        
.withNodeSelector(kubernetesJobManagerParameters.getNodeSelector())
-                        .withTolerations(
-                                
kubernetesJobManagerParameters.getTolerations().stream()
-                                        .map(
-                                                e ->
-                                                        
KubernetesToleration.fromMap(e)
-                                                                
.getInternalResource())
-                                        .collect(Collectors.toList()))
-                        .endSpec()
-                        .build();
+        final PodBuilder basicPodBuilder = new 
PodBuilder(flinkPod.getPodWithoutMainContainer());
+
+        // Overwrite fields
+        final String serviceAccountName =
+                KubernetesUtils.resolveUserDefinedValue(
+                        flinkConfig,
+                        KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT,
+                        kubernetesJobManagerParameters.getServiceAccount(),
+                        
flinkPod.getPodWithoutMainContainer().getSpec().getServiceAccount(),
+                        "service account name");
+        if (flinkPod.getPodWithoutMainContainer().getSpec().getRestartPolicy() 
!= null) {
+            logger.info(
+                    "The restart policy of JobManager pod will be overwritten 
to 'always' "
+                            + "since it is controlled by the Kubernetes 
deployment.");

Review comment:
       Since we are using the deployment for the Flink JobManager, the restart 
policy of pod will be overwritten to always anyway. Refer the K8s 
documentation[1] for more information.
   
   [1]. 
https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#pod-template




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to