This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 6a2db62 Fix: Exception when switch cluster from auth enabled to auth disabled (#4069) 6a2db62 is described below commit 6a2db62f20853a4441d7d6c79a20524e6b8b6342 Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com> AuthorDate: Wed Apr 17 23:11:38 2019 -0700 Fix: Exception when switch cluster from auth enabled to auth disabled (#4069) * fix bug in source and sink cli update * fix import * fix logic * fix bug in auth when spec is not null but auth is turned off * clean up debug * cleanup --- .../main/java/org/apache/pulsar/PulsarStandaloneStarter.java | 8 +++++++- .../apache/pulsar/functions/runtime/KubernetesRuntime.java | 9 ++++++--- .../pulsar/functions/runtime/KubernetesRuntimeFactory.java | 10 +++++++--- .../org/apache/pulsar/functions/runtime/LocalRunner.java | 2 +- .../pulsar/functions/runtime/ProcessRuntimeFactory.java | 7 +++++-- .../functions/runtime/KubernetesRuntimeFactoryTest.java | 2 +- .../pulsar/functions/runtime/KubernetesRuntimeTest.java | 2 +- .../apache/pulsar/functions/runtime/ProcessRuntimeTest.java | 2 +- .../org/apache/pulsar/functions/worker/FunctionActioner.java | 12 +++++++----- .../pulsar/functions/worker/FunctionRuntimeManager.java | 6 ++++-- 10 files changed, 40 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java index c9aefa1..feef094 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java @@ -116,6 +116,12 @@ public class PulsarStandaloneStarter extends PulsarStandalone { public static void main(String args[]) throws Exception { // Start standalone PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args); - standalone.start(); + try { + standalone.start(); + } catch (Throwable th) { + log.error("Failed to start pulsar service.", th); + Runtime.getRuntime().exit(1); + } + } } diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java index 6dbb0d3..c3fe161 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java @@ -116,6 +116,7 @@ public class KubernetesRuntime implements Runtime { ) ); private static final long GRPC_TIMEOUT_SECS = 5; + private final boolean authenticationEnabled; // The thread that invokes the function @Getter @@ -160,7 +161,8 @@ public class KubernetesRuntime implements Runtime { SecretsProviderConfigurator secretsProviderConfigurator, Integer expectedMetricsCollectionInterval, int percentMemoryPadding, - KubernetesFunctionAuthProvider functionAuthDataCacheProvider) throws Exception { + KubernetesFunctionAuthProvider functionAuthDataCacheProvider, + boolean authenticationEnabled) throws Exception { this.appsClient = appsClient; this.coreClient = coreClient; this.instanceConfig = instanceConfig; @@ -174,6 +176,7 @@ public class KubernetesRuntime implements Runtime { this.pulsarAdminUrl = pulsarAdminUrl; this.secretsProviderConfigurator = secretsProviderConfigurator; this.percentMemoryPadding = percentMemoryPadding; + this.authenticationEnabled = authenticationEnabled; String logConfigFile = null; String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails()); String secretsProviderConfig = null; @@ -439,7 +442,7 @@ public class KubernetesRuntime implements Runtime { private void submitStatefulSet() throws Exception { final V1StatefulSet statefulSet = createStatefulSet(); // Configure function authentication if needed - if (instanceConfig.getFunctionAuthenticationSpec() != null) { + if (authenticationEnabled && instanceConfig.getFunctionAuthenticationSpec() != null) { functionAuthDataCacheProvider.configureAuthDataStatefulSet( statefulSet, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec())); } @@ -755,7 +758,7 @@ public class KubernetesRuntime implements Runtime { private List<String> getDownloadCommand(String bkPath, String userCodeFilePath) { // add auth plugin and parameters if necessary - if (authConfig != null) { + if (authenticationEnabled && authConfig != null) { if (isNotBlank(authConfig.getClientAuthenticationPlugin()) && isNotBlank(authConfig.getClientAuthenticationParameters())) { return Arrays.asList( diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java index 7fa67ce..3aed50e 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java @@ -92,6 +92,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { private AppsV1Api appsClient; private CoreV1Api coreClient; private Resources functionInstanceMinResources; + private final boolean authenticationEnabled; @VisibleForTesting public KubernetesRuntimeFactory(String k8Uri, @@ -114,7 +115,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { String changeConfigMap, String changeConfigMapNamespace, Resources functionInstanceMinResources, - SecretsProviderConfigurator secretsProviderConfigurator) { + SecretsProviderConfigurator secretsProviderConfigurator, + boolean authenticationEnabled) { this.kubernetesInfo = new KubernetesInfo(); this.kubernetesInfo.setK8Uri(k8Uri); if (!isEmpty(jobNamespace)) { @@ -165,6 +167,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval; this.secretsProviderConfigurator = secretsProviderConfigurator; this.functionInstanceMinResources = functionInstanceMinResources; + this.authenticationEnabled = authenticationEnabled; try { setupClient(); } catch (Exception e) { @@ -195,7 +198,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { } // adjust the auth config to support auth - if (instanceConfig.getFunctionAuthenticationSpec() != null) { + if (authenticationEnabled && instanceConfig.getFunctionAuthenticationSpec() != null) { getAuthProvider().configureAuthenticationConfig(authConfig, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec())); } @@ -223,7 +226,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory { secretsProviderConfigurator, expectedMetricsCollectionInterval, this.kubernetesInfo.getPercentMemoryPadding(), - getAuthProvider()); + getAuthProvider(), + authenticationEnabled); } @Override diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java index 966f299..37efbee 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java @@ -179,7 +179,7 @@ public class LocalRunner { null, /* python instance file */ null, /* log directory */ null, /* extra dependencies dir */ - new DefaultSecretsProviderConfigurator())) { + new DefaultSecretsProviderConfigurator(), false)) { List<RuntimeSpawner> spawners = new LinkedList<>(); for (int i = 0; i < parallelism; ++i) { InstanceConfig instanceConfig = new InstanceConfig(); diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java index 7090967..6976df5 100644 --- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java +++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java @@ -39,6 +39,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory { private final String pulsarServiceUrl; private final String stateStorageServiceUrl; + private final boolean authenticationEnabled; private AuthenticationConfig authConfig; private SecretsProviderConfigurator secretsProviderConfigurator; private String javaInstanceJarFile; @@ -54,7 +55,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory { String pythonInstanceFile, String logDirectory, String extraDependenciesDir, - SecretsProviderConfigurator secretsProviderConfigurator) { + SecretsProviderConfigurator secretsProviderConfigurator, + boolean authenticationEnabled) { this.pulsarServiceUrl = pulsarServiceUrl; this.stateStorageServiceUrl = stateStorageServiceUrl; this.authConfig = authConfig; @@ -63,6 +65,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory { this.pythonInstanceFile = pythonInstanceFile; this.extraDependenciesDir = extraDependenciesDir; this.logDirectory = logDirectory; + this.authenticationEnabled = authenticationEnabled; // if things are not specified, try to figure out by env properties if (this.javaInstanceJarFile == null) { @@ -129,7 +132,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory { } // configure auth if necessary - if (instanceConfig.getFunctionAuthenticationSpec() != null) { + if (authenticationEnabled && instanceConfig.getFunctionAuthenticationSpec() != null) { getAuthProvider().configureAuthenticationConfig(authConfig, getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec())); } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java index 8f1172e..5ea75fa 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java @@ -145,7 +145,7 @@ public class KubernetesRuntimeFactoryTest { null, null, minResources, - new TestSecretProviderConfigurator())); + new TestSecretProviderConfigurator(), false)); doNothing().when(factory).setupClient(); return factory; } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java index b1c3dfb..5128ff3 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java @@ -163,7 +163,7 @@ public class KubernetesRuntimeTest { null, null, null, - null, new TestSecretProviderConfigurator())); + null, new TestSecretProviderConfigurator(), false)); doNothing().when(factory).setupClient(); return factory; } diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java index e2c0499..842b0a3 100644 --- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java +++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java @@ -139,7 +139,7 @@ public class ProcessRuntimeTest { pythonInstanceFile, logDirectory, extraDependenciesDir, /* extra dependencies dir */ - new TestSecretsProviderConfigurator()); + new TestSecretsProviderConfigurator(), false); } FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java index 175d6e5..7140fdc 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java @@ -154,11 +154,13 @@ public class FunctionActioner { FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder(functionMetaData.getFunctionDetails()); - // check to make sure functionAuthenticationSpec has any data. If not set to null, since for protobuf, + // check to make sure functionAuthenticationSpec has any data and authentication is enabled. + // If not set to null, since for protobuf, // even if the field is not set its not going to be null. Have to use the "has" method to check - Function.FunctionAuthenticationSpec functionAuthenticationSpec - = instance.getFunctionMetaData().hasFunctionAuthSpec() - ? instance.getFunctionMetaData().getFunctionAuthSpec() : null; + Function.FunctionAuthenticationSpec functionAuthenticationSpec = null; + if (workerConfig.isAuthenticationEnabled() && instance.getFunctionMetaData().hasFunctionAuthSpec()) { + functionAuthenticationSpec = instance.getFunctionMetaData().getFunctionAuthSpec(); + } InstanceConfig instanceConfig = createInstanceConfig(functionDetailsBuilder.build(), functionAuthenticationSpec, @@ -283,7 +285,7 @@ public class FunctionActioner { functionRuntimeInfo.getRuntimeSpawner().close(); // cleanup any auth data cached - if (functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec() != null) { + if (workerConfig.isAuthenticationEnabled() && functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec() != null) { try { log.info("{}-{} Cleaning up authentication data for function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId()); functionRuntimeInfo.getRuntimeSpawner() diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java index d55898f..534710d 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java @@ -150,7 +150,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ workerConfig.getProcessContainerFactory().getPythonInstanceLocation(), workerConfig.getProcessContainerFactory().getLogDirectory(), workerConfig.getProcessContainerFactory().getExtraFunctionDependenciesDir(), - secretsProviderConfigurator); + secretsProviderConfigurator, + workerConfig.isAuthenticationEnabled()); } else if (workerConfig.getKubernetesContainerFactory() != null){ this.runtimeFactory = new KubernetesRuntimeFactory( workerConfig.getKubernetesContainerFactory().getK8Uri(), @@ -173,7 +174,8 @@ public class FunctionRuntimeManager implements AutoCloseable{ workerConfig.getKubernetesContainerFactory().getChangeConfigMap(), workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(), workerConfig.getFunctionInstanceMinResources(), - secretsProviderConfigurator); + secretsProviderConfigurator, + workerConfig.isAuthenticationEnabled()); } else { throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set"); }