Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2024-01-15 Thread via GitHub


gyfora merged PR #733:
URL: https://github.com/apache/flink-kubernetes-operator/pull/733


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2024-01-15 Thread via GitHub


AncyRominus commented on PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1891799251

   Thank you @gyfora! I have incorporated the review comments. Please re-review.





Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2024-01-14 Thread via GitHub


gyfora commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1452023246


##
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##
@@ -17,55 +17,82 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator {
 private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
 private static final ObjectMapper mapper = new ObjectMapper();
+private final Set<FlinkResourceMutator> mutators;
+private final InformerManager informerManager;
+
+public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager 
informerManager) {
+this.mutators = mutators;
+this.informerManager = informerManager;
+}
 
 @Override
 public HasMetadata mutate(HasMetadata resource, Operation operation)
 throws NotAllowedException {
-if (operation == Operation.CREATE) {
+if (operation == Operation.CREATE || operation == Operation.UPDATE) {
 LOG.debug("Mutating resource {}", resource);
-
 if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-try {
-var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-setSessionTargetLabel(sessionJob);
-return sessionJob;
-} catch (Exception e) {
-throw new RuntimeException(e);
-}
+return mutateSessionJob(resource);
+}
+if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+return mutateDeployment(resource);
 }
 }
 return resource;
 }
 
-private void setSessionTargetLabel(FlinkSessionJob flinkSessionJob) {
-var labels = flinkSessionJob.getMetadata().getLabels();
-if (labels == null) {
-labels = new HashMap<>();
+private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+try {
+var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+var namespace = sessionJob.getMetadata().getNamespace();
+var deploymentName = sessionJob.getSpec().getDeploymentName();
+var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+var deployment =
+
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+for (FlinkResourceMutator mutator : mutators) {
+FlinkSessionJob flinkSessionJob =
+mutator.mutateSessionJob(sessionJob, 
Optional.ofNullable(deployment));
+sessionJob = flinkSessionJob;

Review Comment:
   Simplify to:
   ```
   sessionJob =  mutator.mutateSessionJob(sessionJob, 
Optional.ofNullable(deployment));
   ```




Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2024-01-07 Thread via GitHub


gyfora commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1444204137


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java:
##
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.mutator.DefaultFlinkMutator;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Mutator utilities. */
+public final class MutatorUtils {
+
+private static final Logger LOG = 
LoggerFactory.getLogger(MutatorUtils.class);
+
+/**
+ * discovers mutators.
+ *
+ * @param configManager Flink Config manager
+ * @return Set of FlinkResourceMutator
+ */
+public static Set<FlinkResourceMutator> 
discoverMutators(FlinkConfigManager configManager) {

Review Comment:
   We should have a test for this, similar to the `ValidatorUtilsTest`. 
Alternatively, it would be even better to refactor the code to eliminate the 
overlapping logic between discovering the validators and the mutators, so that 
a single test can cover both.
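One way to picture the suggested refactoring is a single generic helper that both the validator and the mutator discovery call. The sketch below is only an illustration under stated assumptions: `DiscoverySketch`, `ResourceValidator`, `ResourceMutator`, and `discover` are hypothetical names, and the `Iterable` parameter stands in for the plugin lookup, whose real API is not shown in this thread.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DiscoverySketch {
    /** Hypothetical stand-ins for the operator's validator and mutator interfaces. */
    interface ResourceValidator {}

    interface ResourceMutator {}

    /**
     * Collects the built-in implementation plus every implementation the plugin
     * lookup returned. The Iterable is an assumption standing in for that lookup.
     */
    static <T> Set<T> discover(T defaultImpl, Iterable<? extends T> pluginImpls) {
        Set<T> discovered = new LinkedHashSet<>();
        discovered.add(defaultImpl);
        for (T impl : pluginImpls) {
            discovered.add(impl);
        }
        return discovered;
    }

    public static void main(String[] args) {
        ResourceMutator defaultMutator = new ResourceMutator() {};
        ResourceMutator customMutator = new ResourceMutator() {};
        // The same helper would serve validators by passing ResourceValidator instances.
        Set<ResourceMutator> mutators = discover(defaultMutator, List.of(customMutator));
        System.out.println("Discovered mutators: " + mutators.size());
    }
}
```

With a shared helper like this, one test over `discover` would exercise the logic for both resource types.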



##
flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java:
##
@@ -136,6 +140,10 @@ public void testHandleValidateRequestWithAdmissionReview() 
throws IOException {
 public void testMutateHandler() throws Exception {
 final EmbeddedChannel embeddedChannel = new 
EmbeddedChannel(admissionHandler);
 var sessionJob = new FlinkSessionJob();
+ObjectMeta objectMeta = new ObjectMeta();

Review Comment:
   We should also add a test that has a custom mutator and make sure it is 
called.
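A test along these lines could register a recording mutator next to the default one and check that it ran. The sketch below is not the project's test code: `SessionJob` and `SessionJobMutator` are simplified, hypothetical stand-ins for `FlinkSessionJob` and `FlinkResourceMutator`, and only the loop shape follows the diff under review.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class MutatorChainSketch {
    /** Hypothetical, simplified stand-in for FlinkSessionJob. */
    static final class SessionJob {
        final List<String> appliedBy = new ArrayList<>();
    }

    /** Hypothetical, simplified stand-in for FlinkResourceMutator. */
    interface SessionJobMutator {
        SessionJob mutateSessionJob(SessionJob job, Optional<Object> deployment);
    }

    /** Feeds the output of each mutator into the next one. */
    static SessionJob applyAll(Set<SessionJobMutator> mutators, SessionJob job, Object deployment) {
        for (SessionJobMutator mutator : mutators) {
            job = mutator.mutateSessionJob(job, Optional.ofNullable(deployment));
        }
        return job;
    }

    /** Returns a mutator that records its name on the job so a test can see it ran. */
    static SessionJobMutator recording(String name) {
        return (job, deployment) -> {
            job.appliedBy.add(name);
            return job;
        };
    }

    public static void main(String[] args) {
        Set<SessionJobMutator> mutators = new LinkedHashSet<>();
        mutators.add(recording("default"));
        mutators.add(recording("custom"));
        SessionJob result = applyAll(mutators, new SessionJob(), null);
        if (!result.appliedBy.contains("custom")) {
            throw new AssertionError("custom mutator was not called");
        }
        System.out.println("Mutators applied: " + result.appliedBy);
    }
}
```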






Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2024-01-02 Thread via GitHub


AncyRominus commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1440100481


##
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##
@@ -17,43 +17,88 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator {
 private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
 private static final ObjectMapper mapper = new ObjectMapper();
+private final Set<FlinkResourceMutator> mutators;
+private final InformerManager informerManager;
+
+public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager 
informerManager) {
+this.mutators = mutators;
+this.informerManager = informerManager;
+}
 
 @Override
 public HasMetadata mutate(HasMetadata resource, Operation operation)
 throws NotAllowedException {
-if (operation == Operation.CREATE) {
+if (operation == Operation.CREATE || operation == Operation.UPDATE) {
 LOG.debug("Mutating resource {}", resource);
-
 if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-try {
-var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-setSessionTargetLabel(sessionJob);
-return sessionJob;
-} catch (Exception e) {
-throw new RuntimeException(e);
-}
+return mutateSessionJob(resource);
+}
+if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+return mutateDeployment(resource);
 }
 }
 return resource;
 }
 
+private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+try {
+var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+var namespace = sessionJob.getMetadata().getNamespace();
+var deploymentName = sessionJob.getSpec().getDeploymentName();
+var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+var deployment =
+
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+setSessionTargetLabel(sessionJob);

Review Comment:
   Thanks @gyfora! 
   Yes, that's a good point. I have added the logic for setting the session 
target label as part of the default mutator logic.






Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-22 Thread via GitHub


gyfora commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1435156526


##
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##
@@ -17,43 +17,88 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator {
 private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
 private static final ObjectMapper mapper = new ObjectMapper();
+private final Set<FlinkResourceMutator> mutators;
+private final InformerManager informerManager;
+
+public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager 
informerManager) {
+this.mutators = mutators;
+this.informerManager = informerManager;
+}
 
 @Override
 public HasMetadata mutate(HasMetadata resource, Operation operation)
 throws NotAllowedException {
-if (operation == Operation.CREATE) {
+if (operation == Operation.CREATE || operation == Operation.UPDATE) {
 LOG.debug("Mutating resource {}", resource);
-
 if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-try {
-var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-setSessionTargetLabel(sessionJob);
-return sessionJob;
-} catch (Exception e) {
-throw new RuntimeException(e);
-}
+return mutateSessionJob(resource);
+}
+if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+return mutateDeployment(resource);
 }
 }
 return resource;
 }
 
+private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+try {
+var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+var namespace = sessionJob.getMetadata().getNamespace();
+var deploymentName = sessionJob.getSpec().getDeploymentName();
+var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+var deployment =
+
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+setSessionTargetLabel(sessionJob);

Review Comment:
   I mean `setSessionTargetLabel(sessionJob);`  should be in the default mutator






Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-22 Thread via GitHub


gyfora commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1435156017


##
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java:
##
@@ -17,43 +17,88 @@
 
 package org.apache.flink.kubernetes.operator.admission.mutator;
 
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 
 /** The default mutator. */
 public class FlinkMutator implements Mutator {
 private static final Logger LOG = 
LoggerFactory.getLogger(FlinkMutator.class);
 private static final ObjectMapper mapper = new ObjectMapper();
+private final Set<FlinkResourceMutator> mutators;
+private final InformerManager informerManager;
+
+public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager 
informerManager) {
+this.mutators = mutators;
+this.informerManager = informerManager;
+}
 
 @Override
 public HasMetadata mutate(HasMetadata resource, Operation operation)
 throws NotAllowedException {
-if (operation == Operation.CREATE) {
+if (operation == Operation.CREATE || operation == Operation.UPDATE) {
 LOG.debug("Mutating resource {}", resource);
-
 if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-try {
-var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
-setSessionTargetLabel(sessionJob);
-return sessionJob;
-} catch (Exception e) {
-throw new RuntimeException(e);
-}
+return mutateSessionJob(resource);
+}
+if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) 
{
+return mutateDeployment(resource);
 }
 }
 return resource;
 }
 
+private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+try {
+var sessionJob = mapper.convertValue(resource, 
FlinkSessionJob.class);
+var namespace = sessionJob.getMetadata().getNamespace();
+var deploymentName = sessionJob.getSpec().getDeploymentName();
+var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+var deployment =
+
informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+setSessionTargetLabel(sessionJob);

Review Comment:
   I think if we are adding a mutator abstraction, this logic should be part of 
the default mutator for session jobs
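As a rough illustration of moving the label logic into a default mutator, the sketch below keeps the null-safe label handling of the removed `setSessionTargetLabel` method. Everything else is an assumption: `SessionJob` is a hypothetical stand-in for `FlinkSessionJob`, and the label key and the choice of the deployment name as its value are not shown in this thread.

```java
import java.util.HashMap;
import java.util.Map;

public class DefaultMutatorSketch {
    /** Assumed label key; the real constant is not shown in this thread. */
    static final String TARGET_SESSION_LABEL = "target.session";

    /** Hypothetical, simplified stand-in for FlinkSessionJob. */
    static final class SessionJob {
        Map<String, String> labels; // may be null on a freshly submitted resource
        final String deploymentName;

        SessionJob(String deploymentName) {
            this.deploymentName = deploymentName;
        }
    }

    /** Default mutation: make sure the job carries a label naming its target session. */
    static SessionJob mutateSessionJob(SessionJob job) {
        if (job.labels == null) {
            job.labels = new HashMap<>();
        }
        // Assumption: the label value is the name of the target session deployment.
        job.labels.put(TARGET_SESSION_LABEL, job.deploymentName);
        return job;
    }

    public static void main(String[] args) {
        SessionJob job = mutateSessionJob(new SessionJob("my-session"));
        System.out.println(job.labels);
    }
}
```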






Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-20 Thread via GitHub


tagarr commented on PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1864167599

   @gyfora We had considered adding mutation to the controller loop, in the 
same manner as validation, but we believe that mutation of the CR should only 
happen on its creation or on user-made updates. Changes made within the 
reconcile loop could cause unknown issues and confuse the logic within the 
controller.
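The admission-time gating described here can be sketched as follows. This is a simplified illustration, not the webhook's code: the local `Operation` enum is a stand-in for the webhook framework's enum of the same name, and resources are reduced to plain strings.

```java
import java.util.function.UnaryOperator;

public class AdmissionGateSketch {
    /** Local stand-in for the webhook framework's Operation enum. */
    enum Operation { CREATE, UPDATE, DELETE, CONNECT }

    /** Applies the mutation only to user-driven CREATE and UPDATE requests. */
    static String mutate(String resource, Operation operation, UnaryOperator<String> mutation) {
        if (operation == Operation.CREATE || operation == Operation.UPDATE) {
            return mutation.apply(resource);
        }
        // Every other operation passes through untouched.
        return resource;
    }

    public static void main(String[] args) {
        UnaryOperator<String> addSuffix = r -> r + "-mutated";
        System.out.println(mutate("job", Operation.CREATE, addSuffix));
        System.out.println(mutate("job", Operation.DELETE, addSuffix));
    }
}
```

Because the gate sits in the admission path, the reconcile loop never sees a resource that changes underneath it.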





Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-18 Thread via GitHub


AncyRominus commented on PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1859901718

   Thanks for the comments, @tagarr and @gyfora! 
   The label changes are currently set in FlinkMutator.java, and any other 
mutations can be added to the default or custom mutators. FlinkMutator iterates 
over the default and custom mutators and applies each one's mutateSessionJob.
   
   Yes, we can add session job mutation for Update as well as Create 
operations. 
   





Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-18 Thread via GitHub


AncyRominus commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1429772463


##
flink-kubernetes-operator/pom.xml:
##
@@ -208,6 +208,18 @@ under the License.
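 <artifactId>junit-jupiter-params</artifactId>
 <scope>test</scope>
 </dependency>
+
+<dependency>
+<groupId>io.javaoperatorsdk</groupId>
+<artifactId>kubernetes-webhooks-framework-core</artifactId>
+<version>${operator.sdk.webhook-framework.version}</version>
+<exclusions>
+<exclusion>
+<groupId>*</groupId>
+<artifactId>*</artifactId>
+</exclusion>
+</exclusions>
+</dependency>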

Review Comment:
   This dependency is for the kubernetes-webhooks-framework. It can be 
removed, as we don't have any strong reliance on it.






Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-18 Thread via GitHub


AncyRominus commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1429764133


##
flink-kubernetes-webhook/pom.xml:
##
@@ -127,6 +132,11 @@ under the License.
 
 
 
+<plugin>
+<groupId>org.apache.maven.plugins</groupId>
+<artifactId>maven-surefire-plugin</artifactId>
+<version>${surefire-plugin.version}</version>

Review Comment:
   This plugin is required for the test AdmissionHandlerTest.testMutateHandler 
to run successfully. Without it, the test fails with a 500 Internal Server 
Error while mutating the SessionJob, because the mutation uses the deployment 
from the informerManager, which uses the Kubernetes client.






Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


gyfora commented on PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1855797398

   There is already a mutator implementation, `FlinkMutator`, but I don't see 
it mentioned or touched anywhere here. Could we also make sure that it is 
covered by the new interfaces? 





Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


gyfora commented on code in PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1426674867


##
flink-kubernetes-webhook/pom.xml:
##
@@ -127,6 +132,11 @@ under the License.
 
 
 
+<plugin>
+<groupId>org.apache.maven.plugins</groupId>
+<artifactId>maven-surefire-plugin</artifactId>
+<version>${surefire-plugin.version}</version>

Review Comment:
   Why do we need this new plugin?



##
flink-kubernetes-operator/pom.xml:
##
@@ -208,6 +208,18 @@ under the License.
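 <artifactId>junit-jupiter-params</artifactId>
 <scope>test</scope>
 </dependency>
+
+<dependency>
+<groupId>io.javaoperatorsdk</groupId>
+<artifactId>kubernetes-webhooks-framework-core</artifactId>
+<version>${operator.sdk.webhook-framework.version}</version>
+<exclusions>
+<exclusion>
+<groupId>*</groupId>
+<artifactId>*</artifactId>
+</exclusion>
+</exclusions>
+</dependency>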

Review Comment:
   Why do we need this new dependency inside the operator?






Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


gyfora commented on PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1855787058

   ```
   Brief change log
   
   No change in logs
   ```
   
   This section should ideally be filled out with the list of changes that were 
made to implement the given feature / bug fix :) 





Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


tagarr commented on PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1855604122

   @gyfora would love your opinion on this...





Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-14 Thread via GitHub


tagarr commented on PR #733:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1855602604

   Looks good. My only question would be to the committers.
   1. Would we like the sessionjob changes (i.e. modify labels) be done in 
flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java
 as it currently is or pushed down to the default mutator ?
   2. Do we want to add sessionjob mutation for Update as well as for Create 
operations ?





Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


AncyRominus closed pull request #732: [FLINK-33632] Adding custom flink mutator
URL: https://github.com/apache/flink-kubernetes-operator/pull/732





[PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]

2023-12-13 Thread via GitHub


AncyRominus opened a new pull request, #733:
URL: https://github.com/apache/flink-kubernetes-operator/pull/733

   
   
   ## What is the purpose of the change
   
   Adding Custom Flink Mutator
   
   ## Brief change log
   
   No change in logs
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changes to the `CustomResourceDescriptors`: 
(yes / no)
 - Core observer or reconciler logic that is regularly executed: (yes / no)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   

