This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 8c9c6e92928 [fix][fn] Configure pulsar admin for TLS (#20533)
8c9c6e92928 is described below

commit 8c9c6e929287e33147c4d8d7217a0aeaef02373d
Author: Michael Marshall <mmarsh...@apache.org>
AuthorDate: Wed Jun 7 17:31:10 2023 -0500

    [fix][fn] Configure pulsar admin for TLS (#20533)
    
    ### Motivation
    
    This PR is a combination of https://github.com/apache/pulsar/pull/20482 and
    https://github.com/apache/pulsar/pull/20513 because cherry-picking those PRs
    produced too many conflicts.
    
    ### Modifications
    
    * Made the same additions as the source PRs, though the code has changed a
    bit, so I had to make a few extra changes.
    
    ### Documentation
    
    - [x] `doc-not-needed`
---
 .../runtime/kubernetes/KubernetesRuntime.java      | 110 +++++++++------------
 .../runtime/kubernetes/KubernetesRuntimeTest.java  |  19 ++++
 2 files changed, 67 insertions(+), 62 deletions(-)

diff --git 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index a40feb09253..2917cc4d8c0 100644
--- 
a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ 
b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -866,80 +866,66 @@ public class KubernetesRuntime implements Runtime {
     }
 
     private List<String> getDownloadCommand(String tenant, String namespace, 
String name, String userCodeFilePath) {
+        List<String> result = new ArrayList<>();
+        result.add(pulsarRootDir + configAdminCLI);
 
         // add auth plugin and parameters if necessary
         if (authenticationEnabled && authConfig != null) {
-            if (isNotBlank(authConfig.getClientAuthenticationPlugin())
-                    && 
isNotBlank(authConfig.getClientAuthenticationParameters())
-                    && instanceConfig.getFunctionAuthenticationSpec() != null) 
{
-                return Arrays.asList(
-                        pulsarRootDir + configAdminCLI,
-                        "--auth-plugin",
-                        authConfig.getClientAuthenticationPlugin(),
-                        "--auth-params",
-                        authConfig.getClientAuthenticationParameters(),
-                        "--admin-url",
-                        pulsarAdminUrl,
-                        "functions",
-                        "download",
-                        "--tenant",
-                        tenant,
-                        "--namespace",
-                        namespace,
-                        "--name",
-                        name,
-                        "--destination-file",
-                        userCodeFilePath);
-            }
+            result.addAll(getAuthenticationParams(authConfig));
         }
-
-        return Arrays.asList(
-                pulsarRootDir + configAdminCLI,
-                "--admin-url",
-                pulsarAdminUrl,
-                "functions",
-                "download",
-                "--tenant",
-                tenant,
-                "--namespace",
-                namespace,
-                "--name",
-                name,
-                "--destination-file",
-                userCodeFilePath);
+        result.add("--admin-url");
+        result.add(pulsarAdminUrl);
+        result.add("functions");
+        result.add("download");
+        result.add("--tenant");
+        result.add(tenant);
+        result.add("--namespace");
+        result.add(namespace);
+        result.add("--name");
+        result.add(name);
+        result.add("--destination-file");
+        result.add(userCodeFilePath);
+        return result;
     }
 
     private List<String> getPackageDownloadCommand(String packageName, String 
userCodeFilePath) {
+        List<String> result = new ArrayList<>();
+        result.add(pulsarRootDir + configAdminCLI);
         // add auth plugin and parameters if necessary
         if (authenticationEnabled && authConfig != null) {
-            if (isNotBlank(authConfig.getClientAuthenticationPlugin())
+            result.addAll(getAuthenticationParams(authConfig));
+        }
+        result.add("--admin-url");
+        result.add(pulsarAdminUrl);
+        result.add("packages");
+        result.add("download");
+        result.add(packageName);
+        result.add("--path");
+        result.add(userCodeFilePath);
+        return result;
+    }
+
+    private List<String> getAuthenticationParams(AuthenticationConfig 
authConfig) {
+        List<String> result = new ArrayList<>();
+        if (isNotBlank(authConfig.getClientAuthenticationPlugin())
                 && isNotBlank(authConfig.getClientAuthenticationParameters())
                 && instanceConfig.getFunctionAuthenticationSpec() != null) {
-                return Arrays.asList(
-                    pulsarRootDir + configAdminCLI,
-                    "--auth-plugin",
-                    authConfig.getClientAuthenticationPlugin(),
-                    "--auth-params",
-                    authConfig.getClientAuthenticationParameters(),
-                    "--admin-url",
-                    pulsarAdminUrl,
-                    "packages",
-                    "download",
-                    packageName,
-                    "--path",
-                    userCodeFilePath);
-            }
+            result.add("--auth-plugin");
+            result.add(authConfig.getClientAuthenticationPlugin());
+            result.add("--auth-params");
+            result.add(authConfig.getClientAuthenticationParameters());
         }
-
-        return Arrays.asList(
-            pulsarRootDir + configAdminCLI,
-            "--admin-url",
-            pulsarAdminUrl,
-            "packages",
-            "download",
-            packageName,
-            "--path",
-            userCodeFilePath);
+        if (authConfig.isTlsAllowInsecureConnection()) {
+            result.add("--tls-allow-insecure");
+        }
+        if (authConfig.isTlsHostnameVerificationEnable()) {
+            result.add("--tls-enable-hostname-verification");
+        }
+        if (isNotBlank(authConfig.getTlsTrustCertsFilePath())) {
+            result.add("--tls-trust-cert-path");
+            result.add(authConfig.getTlsTrustCertsFilePath());
+        }
+        return result;
     }
 
     private static String setShardIdEnvironmentVariableCommand() {
diff --git 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 7ede3577ad6..f216de19a59 100644
--- 
a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -1151,6 +1151,25 @@ public class KubernetesRuntimeTest {
         String containerCommand = 
spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
         assertTrue(containerCommand.contains(expectedDownloadCommand));
     }
+    @Test
+    public void testCustomKubernetesDownloadWithTLSConfig() throws Exception {
+        String downloadDirectory = "download/pulsar_functions";
+        InstanceConfig config = 
createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+        
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, 
false, (fb) -> {
+            return fb.setPackageUrl("function://public/default/test@v1");
+        }));
+
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, 
Optional.empty(), downloadDirectory);
+        factory.setAuthenticationEnabled(true);
+        
factory.setAuthConfig(AuthenticationConfig.builder().tlsHostnameVerificationEnable(true).build());
+
+        KubernetesRuntime container = factory.createContainer(config, 
userJarFile, userJarFile, 30l);
+        V1StatefulSet spec = container.createStatefulSet();
+        String expectedDownloadCommand = "pulsar-admin 
--tls-enable-hostname-verification --admin-url http://localhost:8080 packages 
download "
+                + "function://public/default/test@v1 --path " + 
factory.getDownloadDirectory() + "/" + userJarFile;
+        String containerCommand = 
spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
+        assertTrue(containerCommand.contains(expectedDownloadCommand));
+    }
 
     @Test
     public void shouldUseConfiguredMetricsPort() throws Exception {

Reply via email to