[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #239: [FLINK-27714] Migrate to java-operator-sdk v3
morhidi commented on code in PR #239: URL: https://github.com/apache/flink-kubernetes-operator/pull/239#discussion_r883435675 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ## @@ -63,7 +63,7 @@ public class FlinkConfigManager { private final AtomicLong defaultConfigVersion = new AtomicLong(0); private final LoadingCache cache; -private Set namespaces = OperatorUtils.getWatchedNamespaces(); +private final Set namespaces = EnvUtils.getWatchedNamespaces(); Review Comment: https://issues.apache.org/jira/browse/FLINK-27812 <- Follow up ticket for dynamic namespaces -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #239: [FLINK-27714] Migrate to java-operator-sdk v3
morhidi commented on code in PR #239: URL: https://github.com/apache/flink-kubernetes-operator/pull/239#discussion_r883435675 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ## @@ -63,7 +63,7 @@ public class FlinkConfigManager { private final AtomicLong defaultConfigVersion = new AtomicLong(0); private final LoadingCache cache; -private Set namespaces = OperatorUtils.getWatchedNamespaces(); +private final Set namespaces = EnvUtils.getWatchedNamespaces(); Review Comment: https://issues.apache.org/jira/browse/FLINK-27812 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #239: [FLINK-27714] Migrate to java-operator-sdk v3
morhidi commented on code in PR #239: URL: https://github.com/apache/flink-kubernetes-operator/pull/239#discussion_r882895802 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/informer/InformerManager.java: ## @@ -39,21 +43,24 @@ public class InformerManager { private final KubernetesClient kubernetesClient; private volatile Map> sessionJobInformers; private volatile Map> flinkDepInformers; +public static final String CLUSTER_ID_INDEX = "clusterId_index"; public InformerManager(Set watchedNamespaces, KubernetesClient kubernetesClient) { this.watchedNamespaces = watchedNamespaces; this.kubernetesClient = kubernetesClient; LOG.info( "Created informer manager with watchedNamespaces: {}", watchedNamespaces.isEmpty() -? "[" + OperatorUtils.ALL_NAMESPACE + "]" +? "[" + Constants.WATCH_ALL_NAMESPACES + "]" : watchedNamespaces); } public SharedIndexInformer getSessionJobInformer(String namespace) { Review Comment: Yes, you're correct -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #239: [FLINK-27714] Migrate to java-operator-sdk v3
morhidi commented on code in PR #239: URL: https://github.com/apache/flink-kubernetes-operator/pull/239#discussion_r882885678 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/deployment/AbstractDeploymentObserver.java: ## @@ -199,11 +199,7 @@ protected void observeJmDeployment( } private Optional getSecondaryResource(FlinkDeployment flinkApp, Context context) { -return context.getSecondaryResource( -Deployment.class, - configManager.getOperatorConfiguration().getWatchedNamespaces().size() > 1 -? flinkApp.getMetadata().getNamespace() -: null); +return context.getSecondaryResource(Deployment.class); Review Comment: No, I removed it most of the places. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #239: [FLINK-27714] Migrate to java-operator-sdk v3
morhidi commented on code in PR #239: URL: https://github.com/apache/flink-kubernetes-operator/pull/239#discussion_r882854951 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ## @@ -63,7 +63,7 @@ public class FlinkConfigManager { private final AtomicLong defaultConfigVersion = new AtomicLong(0); private final LoadingCache cache; -private Set namespaces = OperatorUtils.getWatchedNamespaces(); +private final Set namespaces = EnvUtils.getWatchedNamespaces(); Review Comment: Correct, that's a larger change, will open a separate ticket for that. I tried to keep the current PR as minimal as possible. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [flink-kubernetes-operator] morhidi commented on a diff in pull request #239: [FLINK-27714] Migrate to java-operator-sdk v3
morhidi commented on code in PR #239: URL: https://github.com/apache/flink-kubernetes-operator/pull/239#discussion_r880370389 ## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.java: ## @@ -152,26 +160,71 @@ private void handleDeploymentFailed(FlinkDeployment flinkApp, DeploymentFailedEx } @Override -public List prepareEventSources(EventSourceContext ctx) { -if (effectiveNamespaces.isEmpty()) { -return List.of(OperatorUtils.createJmDepInformerEventSource(kubernetesClient)); -} else { -return effectiveNamespaces.stream() -.map(ns -> OperatorUtils.createJmDepInformerEventSource(kubernetesClient, ns)) -.collect(Collectors.toList()); -} +public Map prepareEventSources( +EventSourceContext context) { + +return EventSourceInitializer.nameEventSources( +getSessionJobInformerEventSource(context), +getFlinkDeploymentInformerEventSource(context)); +} + +private InformerEventSource getFlinkDeploymentInformerEventSource( +EventSourceContext context) { +final String labelSelector = +Map.of( +Constants.LABEL_TYPE_KEY, +Constants.LABEL_TYPE_NATIVE_TYPE, +Constants.LABEL_COMPONENT_KEY, +Constants.LABEL_COMPONENT_JOB_MANAGER) +.entrySet().stream() +.map(Object::toString) +.collect(Collectors.joining(",")); + +var configuration = +InformerConfiguration.from(Deployment.class, context) +.withLabelSelector(labelSelector) + .withSecondaryToPrimaryMapper(Mappers.fromLabel(Constants.LABEL_APP_KEY)) +.withNamespacesInheritedFromController(context) +.followNamespaceChanges(true) +.build(); + +return new InformerEventSource<>(configuration, context); } -@VisibleForTesting -public void setEffectiveNamespaces(Set effectiveNamespaces) { -this.effectiveNamespaces = effectiveNamespaces; +private InformerEventSource getSessionJobInformerEventSource( +EventSourceContext context) { + +final SecondaryToPrimaryMapper secondaryToPrimaryMapper = +sessionJob -> +context.getPrimaryCache() Review Comment: thx! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org