This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 33ab2e0 Rejigger SecretsProviderConfigurator interface to make checking more generic (#2933) 33ab2e0 is described below commit 33ab2e070d964084097b5242b5de6867f843408c Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Mon Nov 5 17:08:34 2018 -0800 Rejigger SecretsProviderConfigurator interface to make checking more generic (#2933) ### Motivation A SecretsProviderConfigurator might want to do more checks than just check the secrets. This PR makes the configurator interface more generic. --- .../runtime/KubernetesRuntimeFactory.java | 10 +-- .../pulsar/functions/runtime/RuntimeUtils.java | 12 ++-- .../functions/runtime/KubernetesRuntimeTest.java | 84 ++++++++++++++-------- .../functions/runtime/ProcessRuntimeTest.java | 4 +- .../KubernetesSecretsProviderConfigurator.java | 36 ++++++---- .../SecretsProviderConfigurator.java | 4 +- .../KubernetesSecretsProviderConfiguratorTest.java | 13 ++-- 7 files changed, 100 insertions(+), 63 deletions(-) diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index 18d02e5..78eea16 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -20,8 +20,6 @@ package org.apache.pulsar.functions.runtime; import com.google.common.annotations.VisibleForTesting; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import io.kubernetes.client.ApiClient; import io.kubernetes.client.Configuration; import io.kubernetes.client.apis.AppsV1Api; @@ -40,7 +38,6 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import 
java.lang.reflect.Field; -import java.lang.reflect.Type; import java.util.Map; import java.util.Timer; import java.util.TimerTask; @@ -204,12 +201,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { @Override public void doAdmissionChecks(Function.FunctionDetails functionDetails) { KubernetesRuntime.doChecks(functionDetails); - if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) { - Type type = new TypeToken<Map<String, Object>>() { - }.getType(); - Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); - secretsProviderConfigurator.validateSecretMap(secretsMap); - } + secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, kubernetesInfo.getJobNamespace(), functionDetails); } @VisibleForTesting diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java index 2983ec6..e84ec20 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java @@ -169,11 +169,13 @@ class RuntimeUtils { args.add("--expected_healthcheck_interval"); args.add(String.valueOf(expectedHealthCheckInterval)); - args.add("--secrets_provider"); - args.add(secretsProviderClassName); - if (!StringUtils.isEmpty(secretsProviderConfig)) { - args.add("--secrets_provider_config"); - args.add("'" + secretsProviderConfig + "'"); + if (!StringUtils.isEmpty(secretsProviderClassName)) { + args.add("--secrets_provider"); + args.add(secretsProviderClassName); + if (!StringUtils.isEmpty(secretsProviderConfig)) { + args.add("--secrets_provider_config"); + args.add("'" + secretsProviderConfig + "'"); + } } return args; } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java 
b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java index cf8c1c9..2e00983 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java @@ -20,6 +20,8 @@ package org.apache.pulsar.functions.runtime; import com.google.protobuf.util.JsonFormat; +import io.kubernetes.client.apis.AppsV1Api; +import io.kubernetes.client.apis.CoreV1Api; import io.kubernetes.client.models.V1PodSpec; import org.apache.commons.lang.StringUtils; import org.apache.pulsar.functions.instance.InstanceConfig; @@ -27,7 +29,6 @@ import org.apache.pulsar.functions.proto.Function; import org.apache.pulsar.functions.proto.Function.ConsumerSpec; import org.apache.pulsar.functions.proto.Function.FunctionDetails; import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider; -import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator; import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator; import org.apache.pulsar.functions.utils.FunctionDetailsUtils; import org.testng.annotations.AfterMethod; @@ -41,7 +42,6 @@ import java.util.Map; import static org.powermock.api.mockito.PowerMockito.doNothing; import static org.powermock.api.mockito.PowerMockito.spy; import static org.testng.Assert.assertEquals; -import static org.testng.Assert.expectThrows; /** * Unit test of {@link ThreadRuntime}. 
@@ -68,10 +68,14 @@ public class KubernetesRuntimeTest { @Override public String getSecretsProviderClassName(FunctionDetails functionDetails) { - if (functionDetails.getRuntime() == FunctionDetails.Runtime.JAVA) { - return ClearTextSecretsProvider.class.getName(); + if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) { + if (functionDetails.getRuntime() == FunctionDetails.Runtime.JAVA) { + return ClearTextSecretsProvider.class.getName(); + } else { + return "secretsprovider.ClearTextSecretsProvider"; + } } else { - return "secretsprovider.ClearTextSecretsProvider"; + return null; } } @@ -98,7 +102,7 @@ public class KubernetesRuntimeTest { } @Override - public void validateSecretMap(Map<String, Object> secretMap) { + public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, FunctionDetails functionDetails) { } } @@ -155,7 +159,7 @@ public class KubernetesRuntimeTest { return factory; } - FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) { + FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime, boolean addSecrets) { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(); functionDetailsBuilder.setRuntime(runtime); functionDetailsBuilder.setTenant(TEST_TENANT); @@ -174,13 +178,16 @@ public class KubernetesRuntimeTest { .putAllInputSpecs(topicsToSchema) .setClassName("org.pulsar.pulsar.TestSource") .setTypeClassName(String.class.getName())); + if (addSecrets) { + functionDetailsBuilder.setSecretsMap("SomeMap"); + } return functionDetailsBuilder.build(); } - InstanceConfig createJavaInstanceConfig(FunctionDetails.Runtime runtime) { + InstanceConfig createJavaInstanceConfig(FunctionDetails.Runtime runtime, boolean addSecrets) { InstanceConfig config = new InstanceConfig(); - config.setFunctionDetails(createFunctionDetails(runtime)); + config.setFunctionDetails(createFunctionDetails(runtime, addSecrets)); config.setFunctionId(java.util.UUID.randomUUID().toString()); 
config.setFunctionVersion("1.0"); config.setInstanceId(0); @@ -191,26 +198,35 @@ public class KubernetesRuntimeTest { @Test public void testJavaConstructor() throws Exception { - InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA); + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false); + + factory = createKubernetesRuntimeFactory(null); + + verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false); + } + + @Test + public void testJavaConstructorWithSecrets() throws Exception { + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, true); factory = createKubernetesRuntimeFactory(null); - verifyJavaInstance(config, pulsarRootDir + "/instances/deps"); + verifyJavaInstance(config, pulsarRootDir + "/instances/deps", true); } @Test public void testJavaConstructorWithDeps() throws Exception { - InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA); + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false); String extraDepsDir = "/path/to/deps/dir"; factory = createKubernetesRuntimeFactory(extraDepsDir); - verifyJavaInstance(config, extraDepsDir); + verifyJavaInstance(config, extraDepsDir, false); } - private void verifyJavaInstance(InstanceConfig config, String depsDir) throws Exception { + private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached) throws Exception { KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); List<String> args = container.getProcessArgs(); @@ -221,12 +237,15 @@ public class KubernetesRuntimeTest { if (null != depsDir) { extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir; classpath = classpath + ":" + depsDir + "/*"; - totalArgs = 33; + totalArgs = 29; portArg = 24; } else { extraDepsEnv = ""; portArg = 23; - totalArgs = 32; + totalArgs = 28; + } + if (secretsAttached) { + totalArgs += 4; } 
assertEquals(args.size(), totalArgs, @@ -246,33 +265,35 @@ public class KubernetesRuntimeTest { + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(portArg) + " --state_storage_serviceurl " + stateStorageServiceUrl - + " --expected_healthcheck_interval -1" - + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider" - + " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'"; + + " --expected_healthcheck_interval -1"; + if (secretsAttached) { + expectedArgs += " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider" + + " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'"; + } assertEquals(String.join(" ", args), expectedArgs); } @Test public void testPythonConstructor() throws Exception { - InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON); + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON, false); factory = createKubernetesRuntimeFactory(null); - verifyPythonInstance(config, pulsarRootDir + "/instances/deps"); + verifyPythonInstance(config, pulsarRootDir + "/instances/deps", false); } @Test public void testPythonConstructorWithDeps() throws Exception { - InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON); + InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.PYTHON, false); String extraDepsDir = "/path/to/deps/dir"; factory = createKubernetesRuntimeFactory(extraDepsDir); - verifyPythonInstance(config, extraDepsDir); + verifyPythonInstance(config, extraDepsDir, false); } - private void verifyPythonInstance(InstanceConfig config, String extraDepsDir) throws Exception { + private void verifyPythonInstance(InstanceConfig config, String extraDepsDir, boolean secretsAttached) throws Exception { KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l); List<String> args = container.getProcessArgs(); @@ 
-281,16 +302,19 @@ public class KubernetesRuntimeTest { String pythonPath; int configArg; if (null == extraDepsDir) { - totalArgs = 36; + totalArgs = 32; portArg = 29; configArg = 9; pythonPath = ""; } else { - totalArgs = 37; + totalArgs = 33; portArg = 30; configArg = 10; pythonPath = "PYTHONPATH=${PYTHONPATH}:" + extraDepsDir + " "; } + if (secretsAttached) { + totalArgs += 4; + } assertEquals(args.size(), totalArgs, "Actual args : " + StringUtils.join(args, " ")); @@ -308,9 +332,11 @@ public class KubernetesRuntimeTest { + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails()) + "' --pulsar_serviceurl " + pulsarServiceUrl + " --max_buffered_tuples 1024 --port " + args.get(portArg) - + " --expected_healthcheck_interval -1" - + " --secrets_provider secretsprovider.ClearTextSecretsProvider" - + " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'"; + + " --expected_healthcheck_interval -1"; + if (secretsAttached) { + expectedArgs += " --secrets_provider secretsprovider.ClearTextSecretsProvider" + + " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'"; + } assertEquals(String.join(" ", args), expectedArgs); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index b88686a..63875fd 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -35,6 +35,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import io.kubernetes.client.apis.AppsV1Api; +import io.kubernetes.client.apis.CoreV1Api; import io.kubernetes.client.models.V1PodSpec; import org.apache.pulsar.functions.instance.InstanceConfig; import org.apache.pulsar.functions.proto.Function; @@ -88,7 
+90,7 @@ public class ProcessRuntimeTest { } @Override - public void validateSecretMap(Map<String, Object> secretMap) { + public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, FunctionDetails functionDetails) { } } diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java index f35a35c..27b8309 100644 --- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java +++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java @@ -20,6 +20,8 @@ package org.apache.pulsar.functions.secretsproviderconfigurator; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import io.kubernetes.client.apis.AppsV1Api; +import io.kubernetes.client.apis.CoreV1Api; import io.kubernetes.client.models.*; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.functions.proto.Function; @@ -99,21 +101,27 @@ public class KubernetesSecretsProviderConfigurator implements SecretsProviderCon // The secret object should be of type Map<String, String> and it should contain "id" and "key" @Override - public void validateSecretMap(Map<String, Object> secretMap) { - for (Object object : secretMap.values()) { - if (object instanceof Map) { - Map<String, String> kubernetesSecret = (Map<String, String>) object; - if (kubernetesSecret.size() < 2) { - throw new IllegalArgumentException("Kubernetes Secret should contain id and key"); - } - if (!kubernetesSecret.containsKey(ID_KEY)) { - throw new IllegalArgumentException("Kubernetes Secret should contain id information"); - } - if (!kubernetesSecret.containsKey(KEY_KEY)) { - throw new 
IllegalArgumentException("Kubernetes Secret should contain key information"); + public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, Function.FunctionDetails functionDetails) { + if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) { + Type type = new TypeToken<Map<String, Object>>() { + }.getType(); + Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); + + for (Object object : secretsMap.values()) { + if (object instanceof Map) { + Map<String, String> kubernetesSecret = (Map<String, String>) object; + if (kubernetesSecret.size() < 2) { + throw new IllegalArgumentException("Kubernetes Secret should contain id and key"); + } + if (!kubernetesSecret.containsKey(ID_KEY)) { + throw new IllegalArgumentException("Kubernetes Secret should contain id information"); + } + if (!kubernetesSecret.containsKey(KEY_KEY)) { + throw new IllegalArgumentException("Kubernetes Secret should contain key information"); + } + } else { + throw new IllegalArgumentException("Kubernetes Secret should be a Map containing id/key pairs"); } - } else { - throw new IllegalArgumentException("Kubernetes Secret should be a Map containing id/key pairs"); } } } diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java index adbd716..c31686f 100644 --- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java +++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.functions.secretsproviderconfigurator; +import io.kubernetes.client.apis.AppsV1Api; +import io.kubernetes.client.apis.CoreV1Api; import 
io.kubernetes.client.models.V1PodSpec; import org.apache.pulsar.functions.proto.Function; @@ -67,6 +69,6 @@ public interface SecretsProviderConfigurator { /** * Do config checks to see whether the secrets provided are conforming */ - default void validateSecretMap(Map<String, Object> secretMap) {} + default void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, Function.FunctionDetails functionDetails) {} } \ No newline at end of file diff --git a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java index b5cbd6a..fc764cf 100644 --- a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java +++ b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java @@ -19,13 +19,15 @@ package org.apache.pulsar.functions.secretsproviderconfigurator; +import com.google.gson.Gson; +import org.apache.pulsar.functions.proto.Function; import org.testng.Assert; import org.testng.annotations.Test; import java.util.HashMap; /** - * Unit test of {@link Exceptions}. + * Unit test of {@link KubernetesSecretsProviderConfigurator}. 
*/ public class KubernetesSecretsProviderConfiguratorTest { @@ -35,7 +37,8 @@ public class KubernetesSecretsProviderConfiguratorTest { try { HashMap<String, Object> map = new HashMap<String, Object>(); map.put("secretname", "randomsecret"); - provider.validateSecretMap(map); + Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setSecretsMap(new Gson().toJson(map)).build(); + provider.doAdmissionChecks(null, null, null, functionDetails); Assert.fail("Non conforming secret object should not validate"); } catch (Exception e) { } @@ -44,7 +47,8 @@ public class KubernetesSecretsProviderConfiguratorTest { HashMap<String, String> map1 = new HashMap<String, String>(); map1.put("secretname", "secretvalue"); map.put("secretname", map1); - provider.validateSecretMap(map); + Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setSecretsMap(new Gson().toJson(map)).build(); + provider.doAdmissionChecks(null, null, null, functionDetails); Assert.fail("Non conforming secret object should not validate"); } catch (Exception e) { } @@ -54,7 +58,8 @@ public class KubernetesSecretsProviderConfiguratorTest { map1.put("path", "secretvalue"); map1.put("key", "secretvalue"); map.put("secretname", map1); - provider.validateSecretMap(map); + Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setSecretsMap(new Gson().toJson(map)).build(); + provider.doAdmissionChecks(null, null, null, functionDetails); } catch (Exception e) { Assert.fail("Conforming secret object should validate"); }