wangyang0918 commented on a change in pull request #14629:
URL: https://github.com/apache/flink/pull/14629#discussion_r584124798
##########
File path:
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/InitJobManagerDecorator.java
##########
@@ -49,64 +50,104 @@
public class InitJobManagerDecorator extends AbstractKubernetesStepDecorator {
private final KubernetesJobManagerParameters kubernetesJobManagerParameters;
+ private final Configuration flinkConfig;
public InitJobManagerDecorator(KubernetesJobManagerParameters kubernetesJobManagerParameters) {
this.kubernetesJobManagerParameters = checkNotNull(kubernetesJobManagerParameters);
+ this.flinkConfig = checkNotNull(kubernetesJobManagerParameters.getFlinkConfiguration());
}
@Override
public FlinkPod decorateFlinkPod(FlinkPod flinkPod) {
- final Pod basicPod =
- new PodBuilder(flinkPod.getPod())
- .withApiVersion(API_VERSION)
- .editOrNewMetadata()
- .withLabels(kubernetesJobManagerParameters.getLabels())
- .withAnnotations(kubernetesJobManagerParameters.getAnnotations())
- .endMetadata()
- .editOrNewSpec()
- .withServiceAccountName(kubernetesJobManagerParameters.getServiceAccount())
- .withImagePullSecrets(kubernetesJobManagerParameters.getImagePullSecrets())
- .withNodeSelector(kubernetesJobManagerParameters.getNodeSelector())
- .withTolerations(
- kubernetesJobManagerParameters.getTolerations().stream()
- .map(
- e ->
- KubernetesToleration.fromMap(e)
- .getInternalResource())
- .collect(Collectors.toList()))
- .endSpec()
- .build();
+ final PodBuilder basicPodBuilder = new PodBuilder(flinkPod.getPodWithoutMainContainer());
+
+ // Overwrite fields
+ final String serviceAccountName =
+ KubernetesUtils.resolveUserDefinedValue(
+ flinkConfig,
+ KubernetesConfigOptions.JOB_MANAGER_SERVICE_ACCOUNT,
+ kubernetesJobManagerParameters.getServiceAccount(),
+ flinkPod.getPodWithoutMainContainer().getSpec().getServiceAccount(),
+ "service account name");
+ if (flinkPod.getPodWithoutMainContainer().getSpec().getRestartPolicy() != null) {
+ logger.info(
+ "The restart policy of JobManager pod will be overwritten to 'always' "
+ + "since it is controlled by the Kubernetes deployment.");
Review comment:
Since we are using a Deployment for the Flink JobManager, the restart
policy of the pod will be overwritten to Always anyway. Refer to the K8s
documentation [1] for more information.
[1] https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#pod-template
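To make the point concrete, here is a minimal standalone sketch of the overwrite behavior. It is not the code in this PR: the class `RestartPolicySketch` and the method `resolveRestartPolicy` are hypothetical names, and it works on plain strings instead of the fabric8 pod model so that it runs without any dependencies. Unlike the diff above, which logs whenever the template value is non-null, this variant logs only when the value actually differs from Always.

```java
import java.util.logging.Logger;

// Hypothetical illustration only; not part of flink-kubernetes.
public class RestartPolicySketch {
    private static final Logger LOG =
            Logger.getLogger(RestartPolicySketch.class.getName());

    // A Deployment's pod template only permits this restart policy.
    static final String ALWAYS = "Always";

    // Returns the restart policy the JobManager pod will end up with.
    // The value from the user's pod template (may be null) is ignored,
    // and a message is logged if it would have been something else.
    static String resolveRestartPolicy(String templateRestartPolicy) {
        if (templateRestartPolicy != null && !ALWAYS.equals(templateRestartPolicy)) {
            LOG.info(
                    "The restart policy '"
                            + templateRestartPolicy
                            + "' of the JobManager pod will be overwritten to '"
                            + ALWAYS
                            + "' since it is controlled by the Kubernetes deployment.");
        }
        return ALWAYS;
    }

    public static void main(String[] args) {
        // Both calls resolve to the same policy, whatever the template says.
        System.out.println(resolveRestartPolicy("Never"));
        System.out.println(resolveRestartPolicy(null));
    }
}
```

Whichever logging condition is chosen, the resolved value is the same, so the message is purely informational for users who set a restart policy in their pod template.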