[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r873198656

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ##
@@ -141,4 +142,11 @@ public class KubernetesOperatorConfigOptions {
.noDefaultValue()
.withDescription(
"Whether to enable recovery of missing/deleted jobmanager deployments. False by default for Flink 1.14, true for newer Flink version.");
+
+public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
+ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
+.mapType()
+.noDefaultValue()
+.withDescription(
+"Custom HTTP header for a Flink job. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.");

Review Comment:
Could we change this to:
```
Custom HTTP header for HttpArtifactFetcher. The header will be applied when getting the session job artifacts. Expected format: headerKey1:headerValue1,headerKey2:headerValue2.
```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
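The option above is a `mapType()` option whose value is written as `headerKey1:headerValue1,headerKey2:headerValue2`. As a rough, JDK-only illustration of what "applied when getting the session job artifacts" could mean, the sketch below parses that string and sets each entry on a `URLConnection` before the download starts. The class and method names (`HttpHeaderSketch`, `parseHeaders`, `applyHeaders`) and the URL are hypothetical. The `HttpArtifactFetcher` change in this thread is truncated, so how it really applies the headers is an assumption. The naive split also ignores any quoting or escaping that Flink's own map-option parsing may support.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLConnection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: parses the header option value and applies it to a connection. */
public class HttpHeaderSketch {

    /** Parses "k1:v1,k2:v2" into an insertion-ordered map (no quoting/escaping support). */
    static Map<String, String> parseHeaders(String raw) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return headers;
        }
        for (String entry : raw.split(",")) {
            // Split on the first ':' only, so a header value may itself contain ':'.
            int idx = entry.indexOf(':');
            if (idx <= 0) {
                throw new IllegalArgumentException("Malformed header entry: " + entry);
            }
            headers.put(entry.substring(0, idx).trim(), entry.substring(idx + 1).trim());
        }
        return headers;
    }

    /** Sets every header on the connection; must be called before the connection is used. */
    static void applyHeaders(URLConnection conn, Map<String, String> headers) {
        headers.forEach(conn::setRequestProperty);
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> headers = parseHeaders("X-Api-Key:abc123,X-Trace:42");
        // openConnection() does not contact the server; nothing is sent until the stream is read.
        URLConnection conn = URI.create("http://example.com/job.jar").toURL().openConnection();
        applyHeaders(conn, headers);
        System.out.println(conn.getRequestProperty("X-Trace"));
    }
}
```

Splitting on the first colon only is a deliberate choice here, because header values such as timestamps or URLs often contain colons themselves.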
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872744006

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java: ##
@@ -141,4 +142,15 @@ public class KubernetesOperatorConfigOptions {
.noDefaultValue()
.withDescription(
"Whether to enable recovery of missing/deleted jobmanager deployments. False by default for Flink 1.14, true for newer Flink version.");
+
+public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
+ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
+.mapType()
+.noDefaultValue()
+.withDescription(
+"Custom HTTP header for a Flink job. If configured in cluster level, headers will be applied to all jobs within"

Review Comment:
I think the description should detail what the config is good for. Does it affect artifact downloading? It doesn't need to explain how configurations work; we have general docs for that. So I would completely remove this part:
```
If configured in cluster level, headers will be applied to all jobs within the cluster. This field can also be configured under spec.job.flinkConfiguration for a specific session job within a session cluster. If configured at session job level, it will override the cluster level configuration.
```

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ##
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
}
+public Configuration getSessionJobObserveConfig(

Review Comment:
I think it's a bit confusing to call it `getSessionJobObserveConfig`.
I would prefer to call it simply `getSessionJobConfig`.

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java: ##
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment deployment) {
return getConfig(deployment.getMetadata(), ReconciliationUtils.getDeployedSpec(deployment));
}
+public Configuration getSessionJobObserveConfig(
+FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+Configuration sessionJobConfig = getObserveConfig(deployment);
+
+// merge session job specific config
+Map<String, String> sessionJobFlinkConfiguration =
+flinkSessionJob.getSpec().getFlinkConfiguration();
+if (sessionJobFlinkConfiguration != null && !sessionJobFlinkConfiguration.isEmpty()) {

Review Comment:
The `!sessionJobFlinkConfiguration.isEmpty()` check is not necessary.

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java: ##
@@ -41,6 +42,7 @@ public class FlinkOperatorConfiguration {
Duration flinkCancelJobTimeout;
Duration flinkShutdownClusterTimeout;
String artifactsBaseDir;
+Map<String, String> artifactHttpHeader;

Review Comment:
We shouldn't add this extra field here; it's not an operator configuration and it's also not used anywhere.

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java: ##
@@ -33,7 +34,8 @@ public class FileSystemBasedArtifactFetcher implements ArtifactFetcher {
new FileSystemBasedArtifactFetcher();
@Override
-public File fetch(String uri, File targetDir) throws Exception {
+public File fetch(String uri, Configuration flinkConfiguration, File targetDir)

Review Comment:
Why did you add the config field here? Is it used?
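The merge reviewed above can be sketched with the JDK alone to show the intended precedence: operator defaults (e.g. from `flink-conf.yaml`), then the parent deployment's `flinkConfiguration`, then the session job's `flinkConfiguration`. This is a simplified stand-in, not the operator's code: plain `Map<String, String>` replaces Flink's `Configuration`, and the class and helper names are hypothetical. It also shows why the `isEmpty()` check adds nothing, since `putAll` with an empty map is a no-op.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for the config layering discussed for FlinkConfigManager. */
public class SessionJobConfigSketch {

    /**
     * Builds the effective session job config. Later layers win:
     * operator defaults < deployment flinkConfiguration < session job flinkConfiguration.
     */
    static Map<String, String> getSessionJobConfig(
            Map<String, String> operatorDefaults,
            Map<String, String> deploymentConf,
            Map<String, String> sessionJobConf) {
        Map<String, String> effective = new HashMap<>(operatorDefaults);
        overlay(effective, deploymentConf);
        overlay(effective, sessionJobConf);
        return effective;
    }

    /** Null-safe overlay; putAll with an empty map changes nothing, so no isEmpty() check. */
    private static void overlay(Map<String, String> target, Map<String, String> layer) {
        if (layer != null) {
            target.putAll(layer);
        }
    }

    public static void main(String[] args) {
        String key = "kubernetes.operator.user.artifacts.http.header";
        Map<String, String> defaults = Map.of(key, "X-Default:1");
        // A session job without its own flinkConfiguration keeps the default header.
        System.out.println(getSessionJobConfig(defaults, Map.of(), null).get(key));
        // A session job that sets the key overrides the default.
        System.out.println(getSessionJobConfig(defaults, Map.of(), Map.of(key, "X-Job:2")).get(key));
    }
}
```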
## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java: ##
@@ -404,4 +409,21 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
}
return Optional.empty();
}
+
+private Optional<String> validateFlinkSessionJobConfig(
+Map<String, String> flinkSessionJobConfig) {
+if (flinkSessionJobConfig == null) {
+return Optional.empty();
+}
+
+for (String key : flinkSessionJobConfig.keySet()) {
+if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) {

Review Comment:
Could we please add a simple test for this into the validator test?

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java: ##
@@ -32,12 +37,26 @@ public class HttpArtifactFetcher implements ArtifactFetcher {
public static final HttpArtifactFetcher INSTANCE = new HttpArtifactFetcher();
@Override
-
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872488147

## docs/content/docs/operations/configuration.md: ##
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please refer to the dedica
## Operator Configuration Reference
{{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under `spec.job.flinkConfiguration` and it will override flink configurations defined in `flink-conf.yaml`.
+
+- For application clusters, `spec.job.flinkConfiguration` will be located in `FlinkDeployment` CustomResource.
+- For session clusters, configuring `spec.job.flinkConfiguration` in parent `FlinkDeployment` will be applied to all session jobs within the session cluster.
+You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` CustomResource for a specific session job.
+The session job level configuration will override the parent session cluster's Flink configuration.

Review Comment:
If we can enumerate the possible configuration options and keys, we can simply add a method to the validator that checks that the user doesn't try to set something that won't take effect.
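The suggested validator method can be sketched as a whitelist check that returns an error message wrapped in an `Optional<String>`, matching the `Optional`-returning validation methods visible in the `DefaultValidator` diff in this thread. The constant name `ALLOWED_FLINK_SESSION_JOB_CONF_KEYS` is borrowed from that diff, but its contents (only the artifact HTTP header key), the error message text, and the class name are assumptions. The `main` method doubles as the kind of simple validator test requested in the review.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical sketch of a session job flinkConfiguration whitelist check. */
public class SessionJobConfigValidatorSketch {

    // Assumption: only the artifact HTTP header key takes effect per session job.
    static final Set<String> ALLOWED_FLINK_SESSION_JOB_CONF_KEYS =
            Set.of("kubernetes.operator.user.artifacts.http.header");

    /** Returns an error message for an unsupported key, or empty if the config is valid. */
    static Optional<String> validateFlinkSessionJobConfig(Map<String, String> flinkSessionJobConfig) {
        if (flinkSessionJobConfig == null) {
            return Optional.empty();
        }
        for (String key : flinkSessionJobConfig.keySet()) {
            if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) {
                return Optional.of(
                        "Invalid session job flinkConfiguration key: " + key
                                + ". Allowed keys are " + ALLOWED_FLINK_SESSION_JOB_CONF_KEYS);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        check(validateFlinkSessionJobConfig(null).isEmpty(), "null config is valid");
        check(validateFlinkSessionJobConfig(
                Map.of("kubernetes.operator.user.artifacts.http.header", "a:b")).isEmpty(),
                "header key is allowed");
        check(validateFlinkSessionJobConfig(
                Map.of("taskmanager.numberOfTaskSlots", "4")).isPresent(),
                "keys that would not take effect are rejected");
        System.out.println("all checks passed");
    }

    private static void check(boolean condition, String description) {
        if (!condition) {
            throw new AssertionError(description);
        }
    }
}
```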
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869906352

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java: ##
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+ SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
But I would allow users to put the header config in the default Flink conf YAML, so that a default value is applied in case the session job doesn't want to override it.
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869905741

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java: ##
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+ SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
I am not suggesting changing the logic, just the location of the constant you defined, so that we don't create unnecessary new classes or spread the options too much :) The operator should always use the config defined in the resource (session job/deployment); there is no difference between the two. We usually apply this logic on top of the default config defined in the operator.
gyfora commented on code in PR #202:
URL: https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869878137

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java: ##
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+ SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
I think this can all simply be part of `KubernetesOperatorConfigOptions`; there is no need for a new class.

## flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkSessionJobSpec.java: ##
@@ -38,4 +40,7 @@ public class FlinkSessionJobSpec extends AbstractFlinkSpec {
/** The name of the target session cluster deployment. */
private String deploymentName;
+
+/** Session job specific configuration */
+private Map<String, String> sessionJobFlinkConfiguration;

Review Comment:
Instead of specifying a new config here, let's move the `flinkConfiguration` field of the `FlinkDeploymentSpec` up to `AbstractFlinkSpec`; that way it will be inherited.
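The suggested refactoring, moving `flinkConfiguration` up into `AbstractFlinkSpec` so that both spec types inherit it, can be sketched as a plain class hierarchy. The class names mirror the CRD classes named in this review, but they are simplified stand-ins: the real specs carry many more fields, and plain getters and setters are written out here. `SpecHierarchySketch` and the `basic-session-cluster` name are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified sketch: the shared field lives in the abstract base spec. */
abstract class AbstractFlinkSpec {
    /** Flink configuration shared by deployment and session job specs. */
    private Map<String, String> flinkConfiguration;

    public Map<String, String> getFlinkConfiguration() {
        return flinkConfiguration;
    }

    public void setFlinkConfiguration(Map<String, String> flinkConfiguration) {
        this.flinkConfiguration = flinkConfiguration;
    }
}

class FlinkDeploymentSpec extends AbstractFlinkSpec {
    // Deployment-only fields omitted; flinkConfiguration is inherited.
}

class FlinkSessionJobSpec extends AbstractFlinkSpec {
    /** The name of the target session cluster deployment. */
    private String deploymentName;

    public String getDeploymentName() {
        return deploymentName;
    }

    public void setDeploymentName(String deploymentName) {
        this.deploymentName = deploymentName;
    }
}

public class SpecHierarchySketch {
    public static void main(String[] args) {
        FlinkSessionJobSpec spec = new FlinkSessionJobSpec();
        spec.setDeploymentName("basic-session-cluster");
        Map<String, String> conf = new HashMap<>();
        conf.put("kubernetes.operator.user.artifacts.http.header", "X-Api-Key:abc123");
        // Inherited setter: no session-job-specific configuration field is needed.
        spec.setFlinkConfiguration(conf);
        System.out.println(spec.getFlinkConfiguration());
    }
}
```

With the field in the base class, any code that handles an `AbstractFlinkSpec` (config merging, validation) can read `getFlinkConfiguration()` without caring which resource type it came from.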