gyfora commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1452023246


##########
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##########
@@ -17,55 +17,82 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator<HasMetadata> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
+    private final Set<FlinkResourceMutator> mutators;
+    private final InformerManager informerManager;
+
+    public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager 
informerManager) {
+        this.mutators = mutators;
+        this.informerManager = informerManager;
+    }
 
     @Override
     public HasMetadata mutate(HasMetadata resource, Operation operation)
             throws NotAllowedException {
-        if (operation == Operation.CREATE) {
+        if (operation == Operation.CREATE || operation == Operation.UPDATE) {
             LOG.debug("Mutating resource {}", resource);
-
             if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-                try {
-                    var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-                    setSessionTargetLabel(sessionJob);
-                    return sessionJob;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
+                return mutateSessionJob(resource);
+            }
+            if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+                return mutateDeployment(resource);
             }
         }
         return resource;
     }
 
-    private void setSessionTargetLabel(FlinkSessionJob flinkSessionJob) {
-        var labels = flinkSessionJob.getMetadata().getLabels();
-        if (labels == null) {
-            labels = new HashMap<>();
+    private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+        try {
+            var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+            var namespace = sessionJob.getMetadata().getNamespace();
+            var deploymentName = sessionJob.getSpec().getDeploymentName();
+            var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+            var deployment =
+                    
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+            for (FlinkResourceMutator mutator : mutators) {
+                FlinkSessionJob flinkSessionJob =
+                        mutator.mutateSessionJob(sessionJob, 
Optional.ofNullable(deployment));
+                sessionJob = flinkSessionJob;

Review Comment:
   Simplify to:
   ```
   sessionJob = mutator.mutateSessionJob(sessionJob, Optional.ofNullable(deployment));
   ```



##########
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##########
@@ -17,55 +17,82 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator<HasMetadata> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
+    private final Set<FlinkResourceMutator> mutators;
+    private final InformerManager informerManager;
+
+    public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager 
informerManager) {
+        this.mutators = mutators;
+        this.informerManager = informerManager;
+    }
 
     @Override
     public HasMetadata mutate(HasMetadata resource, Operation operation)
             throws NotAllowedException {
-        if (operation == Operation.CREATE) {
+        if (operation == Operation.CREATE || operation == Operation.UPDATE) {
             LOG.debug("Mutating resource {}", resource);
-
             if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-                try {
-                    var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-                    setSessionTargetLabel(sessionJob);
-                    return sessionJob;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
+                return mutateSessionJob(resource);
+            }
+            if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+                return mutateDeployment(resource);
             }
         }
         return resource;
     }
 
-    private void setSessionTargetLabel(FlinkSessionJob flinkSessionJob) {
-        var labels = flinkSessionJob.getMetadata().getLabels();
-        if (labels == null) {
-            labels = new HashMap<>();
+    private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+        try {
+            var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+            var namespace = sessionJob.getMetadata().getNamespace();
+            var deploymentName = sessionJob.getSpec().getDeploymentName();
+            var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+            var deployment =
+                    
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+            for (FlinkResourceMutator mutator : mutators) {
+                FlinkSessionJob flinkSessionJob =
+                        mutator.mutateSessionJob(sessionJob, 
Optional.ofNullable(deployment));
+                sessionJob = flinkSessionJob;
+            }
+
+            return sessionJob;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
-        var deploymentName = flinkSessionJob.getSpec().getDeploymentName();
-        if (deploymentName != null
-                && 
!deploymentName.equals(labels.get(CrdConstants.LABEL_TARGET_SESSION))) {
-            labels.put(
-                    CrdConstants.LABEL_TARGET_SESSION,
-                    flinkSessionJob.getSpec().getDeploymentName());
-            flinkSessionJob.getMetadata().setLabels(labels);
+    }
+
+    private FlinkDeployment mutateDeployment(HasMetadata resource) {
+        try {
+            var flinkDeployment = mapper.convertValue(resource, 
FlinkDeployment.class);
+            for (FlinkResourceMutator mutator : mutators) {
+                FlinkDeployment deployment = 
mutator.mutateDeployment(flinkDeployment);

Review Comment:
   Simplify to:
   ```
   flinkDeployment = mutator.mutateDeployment(flinkDeployment);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to