Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
gyfora merged PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
AncyRominus commented on PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1891799251 Thank you @gyfora ! I have incorporated the review comments. Please re-review
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
gyfora commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1452023246

## flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java: ##

@@ -17,55 +17,82 @@
 package org.apache.flink.kubernetes.operator.admission.mutator;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 /** The default mutator. */
 public class FlinkMutator implements Mutator<HasMetadata> {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
+    private final Set<FlinkResourceMutator> mutators;
+    private final InformerManager informerManager;
+
+    public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager informerManager) {
+        this.mutators = mutators;
+        this.informerManager = informerManager;
+    }
     @Override
     public HasMetadata mutate(HasMetadata resource, Operation operation) throws NotAllowedException {
-        if (operation == Operation.CREATE) {
+        if (operation == Operation.CREATE || operation == Operation.UPDATE) {
             LOG.debug("Mutating resource {}", resource);
-
             if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-                try {
-                    var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
-                    setSessionTargetLabel(sessionJob);
-                    return sessionJob;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
+                return mutateSessionJob(resource);
+            }
+            if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) {
+                return mutateDeployment(resource);
             }
         }
         return resource;
     }
-    private void setSessionTargetLabel(FlinkSessionJob flinkSessionJob) {
-        var labels = flinkSessionJob.getMetadata().getLabels();
-        if (labels == null) {
-            labels = new HashMap<>();
+    private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+        try {
+            var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
+            var namespace = sessionJob.getMetadata().getNamespace();
+            var deploymentName = sessionJob.getSpec().getDeploymentName();
+            var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+            var deployment =
+                    informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+            for (FlinkResourceMutator mutator : mutators) {
+                FlinkSessionJob flinkSessionJob =
+                        mutator.mutateSessionJob(sessionJob, Optional.ofNullable(deployment));
+                sessionJob = flinkSessionJob;

Review Comment: Simplify to:

```
sessionJob = mutator.mutateSessionJob(sessionJob, Optional.ofNullable(deployment));
```

## flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java: ##

@@ -17,55 +17,82 @@
 package org.apache.flink.kubernetes.operator.admission.mutator;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 /** The default mutator. */
 public class FlinkMutator implements Mutator<HasMetadata> {
     private static final Logger LOG =
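For readers following the review, the simplified loop amounts to threading one job object through every registered mutator, each receiving the result of the previous one. The following is a minimal sketch under stated assumptions: `Job`, `Session`, `JobMutator` and `MutatorChain` are hypothetical stand-ins invented for illustration, not the operator's `FlinkSessionJob`, `FlinkDeployment` or `FlinkResourceMutator`.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for FlinkSessionJob.
final class Job {
    final String name;

    Job(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for the session cluster's FlinkDeployment.
final class Session {
    final String name;

    Session(String name) {
        this.name = name;
    }
}

// Hypothetical mutator contract, shaped like the call seen in the diff.
interface JobMutator {
    Job mutateSessionJob(Job job, Optional<Session> session);
}

final class MutatorChain {
    private MutatorChain() {}

    // Applies every mutator in iteration order, feeding each result into the next.
    static Job apply(Set<JobMutator> mutators, Job job, Session sessionOrNull) {
        // The session may be missing from the cache, so it is wrapped once up front.
        Optional<Session> session = Optional.ofNullable(sessionOrNull);
        Job current = job;
        for (JobMutator mutator : mutators) {
            current = mutator.mutateSessionJob(current, session);
        }
        return current;
    }
}
```

Note that the result depends on the set's iteration order; a `LinkedHashSet` keeps it deterministic in this sketch, whereas a plain `HashSet` would not guarantee any order.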
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
gyfora commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1444204137

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/MutatorUtils.java: ##

@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.kubernetes.operator.utils;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
+import org.apache.flink.kubernetes.operator.mutator.DefaultFlinkMutator;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/** Mutator utilities. */
+public final class MutatorUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MutatorUtils.class);
+
+    /**
+     * discovers mutators.
+     *
+     * @param configManager Flink Config manager
+     * @return Set of FlinkResourceMutator
+     */
+    public static Set<FlinkResourceMutator> discoverMutators(FlinkConfigManager configManager) {

Review Comment: We should have a test for this, similar to the `ValidatorUtilsTest`. Alternatively, it would be even better to refactor the code and eliminate the overlapping logic between discovering the validator and the mutator; then we can have a single test.

## flink-kubernetes-webhook/src/test/java/org/apache/flink/kubernetes/operator/admission/AdmissionHandlerTest.java: ##

@@ -136,6 +140,10 @@ public void testHandleValidateRequestWithAdmissionReview() throws IOException {
     public void testMutateHandler() throws Exception {
         final EmbeddedChannel embeddedChannel = new EmbeddedChannel(admissionHandler);
         var sessionJob = new FlinkSessionJob();
+        ObjectMeta objectMeta = new ObjectMeta();

Review Comment: We should also add a test that has a custom mutator and make sure it is called.
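The two requests above (one shared discovery routine for validators and mutators, and a test proving that a custom mutator is actually called) could be approached roughly as follows. This is only a sketch: `TextMutator`, `CountingMutator` and `PluginDiscovery` are hypothetical names invented here, and it assumes that plugin implementations arrive as a plain `Iterator`, without claiming anything about the operator's real plugin-loading code.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified mutator contract used only for this sketch.
interface TextMutator {
    String mutate(String resource);
}

// Custom mutator that counts its invocations so a test can assert it was called.
final class CountingMutator implements TextMutator {
    int calls = 0;

    @Override
    public String mutate(String resource) {
        calls++;
        return resource + "+custom";
    }
}

final class PluginDiscovery {
    private PluginDiscovery() {}

    // Generic discovery shape: the built-in implementation first, then every
    // implementation the plugin iterator yields. Because it is generic in T,
    // the same helper could serve both validators and mutators.
    static <T> Set<T> discover(T defaultImpl, Iterator<T> plugins) {
        Set<T> found = new LinkedHashSet<>();
        found.add(defaultImpl);
        plugins.forEachRemaining(found::add);
        return found;
    }
}
```

A test can then pass a `CountingMutator` through `discover`, run every returned mutator once, and check that `calls` equals 1.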
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
AncyRominus commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1440100481

## flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java: ##

@@ -17,43 +17,88 @@
 package org.apache.flink.kubernetes.operator.admission.mutator;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 /** The default mutator. */
 public class FlinkMutator implements Mutator<HasMetadata> {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
+    private final Set<FlinkResourceMutator> mutators;
+    private final InformerManager informerManager;
+
+    public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager informerManager) {
+        this.mutators = mutators;
+        this.informerManager = informerManager;
+    }
     @Override
     public HasMetadata mutate(HasMetadata resource, Operation operation) throws NotAllowedException {
-        if (operation == Operation.CREATE) {
+        if (operation == Operation.CREATE || operation == Operation.UPDATE) {
             LOG.debug("Mutating resource {}", resource);
-
             if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-                try {
-                    var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
-                    setSessionTargetLabel(sessionJob);
-                    return sessionJob;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
+                return mutateSessionJob(resource);
+            }
+            if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) {
+                return mutateDeployment(resource);
             }
         }
         return resource;
     }
+    private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+        try {
+            var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
+            var namespace = sessionJob.getMetadata().getNamespace();
+            var deploymentName = sessionJob.getSpec().getDeploymentName();
+            var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+            var deployment =
+                    informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+            setSessionTargetLabel(sessionJob);

Review Comment: Thanks @gyfora! Yes, that's a good point. I have added the logic for setting the session target label as part of the default mutator logic.
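As an illustration of what moving the label logic into a default mutator could look like, here is a small null-safe sketch. It is an assumption-laden example rather than the merged code: `SessionTargetLabel` is a hypothetical helper, and the label key `target.session` is assumed for illustration (the operator keeps its own constant in `CrdConstants`). The null check mirrors the removed `setSessionTargetLabel`, which created a new map when the job had no labels.

```java
import java.util.HashMap;
import java.util.Map;

final class SessionTargetLabel {
    // Assumed label key for illustration only; not taken from the PR.
    static final String KEY = "target.session";

    private SessionTargetLabel() {}

    // Returns a copy of the labels with the target session set.
    // A null label map is treated as empty, and the input map is left untouched.
    static Map<String, String> apply(Map<String, String> labels, String deploymentName) {
        Map<String, String> result =
                (labels == null) ? new HashMap<>() : new HashMap<>(labels);
        result.put(KEY, deploymentName);
        return result;
    }
}
```

Returning a copy instead of mutating the argument is a design choice of this sketch; it keeps the helper free of side effects, which makes it easier to test in isolation.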
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
gyfora commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1435156526

## flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java: ##

@@ -17,43 +17,88 @@
 package org.apache.flink.kubernetes.operator.admission.mutator;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 /** The default mutator. */
 public class FlinkMutator implements Mutator<HasMetadata> {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
+    private final Set<FlinkResourceMutator> mutators;
+    private final InformerManager informerManager;
+
+    public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager informerManager) {
+        this.mutators = mutators;
+        this.informerManager = informerManager;
+    }
     @Override
     public HasMetadata mutate(HasMetadata resource, Operation operation) throws NotAllowedException {
-        if (operation == Operation.CREATE) {
+        if (operation == Operation.CREATE || operation == Operation.UPDATE) {
             LOG.debug("Mutating resource {}", resource);
-
             if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-                try {
-                    var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
-                    setSessionTargetLabel(sessionJob);
-                    return sessionJob;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
+                return mutateSessionJob(resource);
+            }
+            if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) {
+                return mutateDeployment(resource);
             }
         }
         return resource;
     }
+    private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+        try {
+            var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
+            var namespace = sessionJob.getMetadata().getNamespace();
+            var deploymentName = sessionJob.getSpec().getDeploymentName();
+            var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+            var deployment =
+                    informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+            setSessionTargetLabel(sessionJob);

Review Comment: I mean `setSessionTargetLabel(sessionJob);` should be in the default mutator
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
gyfora commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1435156017

## flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java: ##

@@ -17,43 +17,88 @@
 package org.apache.flink.kubernetes.operator.admission.mutator;
+import org.apache.flink.kubernetes.operator.admission.informer.InformerManager;
 import org.apache.flink.kubernetes.operator.api.CrdConstants;
+import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
 import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
+import org.apache.flink.kubernetes.operator.mutator.FlinkResourceMutator;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.fabric8.kubernetes.client.informers.cache.Cache;
 import io.javaoperatorsdk.webhook.admission.NotAllowedException;
 import io.javaoperatorsdk.webhook.admission.Operation;
 import io.javaoperatorsdk.webhook.admission.mutation.Mutator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.HashMap;
+import java.util.Optional;
+import java.util.Set;
 /** The default mutator. */
 public class FlinkMutator implements Mutator<HasMetadata> {
     private static final Logger LOG = LoggerFactory.getLogger(FlinkMutator.class);
     private static final ObjectMapper mapper = new ObjectMapper();
+    private final Set<FlinkResourceMutator> mutators;
+    private final InformerManager informerManager;
+
+    public FlinkMutator(Set<FlinkResourceMutator> mutators, InformerManager informerManager) {
+        this.mutators = mutators;
+        this.informerManager = informerManager;
+    }
     @Override
     public HasMetadata mutate(HasMetadata resource, Operation operation) throws NotAllowedException {
-        if (operation == Operation.CREATE) {
+        if (operation == Operation.CREATE || operation == Operation.UPDATE) {
             LOG.debug("Mutating resource {}", resource);
-
             if (CrdConstants.KIND_SESSION_JOB.equals(resource.getKind())) {
-                try {
-                    var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
-                    setSessionTargetLabel(sessionJob);
-                    return sessionJob;
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
+                return mutateSessionJob(resource);
+            }
+            if (CrdConstants.KIND_FLINK_DEPLOYMENT.equals(resource.getKind())) {
+                return mutateDeployment(resource);
             }
         }
         return resource;
     }
+    private FlinkSessionJob mutateSessionJob(HasMetadata resource) {
+        try {
+            var sessionJob = mapper.convertValue(resource, FlinkSessionJob.class);
+            var namespace = sessionJob.getMetadata().getNamespace();
+            var deploymentName = sessionJob.getSpec().getDeploymentName();
+            var key = Cache.namespaceKeyFunc(namespace, deploymentName);
+            var deployment =
+                    informerManager.getFlinkDepInformer(namespace).getStore().getByKey(key);
+
+            setSessionTargetLabel(sessionJob);

Review Comment: I think if we are adding a mutator abstraction, this logic should be part of the default mutator for session jobs
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
tagarr commented on PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1864167599 @gyfora We had considered adding mutation into the controller loop in the same manner that the validator is done, but we believe that mutation of the CR should only really be done on its creation or on user-modified updates. Changes within the reconcile loop could cause unknown issues and confuse the logic within the controller.
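The position described here, mutating only when the resource is created or updated by a user and never inside the reconcile loop, can be pictured as a small gate at the admission layer. The sketch below is illustrative only: `AdmissionOp` and `AdmissionGate` are hypothetical names, and the enum merely mirrors the idea of the webhook framework's own operation type.

```java
import java.util.function.UnaryOperator;

// Hypothetical mirror of admission operations; the webhook framework has its own type.
enum AdmissionOp {
    CREATE,
    UPDATE,
    DELETE
}

final class AdmissionGate {
    private AdmissionGate() {}

    // Applies the mutation only for write operations coming through admission.
    // Every other operation returns the resource unchanged.
    static <T> T mutateOnWrite(AdmissionOp op, T resource, UnaryOperator<T> mutation) {
        if (op == AdmissionOp.CREATE || op == AdmissionOp.UPDATE) {
            return mutation.apply(resource);
        }
        return resource;
    }
}
```

Keeping the gate outside the controller means the reconciler always sees an already-mutated resource and never has to reason about its own writes.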
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
AncyRominus commented on PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1859901718 Thanks for the comments, @tagarr and @gyfora! The label modification is set in FlinkMutator.java, and any other mutator changes can be added to the default or the custom mutators. The default and custom mutators are iterated, and each one's mutateSessionJob is applied to the session job. Yes, we can add the session job mutation for Update as well as for Create operations.
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
AncyRominus commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1429772463

## flink-kubernetes-operator/pom.xml: ##

@@ -208,6 +208,18 @@ under the License.
             <artifactId>junit-jupiter-params</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>io.javaoperatorsdk</groupId>
+            <artifactId>kubernetes-webhooks-framework-core</artifactId>
+            <version>${operator.sdk.webhook-framework.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>

Review Comment: This dependency is for the kubernetes-webhooks-framework. It can be removed, as we don't have any strong relation with this dependency.
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
AncyRominus commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1429764133

## flink-kubernetes-webhook/pom.xml: ##

@@ -127,6 +132,11 @@ under the License.
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${surefire-plugin.version}</version>

Review Comment: This plugin is required for the test AdmissionHandlerTest.testMutateHandler to run successfully. Otherwise it throws <500 Internal Server Error> while mutating the SessionJob, because it uses the deployment from the informerManager, which in turn uses the Kubernetes client.
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
gyfora commented on PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1855797398 There is already a mutator implementation, `FlinkMutator`, but I don't see it mentioned or touched anywhere here. Could we also make sure that it is covered by the new interfaces as well?
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
gyfora commented on code in PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#discussion_r1426674867

## flink-kubernetes-webhook/pom.xml: ##

@@ -127,6 +132,11 @@ under the License.
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${surefire-plugin.version}</version>

Review Comment: Why do we need this new plugin?

## flink-kubernetes-operator/pom.xml: ##

@@ -208,6 +208,18 @@ under the License.
             <artifactId>junit-jupiter-params</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>io.javaoperatorsdk</groupId>
+            <artifactId>kubernetes-webhooks-framework-core</artifactId>
+            <version>${operator.sdk.webhook-framework.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>

Review Comment: Why do we need this new dependency inside the operator?
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
gyfora commented on PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1855787058

```
Brief change log
No change in logs
```

This section should ideally be filled out with the list of changes that were made to implement the given feature / bug fix :)
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
tagarr commented on PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1855604122 @gyfora would love your opinion on this...
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
tagarr commented on PR #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733#issuecomment-1855602604 Looks good. My only questions are for the committers:
1. Would we like the session job changes (i.e. modifying labels) to be done in flink-kubernetes-webhook/src/main/java/org/apache/flink/kubernetes/operator/admission/mutator/FlinkMutator.java, as they currently are, or pushed down to the default mutator?
2. Do we want to add session job mutation for Update as well as for Create operations?
Re: [PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
AncyRominus closed pull request #732: [FLINK-33632] Adding custom flink mutator URL: https://github.com/apache/flink-kubernetes-operator/pull/732
[PR] [FLINK-33632] Adding custom flink mutator [flink-kubernetes-operator]
AncyRominus opened a new pull request, #733: URL: https://github.com/apache/flink-kubernetes-operator/pull/733

## What is the purpose of the change

Adding Custom Flink Mutator

## Brief change log

No change in logs

## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
- *Added integration tests for end-to-end deployment with large payloads (100MB)*
- *Extended integration test for recovery after master (JobManager) failure*
- *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / no)
- The public API, i.e., is any changes to the `CustomResourceDescriptors`: (yes / no)
- Core observer or reconciler logic that is regularly executed: (yes / no)

## Documentation

- Does this pull request introduce a new feature? (yes / no)
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)