[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

2022-05-15 Thread GitBox


gyfora commented on code in PR #202:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r873198656


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##
@@ -141,4 +142,11 @@ public class KubernetesOperatorConfigOptions {
 .noDefaultValue()
 .withDescription(
 "Whether to enable recovery of missing/deleted 
jobmanager deployments. False by default for Flink 1.14, true for newer Flink 
version.");
+
+public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
+ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
+.mapType()
+.noDefaultValue()
+.withDescription(
+"Custom HTTP header for a Flink job. Expected 
format: headerKey1:headerValue1,headerKey2:headerValue2.");

Review Comment:
   Could we change this to: 
   ```
   Custom HTTP header for HttpArtifactFetcher. The header will be applied when 
getting the session job artifacts.  Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.
   ```
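
To make the expected format concrete, here is a minimal sketch of how a string such as `headerKey1:headerValue1,headerKey2:headerValue2` maps to header key/value pairs. The class `HttpHeaderFormatSketch` and its `parseHeaders` method are hypothetical and not part of the PR; in the operator the option is declared with `mapType()`, so Flink's own configuration parsing would presumably do this work. The sketch only illustrates the format.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper (not operator code): illustrates the documented header format. */
final class HttpHeaderFormatSketch {

    /** Parses "k1:v1,k2:v2" into an insertion-ordered map of header names to values. */
    static Map<String, String> parseHeaders(String raw) {
        Map<String, String> headers = new LinkedHashMap<>();
        if (raw == null || raw.trim().isEmpty()) {
            return headers;
        }
        for (String entry : raw.split(",")) {
            // Split on the first ':' only, so a value may itself contain ':'.
            int sep = entry.indexOf(':');
            if (sep <= 0) {
                throw new IllegalArgumentException("Expected key:value but got: " + entry);
            }
            headers.put(entry.substring(0, sep).trim(), entry.substring(sep + 1).trim());
        }
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(parseHeaders("headerKey1:headerValue1,headerKey2:headerValue2"));
    }
}
```

Note that a header value containing a comma cannot be expressed in this simple format; the sketch makes no attempt to handle that case.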



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

2022-05-13 Thread GitBox


gyfora commented on code in PR #202:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872744006


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##
@@ -141,4 +142,15 @@ public class KubernetesOperatorConfigOptions {
 .noDefaultValue()
 .withDescription(
 "Whether to enable recovery of missing/deleted 
jobmanager deployments. False by default for Flink 1.14, true for newer Flink 
version.");
+
+public static final ConfigOption<Map<String, String>> JAR_ARTIFACT_HTTP_HEADER =
+ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
+.mapType()
+.noDefaultValue()
+.withDescription(
+"Custom HTTP header for a Flink job. If configured 
in cluster level, headers will be applied to all jobs within"

Review Comment:
   I think the description should detail what the config is good for. Does it 
affect artifact downloading?
   
   It doesn't need to explain how configurations work, we have general docs for 
that. So I would completely remove this part:
   ```
   If configured in cluster level, headers will be applied to all jobs within 
the cluster. This field can also be configured under 
spec.job.flinkConfiguration for a specific session job within a session 
cluster. If configured at session job level, it will override the cluster level 
configuration.
   ```



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment 
deployment) {
 return getConfig(deployment.getMetadata(), 
ReconciliationUtils.getDeployedSpec(deployment));
 }
 
+public Configuration getSessionJobObserveConfig(

Review Comment:
   I think it's a bit confusing to call it `getSessionJobObserveConfig`. I 
would prefer to call it simply `getSessionJobConfig`



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment 
deployment) {
 return getConfig(deployment.getMetadata(), 
ReconciliationUtils.getDeployedSpec(deployment));
 }
 
+public Configuration getSessionJobObserveConfig(
+FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+Configuration sessionJobConfig = getObserveConfig(deployment);
+
+// merge session job specific config
+Map<String, String> sessionJobFlinkConfiguration =
+flinkSessionJob.getSpec().getFlinkConfiguration();
+if (sessionJobFlinkConfiguration != null && 
!sessionJobFlinkConfiguration.isEmpty()) {

Review Comment:
   The `!sessionJobFlinkConfiguration.isEmpty()` check is not necessary
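
A minimal sketch of why the emptiness check is redundant: merging an empty map is a no-op, so only the null check matters. Plain `Map<String, String>` values stand in for Flink's `Configuration` here, and the class `SessionJobConfigMergeSketch` and its `merge` method are hypothetical names, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the merge step; plain maps replace Flink's Configuration. */
final class SessionJobConfigMergeSketch {

    /** Returns the deployment config with session job entries layered on top. */
    static Map<String, String> merge(
            Map<String, String> deploymentConf, Map<String, String> sessionJobConf) {
        Map<String, String> merged = new HashMap<>(deploymentConf);
        // The null check is needed; an isEmpty() check is not,
        // because putAll with an empty map changes nothing.
        if (sessionJobConf != null) {
            merged.putAll(sessionJobConf);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> base = Map.of("a", "1", "b", "2");
        System.out.println(merge(base, Map.of("b", "3")).get("b"));
        System.out.println(merge(base, Map.of()).equals(base));
    }
}
```

Session job entries win on key collisions, which matches the override behaviour discussed in this thread.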



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java:
##
@@ -41,6 +42,7 @@ public class FlinkOperatorConfiguration {
 Duration flinkCancelJobTimeout;
 Duration flinkShutdownClusterTimeout;
 String artifactsBaseDir;
+Map<String, String> artifactHttpHeader;

Review Comment:
   We shouldn't add this extra field here, it's not an operator configuration 
and it's also not used anywhere



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java:
##
@@ -33,7 +34,8 @@ public class FileSystemBasedArtifactFetcher implements 
ArtifactFetcher {
 new FileSystemBasedArtifactFetcher();
 
 @Override
-public File fetch(String uri, File targetDir) throws Exception {
+public File fetch(String uri, Configuration flinkConfiguration, File 
targetDir)

Review Comment:
   Why did you add the config field here? Is it used?



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##
@@ -404,4 +409,21 @@ private Optional<String> validateServiceAccount(String serviceAccount) {
 }
 return Optional.empty();
 }
+
+private Optional<String> validateFlinkSessionJobConfig(
+Map<String, String> flinkSessionJobConfig) {
+if (flinkSessionJobConfig == null) {
+return Optional.empty();
+}
+
+for (String key : flinkSessionJobConfig.keySet()) {
+if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) {

Review Comment:
   Could we please add a simple test for this into the validator test?
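
As a rough idea of what such a test could cover, the sketch below re-implements the allow-list check in isolation and exercises the three cases: null config, an allowed key, and a disallowed key. The class name, the contents of the allowed set, and the error message text are assumptions made for illustration; the real validator and its test harness may look different.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical, standalone version of the allow-list check from the diff. */
final class SessionJobConfigValidatorSketch {

    // Assumed contents: the only key mentioned in this thread.
    static final Set<String> ALLOWED_FLINK_SESSION_JOB_CONF_KEYS =
            Set.of("kubernetes.operator.user.artifacts.http.header");

    /** Returns an error message for the first key that is not allowed, if any. */
    static Optional<String> validateFlinkSessionJobConfig(Map<String, String> conf) {
        if (conf == null) {
            return Optional.empty();
        }
        for (String key : conf.keySet()) {
            if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) {
                // The message wording is an assumption, not the operator's text.
                return Optional.of("Invalid session job flinkConfiguration key: " + key);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validateFlinkSessionJobConfig(null).isPresent());
        System.out.println(
                validateFlinkSessionJobConfig(Map.of("taskmanager.numberOfTaskSlots", "2"))
                        .isPresent());
    }
}
```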



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java:
##
@@ -32,12 +37,26 @@ public class HttpArtifactFetcher implements ArtifactFetcher 
{
 public static final HttpArtifactFetcher INSTANCE = new 
HttpArtifactFetcher();
 
 @Override
- 

[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

2022-05-13 Thread GitBox


gyfora commented on code in PR #202:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872488147


##
docs/content/docs/operations/configuration.md:
##
@@ -53,3 +53,11 @@ To learn more about metrics and logging configuration please 
refer to the dedica
 ## Operator Configuration Reference
 
 {{< generated/kubernetes_operator_config_configuration >}}
+
+## Job Specific Configuration Reference
+Job specific configuration can be configured under 
`spec.job.flinkConfiguration` and it will override flink configurations defined 
in `flink-conf.yaml`.
+
+- For application clusters, `spec.job.flinkConfiguration` will be located in 
`FlinkDeployment` CustomResource.
+- For session clusters, configuring `spec.job.flinkConfiguration` in parent 
`FlinkDeployment` will be applied to all session jobs within the session 
cluster.
+You can also configure `spec.job.flinkConfiguration` in `FlinkSessionJob` 
CustomResource for a specific session job. 
+The session job level configuration will override the parent session cluster's 
Flink configuration.

Review Comment:
   If we can enumerate the possible configuration options and keys we can 
simply add a method to the validator that checks that the user doesn't try to 
set something that won't take effect






[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

2022-05-11 Thread GitBox


gyfora commented on code in PR #202:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869906352


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java:
##
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+
SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
   But I would allow users to put the header config in the default Flink conf 
yaml, so that a default value applies when the session job doesn't 
override it.






[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

2022-05-11 Thread GitBox


gyfora commented on code in PR #202:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869905741


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java:
##
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+
SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
   I am not suggesting changing the logic, just the location of the constant 
you defined, so that we don't create unnecessary new classes and spread the 
options too much :)
   
   The operator should always use the config defined in the resource, whether 
that is a session job or a deployment. We usually apply this logic on top of 
the default config defined in the operator.






[GitHub] [flink-kubernetes-operator] gyfora commented on a diff in pull request #202: [FLINK-27483]Add session job config field

2022-05-10 Thread GitBox


gyfora commented on code in PR #202:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r869878137


##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorSessionJobConfigOptions.java:
##
@@ -0,0 +1,44 @@
+package org.apache.flink.kubernetes.operator.config;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+/** List supported session job specific configurations */
+public enum KubernetesOperatorSessionJobConfigOptions {
+
+
SESSION_JOB_HTTP_JAR_HEADERS("kubernetes.operator.user.artifacts.http.header");

Review Comment:
   I think this can all simply be part of 
`KubernetesOperatorConfigOptions`; there is no need for a new class.



##
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/crd/spec/FlinkSessionJobSpec.java:
##
@@ -38,4 +40,7 @@ public class FlinkSessionJobSpec extends AbstractFlinkSpec {
 
 /** The name of the target session cluster deployment. */
 private String deploymentName;
+
+/** Session job specific configuration */
+private Map<String, String> sessionJobFlinkConfiguration;

Review Comment:
   Instead of specifying a new config here, let's move the `flinkConfiguration` 
field of the `FlinkDeploymentSpec` up to `AbstractFlinkSpec`; that way it will 
be inherited.
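
The suggested refactoring can be sketched as follows. The `*Sketch` class names are hypothetical stand-ins for `AbstractFlinkSpec`, `FlinkDeploymentSpec` and `FlinkSessionJobSpec`, and hand-written accessors are used here only to keep the example self-contained; the real classes may generate them differently.

```java
import java.util.Map;

/** Hypothetical stand-in for AbstractFlinkSpec: the shared field lives here. */
abstract class AbstractFlinkSpecSketch {
    /** Flink configuration overrides, available to every subclass. */
    private Map<String, String> flinkConfiguration;

    Map<String, String> getFlinkConfiguration() {
        return flinkConfiguration;
    }

    void setFlinkConfiguration(Map<String, String> flinkConfiguration) {
        this.flinkConfiguration = flinkConfiguration;
    }
}

/** Stand-in for FlinkDeploymentSpec: no longer declares the field itself. */
final class FlinkDeploymentSpecSketch extends AbstractFlinkSpecSketch {}

/** Stand-in for FlinkSessionJobSpec: inherits flinkConfiguration, adds nothing new for it. */
final class FlinkSessionJobSpecSketch extends AbstractFlinkSpecSketch {
    /** The name of the target session cluster deployment. */
    private String deploymentName;

    String getDeploymentName() {
        return deploymentName;
    }

    void setDeploymentName(String deploymentName) {
        this.deploymentName = deploymentName;
    }
}
```

With this layout both resource types expose the same `flinkConfiguration` field, so no separate `sessionJobFlinkConfiguration` is needed.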


