This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new add7fb3a [hotfix] Validate jarURI in DefaultValidator
add7fb3a is described below

commit add7fb3aacd965a9d0c403b43549b96f48d53283
Author: Andrea Cosentino <[email protected]>
AuthorDate: Tue Apr 28 10:56:15 2026 +0200

    [hotfix] Validate jarURI in DefaultValidator
---
 docs/content.zh/docs/custom-resource/overview.md   |   6 ++
 docs/content/docs/custom-resource/overview.md      |   6 ++
 .../kubernetes_operator_config_configuration.html  |  12 +++
 .../shortcodes/generated/system_section.html       |  12 +++
 .../operator/api/utils/BaseTestUtils.java          |   3 +-
 .../config/FlinkOperatorConfiguration.java         |  14 ++-
 .../config/KubernetesOperatorConfigOptions.java    |  23 +++++
 .../operator/validation/DefaultValidator.java      |  78 +++++++++++++-
 .../operator/validation/DefaultValidatorTest.java  | 113 +++++++++++++++++++++
 9 files changed, 264 insertions(+), 3 deletions(-)

diff --git a/docs/content.zh/docs/custom-resource/overview.md 
b/docs/content.zh/docs/custom-resource/overview.md
index fecc4c93..e3dc5ed4 100644
--- a/docs/content.zh/docs/custom-resource/overview.md
+++ b/docs/content.zh/docs/custom-resource/overview.md
@@ -201,6 +201,12 @@ The job specification has the same structure in 
FlinkSessionJobs and FlinkDeploy
 It leverages the [Flink 
filesystem](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/overview/)
 mechanism to download the jar and submit to the session cluster.
 So the FlinkSessionJob must be run with an existing session cluster managed by 
the FlinkDeployment.
 
+By default the FlinkSessionJob `jarURI` is restricted to the `https` scheme.
+For `https`, hosts that resolve to loopback, link-local, site-local, wildcard 
or multicast addresses (e.g. cloud metadata services such as `169.254.169.254`) 
are also rejected.
+If you need to fetch artifacts via additional schemes such as `s3` or `hdfs`, 
extend the allowlist via `kubernetes.operator.user.artifacts.allowed-schemes` 
(and review `kubernetes.operator.user.artifacts.disallow-restricted-hosts` if 
you legitimately need to reach private addresses).
+Both options are operator-level and can only be set in the operator 
configuration; values supplied in a CR's `flinkConfiguration` are ignored for 
these keys.
+The check applies only to FlinkSessionJob — FlinkDeployment `jarURI` is not 
validated, since application clusters typically reference a JAR shipped inside 
the image (e.g. `local://`) and the operator does not fetch it.
+
 To support jar from different filesystems, you should extend the base docker 
image as below, and put the related filesystem jar to the plugin dir and deploy 
the operator.
 For example, to support the hadoop fs resource:
 
diff --git a/docs/content/docs/custom-resource/overview.md 
b/docs/content/docs/custom-resource/overview.md
index 86855814..303d0065 100644
--- a/docs/content/docs/custom-resource/overview.md
+++ b/docs/content/docs/custom-resource/overview.md
@@ -204,6 +204,12 @@ The job specification has the same structure in 
FlinkSessionJobs and FlinkDeploy
 It leverages the [Flink 
filesystem](https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/filesystems/overview/)
 mechanism to download the jar and submit to the session cluster.
 So the FlinkSessionJob must be run with an existing session cluster managed by 
the FlinkDeployment.
 
+By default the FlinkSessionJob `jarURI` is restricted to the `https` scheme.
+For `https`, hosts that resolve to loopback, link-local, site-local, wildcard 
or multicast addresses (e.g. cloud metadata services such as `169.254.169.254`) 
are also rejected.
+If you need to fetch artifacts via additional schemes such as `s3` or `hdfs`, 
extend the allowlist via `kubernetes.operator.user.artifacts.allowed-schemes` 
(and review `kubernetes.operator.user.artifacts.disallow-restricted-hosts` if 
you legitimately need to reach private addresses).
+Both options are operator-level and can only be set in the operator 
configuration; values supplied in a CR's `flinkConfiguration` are ignored for 
these keys.
+The check applies only to FlinkSessionJob — FlinkDeployment `jarURI` is not 
validated, since application clusters typically reference a JAR shipped inside 
the image (e.g. `local://`) and the operator does not fetch it.
+
 To support jar from different filesystems, you should extend the base docker 
image as below, and put the related filesystem jar to the plugin dir and deploy 
the operator.
 For example, to support the hadoop fs resource:
 
diff --git 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
index 7cac60f1..22a8da16 100644
--- 
a/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
+++ 
b/docs/layouts/shortcodes/generated/kubernetes_operator_config_configuration.html
@@ -458,12 +458,24 @@
             <td>Duration</td>
             <td>Operator shutdown timeout before reconciliation threads are 
killed.</td>
         </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.user.artifacts.allowed-schemes</h5></td>
+            <td style="word-wrap: break-word;">"https"</td>
+            <td>List&lt;String&gt;</td>
+            <td>Comma separated list of URI schemes that are allowed for the 
FlinkSessionJob jarURI. Only 'https' is allowed by default. Operators that need 
to fetch artifacts via other schemes (such as 's3' or 'hdfs') can extend this 
list. Scheme matching is case-insensitive.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.user.artifacts.base.dir</h5></td>
             <td style="word-wrap: break-word;">"/opt/flink/artifacts"</td>
             <td>String</td>
             <td>The base dir to put the session job artifacts.</td>
         </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.user.artifacts.disallow-restricted-hosts</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>If enabled, FlinkSessionJob jarURI hosts that resolve to 
loopback, link-local, site-local, wildcard or multicast addresses are rejected 
during validation. Disable only if the operator legitimately needs to fetch 
from such addresses.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.user.artifacts.http.header</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
diff --git a/docs/layouts/shortcodes/generated/system_section.html 
b/docs/layouts/shortcodes/generated/system_section.html
index 6add6cc0..b7cb7e22 100644
--- a/docs/layouts/shortcodes/generated/system_section.html
+++ b/docs/layouts/shortcodes/generated/system_section.html
@@ -140,12 +140,24 @@
             <td>Duration</td>
             <td>Max interval of retries on unhandled controller errors.</td>
         </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.user.artifacts.allowed-schemes</h5></td>
+            <td style="word-wrap: break-word;">"https"</td>
+            <td>List&lt;String&gt;</td>
+            <td>Comma separated list of URI schemes that are allowed for the 
FlinkSessionJob jarURI. Only 'https' is allowed by default. Operators that need 
to fetch artifacts via other schemes (such as 's3' or 'hdfs') can extend this 
list. Scheme matching is case-insensitive.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.user.artifacts.base.dir</h5></td>
             <td style="word-wrap: break-word;">"/opt/flink/artifacts"</td>
             <td>String</td>
             <td>The base dir to put the session job artifacts.</td>
         </tr>
+        <tr>
+            
<td><h5>kubernetes.operator.user.artifacts.disallow-restricted-hosts</h5></td>
+            <td style="word-wrap: break-word;">true</td>
+            <td>Boolean</td>
+            <td>If enabled, FlinkSessionJob jarURI hosts that resolve to 
loopback, link-local, site-local, wildcard or multicast addresses are rejected 
during validation. Disable only if the operator legitimately needs to fetch 
from such addresses.</td>
+        </tr>
         <tr>
             <td><h5>kubernetes.operator.watched.namespaces</h5></td>
             <td style="word-wrap: break-word;">"JOSDK_ALL_NAMESPACES"</td>
diff --git 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
index 5a47bbd2..cd30143a 100644
--- 
a/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
+++ 
b/flink-kubernetes-operator-api/src/test/java/org/apache/flink/kubernetes/operator/api/utils/BaseTestUtils.java
@@ -67,6 +67,7 @@ public class BaseTestUtils {
     public static final String IMAGE = String.format("flink:%s", 
FLINK_VERSION);
     public static final String IMAGE_POLICY = "IfNotPresent";
     public static final String SAMPLE_JAR = "local:///tmp/sample.jar";
+    public static final String SAMPLE_SESSION_JOB_JAR = "https://example.com/sample.jar";
 
     public static FlinkDeployment buildSessionCluster() {
         return buildSessionCluster(FlinkVersion.v1_17);
@@ -154,7 +155,7 @@ public class BaseTestUtils {
                         .deploymentName(TEST_DEPLOYMENT_NAME)
                         .job(
                                 JobSpec.builder()
-                                        .jarURI(SAMPLE_JAR)
+                                        .jarURI(SAMPLE_SESSION_JOB_JAR)
                                         .parallelism(1)
                                         .upgradeMode(UpgradeMode.STATELESS)
                                         .state(jobState)
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
index 07333282..14da8747 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkOperatorConfiguration.java
@@ -34,6 +34,7 @@ import org.apache.commons.lang3.StringUtils;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -81,6 +82,8 @@ public class FlinkOperatorConfiguration {
     int reportedExceptionEventsMaxStackTraceLength;
     boolean manageIngress;
     Duration jobSubmissionTimeout;
+    List<String> jarUriAllowedSchemes;
+    boolean jarUriDisallowRestrictedHosts;
 
     public static FlinkOperatorConfiguration fromConfiguration(Configuration 
operatorConfig) {
         Duration reconcileInterval =
@@ -211,6 +214,13 @@ public class FlinkOperatorConfiguration {
         Duration jobSubmissionTimeout =
                 
operatorConfig.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_SUBMISSION_TIMEOUT);
 
+        List<String> jarUriAllowedSchemes =
+                
operatorConfig.get(KubernetesOperatorConfigOptions.JAR_URI_ALLOWED_SCHEMES);
+
+        boolean jarUriDisallowRestrictedHosts =
+                operatorConfig.get(
+                        
KubernetesOperatorConfigOptions.JAR_URI_DISALLOW_RESTRICTED_HOSTS);
+
         return new FlinkOperatorConfiguration(
                 reconcileInterval,
                 reconcilerMaxParallelism,
@@ -244,7 +254,9 @@ public class FlinkOperatorConfiguration {
                 reportedExceptionEventsMaxCount,
                 reportedExceptionEventsMaxStackTraceLength,
                 manageIngress,
-                jobSubmissionTimeout);
+                jobSubmissionTimeout,
+                jarUriAllowedSchemes,
+                jarUriDisallowRestrictedHosts);
     }
 
     private static GenericRetry getRetryConfig(Configuration conf) {
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
index 8e418c25..878f6ed4 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/KubernetesOperatorConfigOptions.java
@@ -31,6 +31,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Constants;
 
 import java.time.Duration;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** This class holds configuration constants used by flink operator. */
@@ -332,6 +333,28 @@ public class KubernetesOperatorConfigOptions {
                             "Custom HTTP header for HttpArtifactFetcher. The 
header will be applied when getting the session job artifacts. "
                                     + "Expected format: 
headerKey1:headerValue1,headerKey2:headerValue2.");
 
+    @Documentation.Section(SECTION_SYSTEM)
+    public static final ConfigOption<List<String>> JAR_URI_ALLOWED_SCHEMES =
+            operatorConfig("user.artifacts.allowed-schemes")
+                    .stringType()
+                    .asList()
+                    .defaultValues("https")
+                    .withDescription(
+                            "Comma separated list of URI schemes that are 
allowed for the FlinkSessionJob jarURI. "
+                                    + "Only 'https' is allowed by default. 
Operators that need to fetch artifacts "
+                                    + "via other schemes (such as 's3' or 
'hdfs') can extend this list. "
+                                    + "Scheme matching is case-insensitive.");
+
+    @Documentation.Section(SECTION_SYSTEM)
+    public static final ConfigOption<Boolean> 
JAR_URI_DISALLOW_RESTRICTED_HOSTS =
+            operatorConfig("user.artifacts.disallow-restricted-hosts")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "If enabled, FlinkSessionJob jarURI hosts that 
resolve to loopback, link-local, "
+                                    + "site-local, wildcard or multicast 
addresses are rejected during validation. "
+                                    + "Disable only if the operator 
legitimately needs to fetch from such addresses.");
+
     @Documentation.Section(SECTION_DYNAMIC)
     public static final ConfigOption<Boolean> SNAPSHOT_RESOURCE_ENABLED =
             operatorConfig("snapshot.resource.enabled")
diff --git 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
index 225bc658..d0cef03a 100644
--- 
a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
+++ 
b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/validation/DefaultValidator.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.kubernetes.operator.validation;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.autoscaler.validation.AutoscalerValidator;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
@@ -57,11 +58,18 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.util.Collection;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /** Default validator implementation for {@link FlinkDeployment}. */
 public class DefaultValidator implements FlinkResourceValidator {
@@ -290,6 +298,73 @@ public class DefaultValidator implements 
FlinkResourceValidator {
         return Optional.empty();
     }
 
+    @VisibleForTesting
+    static Optional<String> validateJarURI(
+            String jarURI, Collection<String> allowedSchemes, boolean disallowRestrictedHosts) {
+        if (jarURI == null) {
+            return Optional.empty();
+        }
+
+        URI uri;
+        try {
+            uri = new URI(jarURI);
+        } catch (URISyntaxException e) {
+            return Optional.of("jarURI is not a valid URI: " + e.getMessage());
+        }
+
+        String scheme = uri.getScheme();
+        if (scheme == null) {
+            return Optional.of("jarURI must include a scheme");
+        }
+
+        Set<String> normalizedAllowedSchemes =
+                allowedSchemes.stream()
+                        .map(s -> s.toLowerCase(Locale.ROOT))
+                        .collect(Collectors.toSet());
+        if (!normalizedAllowedSchemes.contains(scheme.toLowerCase(Locale.ROOT))) {
+            return Optional.of(
+                    String.format(
+                            "jarURI scheme '%s' is not in the allowlist %s. Configure '%s' to extend the allowlist.",
+                            scheme,
+                            normalizedAllowedSchemes,
+                            KubernetesOperatorConfigOptions.JAR_URI_ALLOWED_SCHEMES.key()));
+        }
+
+        if (("http".equalsIgnoreCase(scheme) || "https".equalsIgnoreCase(scheme))
+                && disallowRestrictedHosts) {
+            String host = uri.getHost();
+            if (host == null || host.isEmpty()) {
+                return Optional.of("jarURI must include a host for http/https schemes");
+            }
+            InetAddress addr;
+            try {
+                addr = InetAddress.getByName(host);
+            } catch (UnknownHostException e) {
+                return Optional.of("jarURI host '" + host + "' cannot be resolved");
+            }
+            if (addr.isLoopbackAddress()
+                    || addr.isLinkLocalAddress()
+                    || addr.isSiteLocalAddress()
+                    || addr.isAnyLocalAddress()
+                    || addr.isMulticastAddress()) {
+                return Optional.of("jarURI host '" + host + "' resolves to a restricted address");
+            }
+        }
+        return Optional.empty();
+    }
+
+    private Optional<String> validateSessionJobJarURI(FlinkSessionJob sessionJob) {
+        var jobSpec = sessionJob.getSpec().getJob();
+        if (jobSpec == null) {
+            return Optional.empty();
+        }
+        var operatorConfiguration = configManager.getOperatorConfiguration();
+        return validateJarURI(
+                jobSpec.getJarURI(),
+                operatorConfiguration.getJarUriAllowedSchemes(),
+                operatorConfiguration.isJarUriDisallowRestrictedHosts());
+    }
+
     private Optional<String> validateJmSpec(JobManagerSpec jmSpec, Map<String, 
String> confMap) {
         Configuration conf = Configuration.fromMap(confMap);
         var jmMemoryDefined =
@@ -514,7 +589,8 @@ public class DefaultValidator implements 
FlinkResourceValidator {
         return firstPresent(
                 
validateDeploymentName(sessionJob.getSpec().getDeploymentName()),
                 validateJobNotEmpty(sessionJob),
-                validateSpecChange(sessionJob));
+                validateSpecChange(sessionJob),
+                validateSessionJobJarURI(sessionJob));
     }
 
     private Optional<String> validateSessionJobWithCluster(
diff --git 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
index 5738555b..bba875a5 100644
--- 
a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
+++ 
b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.java
@@ -61,6 +61,7 @@ import javax.annotation.Nullable;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
@@ -601,6 +602,118 @@ public class DefaultValidatorTest {
                 "InitialSavepointPath must not be empty for savepoint 
redeploymen");
     }
 
+    @Test
+    public void testJarUriSchemeValidation() {
+        var defaultAllowed = List.of("https");
+
+        // Allowed scheme is accepted.
+        Assertions.assertEquals(
+                Optional.empty(),
+                DefaultValidator.validateJarURI(
+                        "https://example.com/path/to/job.jar", defaultAllowed, true));
+        // Null jarURI is allowed (e.g. for entryClass-only jobs).
+        Assertions.assertEquals(
+                Optional.empty(), DefaultValidator.validateJarURI(null, 
defaultAllowed, true));
+
+        // Disallowed schemes are rejected.
+        for (String disallowed :
+                List.of(
+                        "http://example.com/job.jar",
+                        "file:///var/run/secrets/kubernetes.io/serviceaccount/token",
+                        "s3://my-bucket/job.jar",
+                        "local:///tmp/sample.jar")) {
+            var error = DefaultValidator.validateJarURI(disallowed, 
defaultAllowed, true);
+            assertTrue(error.isPresent(), "expected error for " + disallowed);
+            assertTrue(
+                    error.get().startsWith("jarURI scheme '"),
+                    "unexpected message for " + disallowed + ": " + 
error.get());
+        }
+
+        // Missing scheme is rejected.
+        var noScheme = DefaultValidator.validateJarURI("/no/scheme/job.jar", 
defaultAllowed, true);
+        assertTrue(noScheme.isPresent());
+        assertTrue(noScheme.get().startsWith("jarURI must include a scheme"));
+
+        // Malformed URI is rejected.
+        var malformed = DefaultValidator.validateJarURI("ht tp://bad uri", 
defaultAllowed, true);
+        assertTrue(malformed.isPresent());
+        assertTrue(malformed.get().startsWith("jarURI is not a valid URI"));
+
+        // Operators can extend the allowlist (case-insensitive matching).
+        Assertions.assertEquals(
+                Optional.empty(),
+                DefaultValidator.validateJarURI(
+                        "S3://my-bucket/job.jar", List.of("https", "s3"), 
true));
+    }
+
+    @Test
+    public void testJarUriHostValidation() {
+        var defaultAllowed = List.of("https");
+
+        // Cloud-metadata link-local, loopback, site-local and wildcard 
addresses must be rejected.
+        for (String restricted :
+                List.of(
+                        "https://169.254.169.254/latest/meta-data/iam/security-credentials/",
+                        "https://127.0.0.1/job.jar",
+                        "https://localhost/job.jar",
+                        "https://10.0.0.1/job.jar",
+                        "https://192.168.1.1/job.jar")) {
+            var error = DefaultValidator.validateJarURI(restricted, 
defaultAllowed, true);
+            assertTrue(error.isPresent(), "expected error for " + restricted);
+            assertTrue(
+                    error.get().contains("resolves to a restricted address"),
+                    "unexpected message for " + restricted + ": " + 
error.get());
+        }
+
+        // Disabling the restricted-host check allows loopback.
+        Assertions.assertEquals(
+                Optional.empty(),
+                DefaultValidator.validateJarURI(
+                        "https://127.0.0.1/job.jar", defaultAllowed, false));
+    }
+
+    @Test
+    public void testSessionJobJarUriValidationUsesOperatorConfig() {
+        // Operator-level config sets a custom allowlist; the CR cannot 
override it.
+        var operatorConf = new Configuration();
+        operatorConf.set(
+                KubernetesOperatorConfigOptions.JAR_URI_ALLOWED_SCHEMES, 
List.of("https", "s3"));
+        var customValidator = new DefaultValidator(new 
FlinkConfigManager(operatorConf));
+
+        // s3 is allowed by operator config.
+        var s3Job = TestUtils.buildSessionJob();
+        s3Job.getSpec().getJob().setJarURI("s3://my-bucket/job.jar");
+        Assertions.assertEquals(
+                Optional.empty(), customValidator.validateSessionJob(s3Job, 
Optional.empty()));
+
+        // file:// is still rejected.
+        var fileJob = TestUtils.buildSessionJob();
+        fileJob.getSpec().getJob().setJarURI("file:///etc/passwd");
+        var fileError = customValidator.validateSessionJob(fileJob, 
Optional.empty());
+        assertTrue(fileError.isPresent());
+        assertTrue(fileError.get().startsWith("jarURI scheme 'file'"));
+
+        // A CR-supplied override of the allowlist is ignored — the 
operator-level config wins.
+        var overrideJob = TestUtils.buildSessionJob();
+        overrideJob
+                .getSpec()
+                .setFlinkConfiguration(
+                        Map.of(
+                                
KubernetesOperatorConfigOptions.JAR_URI_ALLOWED_SCHEMES.key(),
+                                "https;file"));
+        overrideJob.getSpec().getJob().setJarURI("file:///etc/passwd");
+        var overrideError = customValidator.validateSessionJob(overrideJob, 
Optional.empty());
+        assertTrue(overrideError.isPresent());
+        assertTrue(overrideError.get().startsWith("jarURI scheme 'file'"));
+
+        // Default validator (https only) rejects the default 
https-but-link-local URI for sanity.
+        var loopbackJob = TestUtils.buildSessionJob();
+        loopbackJob.getSpec().getJob().setJarURI("https://169.254.169.254/job.jar");
+        var loopbackError = validator.validateSessionJob(loopbackJob, 
Optional.empty());
+        assertTrue(loopbackError.isPresent());
+        assertTrue(loopbackError.get().contains("resolves to a restricted 
address"));
+    }
+
     @ParameterizedTest
     @EnumSource(UpgradeMode.class)
     public void testFlinkVersionChangeValidation(UpgradeMode toUpgradeMode) {
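
For readers who want to try the new check outside the operator, here is a minimal standalone sketch of the same two-step logic: a case-insensitive scheme allowlist, followed by a restricted-host check that only applies to http/https. The class name `JarUriCheck` and the method `check` are hypothetical, and the error strings are shortened. It mirrors `DefaultValidator.validateJarURI` from the diff above but is not the operator's code. The sample inputs use IP literals so that no DNS lookup is needed.

```java
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

/** Hypothetical standalone sketch; not part of the operator code base. */
final class JarUriCheck {

    /** Returns an error message, or Optional.empty() when the URI passes both checks. */
    static Optional<String> check(
            String jarUri, Collection<String> allowedSchemes, boolean disallowRestrictedHosts) {
        if (jarUri == null) {
            return Optional.empty();
        }
        URI uri;
        try {
            uri = new URI(jarUri);
        } catch (URISyntaxException e) {
            return Optional.of("not a valid URI: " + e.getMessage());
        }
        String scheme = uri.getScheme();
        if (scheme == null) {
            return Optional.of("missing scheme");
        }

        // Step 1: case-insensitive scheme allowlist.
        String lowerScheme = scheme.toLowerCase(Locale.ROOT);
        boolean schemeAllowed =
                allowedSchemes.stream()
                        .anyMatch(s -> s.toLowerCase(Locale.ROOT).equals(lowerScheme));
        if (!schemeAllowed) {
            return Optional.of("scheme '" + scheme + "' is not allowed");
        }

        // Step 2: for http/https only, reject hosts in restricted address ranges.
        boolean isHttp = lowerScheme.equals("http") || lowerScheme.equals("https");
        if (isHttp && disallowRestrictedHosts) {
            String host = uri.getHost();
            if (host == null || host.isEmpty()) {
                return Optional.of("missing host");
            }
            InetAddress addr;
            try {
                // IP literals are parsed directly; host names trigger a lookup.
                addr = InetAddress.getByName(host);
            } catch (UnknownHostException e) {
                return Optional.of("host '" + host + "' cannot be resolved");
            }
            if (addr.isLoopbackAddress()
                    || addr.isLinkLocalAddress()
                    || addr.isSiteLocalAddress()
                    || addr.isAnyLocalAddress()
                    || addr.isMulticastAddress()) {
                return Optional.of("host '" + host + "' resolves to a restricted address");
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> httpsOnly = List.of("https");
        for (String candidate :
                List.of(
                        "https://8.8.8.8/job.jar",
                        "https://169.254.169.254/job.jar",
                        "file:///etc/passwd")) {
            System.out.println(
                    candidate + " -> " + check(candidate, httpsOnly, true).orElse("accepted"));
        }
    }
}
```

As in the commit, the host check runs only at validation time. Whether the address seen here is the one contacted later when the artifact is fetched is outside the scope of this sketch.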
