gyfora commented on code in PR #202:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/202#discussion_r872744006


##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java:
##########
@@ -141,4 +142,15 @@ public class KubernetesOperatorConfigOptions {
                     .noDefaultValue()
                     .withDescription(
                             "Whether to enable recovery of missing/deleted 
jobmanager deployments. False by default for Flink 1.14, true for newer Flink 
version.");
+
+    public static final ConfigOption<Map<String, String>> 
JAR_ARTIFACT_HTTP_HEADER =
+            ConfigOptions.key("kubernetes.operator.user.artifacts.http.header")
+                    .mapType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Custom HTTP header for a Flink job. If configured 
in cluster level, headers will be applied to all jobs within"

Review Comment:
   I think the description should detail what the config is good for. Does it 
affect artifact downloading?
   
   It doesn't need to explain how configurations work, we have general docs for 
that. So I would completely remove this part:
   ```
   If configured in cluster level, headers will be applied to all jobs within 
the cluster. This field can also be configured under 
spec.job.flinkConfiguration for a specific session job within a session 
cluster. If configured at session job level, it will override the cluster level 
configuration.
   ```



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment 
deployment) {
         return getConfig(deployment.getMetadata(), 
ReconciliationUtils.getDeployedSpec(deployment));
     }
 
+    public Configuration getSessionJobObserveConfig(

Review Comment:
   I think it's a bit confusing to call it `getSessionJobObserveConfig`. I 
would prefer to call it simply `getSessionJobConfig`



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment 
deployment) {
         return getConfig(deployment.getMetadata(), 
ReconciliationUtils.getDeployedSpec(deployment));
     }
 
+    public Configuration getSessionJobObserveConfig(
+            FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+        Configuration sessionJobConfig = getObserveConfig(deployment);
+
+        // merge session job specific config
+        Map<String, String> sessionJobFlinkConfiguration =
+                flinkSessionJob.getSpec().getFlinkConfiguration();
+        if (sessionJobFlinkConfiguration != null && 
!sessionJobFlinkConfiguration.isEmpty()) {

Review Comment:
   The `!sessionJobFlinkConfiguration.isEmpty()` check is not necessary



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java:
##########
@@ -41,6 +42,7 @@ public class FlinkOperatorConfiguration {
     Duration flinkCancelJobTimeout;
     Duration flinkShutdownClusterTimeout;
     String artifactsBaseDir;
+    Map<String, String> artifactHttpHeader;

Review Comment:
   We shouldn't add this extra field here, it's not an operator configuration 
and it's also not used anywhere



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/FileSystemBasedArtifactFetcher.java:
##########
@@ -33,7 +34,8 @@ public class FileSystemBasedArtifactFetcher implements 
ArtifactFetcher {
             new FileSystemBasedArtifactFetcher();
 
     @Override
-    public File fetch(String uri, File targetDir) throws Exception {
+    public File fetch(String uri, Configuration flinkConfiguration, File 
targetDir)

Review Comment:
   Why did you add the config field here? Is it used?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java:
##########
@@ -404,4 +409,21 @@ private Optional<String> validateServiceAccount(String 
serviceAccount) {
         }
         return Optional.empty();
     }
+
+    private Optional<String> validateFlinkSessionJobConfig(
+            Map<String, String> flinkSessionJobConfig) {
+        if (flinkSessionJobConfig == null) {
+            return Optional.empty();
+        }
+
+        for (String key : flinkSessionJobConfig.keySet()) {
+            if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(key)) {

Review Comment:
   Could we please add a simple test for this into the validator test?



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/artifact/HttpArtifactFetcher.java:
##########
@@ -32,12 +37,26 @@ public class HttpArtifactFetcher implements ArtifactFetcher 
{
     public static final HttpArtifactFetcher INSTANCE = new 
HttpArtifactFetcher();
 
     @Override
-    public File fetch(String uri, File targetDir) throws Exception {
+    public File fetch(String uri, Configuration flinkConfiguration, File 
targetDir)
+            throws Exception {
         var start = System.currentTimeMillis();
         URL url = new URL(uri);
+        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+
+        // merged session job level header and cluster level header, session 
job level header take
+        // precedence.
+        Map<String, String> headers =
+                
flinkConfiguration.get(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER);
+
+        if (headers != null && headers.size() > 0) {

Review Comment:
   size check not necessary



##########
flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigManager.java:
##########
@@ -133,6 +135,19 @@ public Configuration getObserveConfig(FlinkDeployment 
deployment) {
         return getConfig(deployment.getMetadata(), 
ReconciliationUtils.getDeployedSpec(deployment));
     }
 
+    public Configuration getSessionJobObserveConfig(
+            FlinkDeployment deployment, FlinkSessionJob flinkSessionJob) {
+        Configuration sessionJobConfig = getObserveConfig(deployment);
+
+        // merge session job specific config
+        Map<String, String> sessionJobFlinkConfiguration =
+                flinkSessionJob.getSpec().getFlinkConfiguration();
+        if (sessionJobFlinkConfiguration != null && 
!sessionJobFlinkConfiguration.isEmpty()) {
+            sessionJobFlinkConfiguration.forEach(sessionJobConfig::setString);

Review Comment:
   I am wondering for this particular case, would it make sense to merge the 
HTTP headers instead of overwriting them if the base config also defined it?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to