This is an automated email from the ASF dual-hosted git repository. mmarshall pushed a commit to branch branch-2.11 in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.11 by this push: new 8c9c6e92928 [fix][fn] Configure pulsar admin for TLS (#20533) 8c9c6e92928 is described below commit 8c9c6e929287e33147c4d8d7217a0aeaef02373d Author: Michael Marshall <mmarsh...@apache.org> AuthorDate: Wed Jun 7 17:31:10 2023 -0500 [fix][fn] Configure pulsar admin for TLS (#20533) ### Motivation This PR is a combination of https://github.com/apache/pulsar/pull/20482 and https://github.com/apache/pulsar/pull/20513 because cherry picking those PRs produced too many conflicts. ### Modifications * Made the same addition as the source PRs, though the code has changed a bit, so I had to make a few extra changes. ### Documentation - [x] `doc-not-needed` --- .../runtime/kubernetes/KubernetesRuntime.java | 110 +++++++++------------ .../runtime/kubernetes/KubernetesRuntimeTest.java | 19 ++++ 2 files changed, 67 insertions(+), 62 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java index a40feb09253..2917cc4d8c0 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java @@ -866,80 +866,66 @@ public class KubernetesRuntime implements Runtime { } private List<String> getDownloadCommand(String tenant, String namespace, String name, String userCodeFilePath) { + List<String> result = new ArrayList<>(); + result.add(pulsarRootDir + configAdminCLI); // add auth plugin and parameters if necessary if (authenticationEnabled && authConfig != null) { - if (isNotBlank(authConfig.getClientAuthenticationPlugin()) - && isNotBlank(authConfig.getClientAuthenticationParameters()) - && instanceConfig.getFunctionAuthenticationSpec() != null) { - return Arrays.asList( - pulsarRootDir + configAdminCLI, - "--auth-plugin", - authConfig.getClientAuthenticationPlugin(), - "--auth-params", - authConfig.getClientAuthenticationParameters(), - "--admin-url", - pulsarAdminUrl, - "functions", - "download", - "--tenant", - tenant, - "--namespace", - namespace, - "--name", - name, - "--destination-file", - userCodeFilePath); - } + result.addAll(getAuthenticationParams(authConfig)); } - - return Arrays.asList( - pulsarRootDir + configAdminCLI, - "--admin-url", - pulsarAdminUrl, - "functions", - "download", - "--tenant", - tenant, - "--namespace", - namespace, - "--name", - name, - "--destination-file", - userCodeFilePath); + result.add("--admin-url"); + result.add(pulsarAdminUrl); + result.add("functions"); + result.add("download"); + result.add("--tenant"); + result.add(tenant); + result.add("--namespace"); + result.add(namespace); + result.add("--name"); + result.add(name); + result.add("--destination-file"); + result.add(userCodeFilePath); + return result; } private List<String> getPackageDownloadCommand(String packageName, String userCodeFilePath) { + List<String> result = new ArrayList<>(); + result.add(pulsarRootDir + configAdminCLI); // add auth plugin and parameters if necessary if (authenticationEnabled && authConfig != null) { - if (isNotBlank(authConfig.getClientAuthenticationPlugin()) + result.addAll(getAuthenticationParams(authConfig)); + } + result.add("--admin-url"); + result.add(pulsarAdminUrl); + result.add("packages"); + result.add("download"); + result.add(packageName); + result.add("--path"); + result.add(userCodeFilePath); + return result; + } + + private List<String> getAuthenticationParams(AuthenticationConfig authConfig) { + List<String> result = new ArrayList<>(); + if (isNotBlank(authConfig.getClientAuthenticationPlugin()) && isNotBlank(authConfig.getClientAuthenticationParameters()) && instanceConfig.getFunctionAuthenticationSpec() != null) { - return Arrays.asList( - pulsarRootDir + configAdminCLI, - "--auth-plugin", - authConfig.getClientAuthenticationPlugin(), - "--auth-params", - authConfig.getClientAuthenticationParameters(), - "--admin-url", - pulsarAdminUrl, - "packages", - "download", - packageName, - "--path", - userCodeFilePath); - } + result.add("--auth-plugin"); + result.add(authConfig.getClientAuthenticationPlugin()); + result.add("--auth-params"); + result.add(authConfig.getClientAuthenticationParameters()); } - - return Arrays.asList( - pulsarRootDir + configAdminCLI, - "--admin-url", - pulsarAdminUrl, - "packages", - "download", - packageName, - "--path", - userCodeFilePath); + if (authConfig.isTlsAllowInsecureConnection()) { + result.add("--tls-allow-insecure"); + } + if (authConfig.isTlsHostnameVerificationEnable()) { + result.add("--tls-enable-hostname-verification"); + } + if (isNotBlank(authConfig.getTlsTrustCertsFilePath())) { + result.add("--tls-trust-cert-path"); + result.add(authConfig.getTlsTrustCertsFilePath()); + } + return result; } private static String setShardIdEnvironmentVariableCommand() { diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java index 7ede3577ad6..f216de19a59 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java @@ -1151,6 +1151,25 @@ public class KubernetesRuntimeTest { String containerCommand = spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2); assertTrue(containerCommand.contains(expectedDownloadCommand)); } + @Test + public void testCustomKubernetesDownloadWithTLSConfig() throws Exception { + String downloadDirectory = "download/pulsar_functions"; + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false); + config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> { + return fb.setPackageUrl("function://public/default/test@v1"); + })); + + factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), downloadDirectory); + factory.setAuthenticationEnabled(true); + factory.setAuthConfig(AuthenticationConfig.builder().tlsHostnameVerificationEnable(true).build()); + + KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); + V1StatefulSet spec = container.createStatefulSet(); + String expectedDownloadCommand = "pulsar-admin --tls-enable-hostname-verification --admin-url http://localhost:8080 packages download " + + "function://public/default/test@v1 --path " + factory.getDownloadDirectory() + "/" + userJarFile; + String containerCommand = spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2); + assertTrue(containerCommand.contains(expectedDownloadCommand)); + } @Test public void shouldUseConfiguredMetricsPort() throws Exception {