This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a2db62  Fix: Exception when switch cluster from auth enabled to auth disabled (#4069)
6a2db62 is described below

commit 6a2db62f20853a4441d7d6c79a20524e6b8b6342
Author: Boyang Jerry Peng <jerry.boyang.p...@gmail.com>
AuthorDate: Wed Apr 17 23:11:38 2019 -0700

    Fix: Exception when switch cluster from auth enabled to auth disabled (#4069)
    
    * fix bug in source and sink cli update
    
    * fix import
    
    * fix logic
    
    * fix bug in auth when spec is not null but auth is turned off
    
    * clean up debug
    
    * cleanup
---
 .../main/java/org/apache/pulsar/PulsarStandaloneStarter.java |  8 +++++++-
 .../apache/pulsar/functions/runtime/KubernetesRuntime.java   |  9 ++++++---
 .../pulsar/functions/runtime/KubernetesRuntimeFactory.java   | 10 +++++++---
 .../org/apache/pulsar/functions/runtime/LocalRunner.java     |  2 +-
 .../pulsar/functions/runtime/ProcessRuntimeFactory.java      |  7 +++++--
 .../functions/runtime/KubernetesRuntimeFactoryTest.java      |  2 +-
 .../pulsar/functions/runtime/KubernetesRuntimeTest.java      |  2 +-
 .../apache/pulsar/functions/runtime/ProcessRuntimeTest.java  |  2 +-
 .../org/apache/pulsar/functions/worker/FunctionActioner.java | 12 +++++++-----
 .../pulsar/functions/worker/FunctionRuntimeManager.java      |  6 ++++--
 10 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index c9aefa1..feef094 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -116,6 +116,12 @@ public class PulsarStandaloneStarter extends PulsarStandalone {
     public static void main(String args[]) throws Exception {
         // Start standalone
         PulsarStandaloneStarter standalone = new PulsarStandaloneStarter(args);
-        standalone.start();
+        try {
+            standalone.start();
+        } catch (Throwable th) {
+            log.error("Failed to start pulsar service.", th);
+            Runtime.getRuntime().exit(1);
+        }
+
     }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 6dbb0d3..c3fe161 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -116,6 +116,7 @@ public class KubernetesRuntime implements Runtime {
             )
     );
     private static final long GRPC_TIMEOUT_SECS = 5;
+    private final boolean authenticationEnabled;
 
     // The thread that invokes the function
     @Getter
@@ -160,7 +161,8 @@ public class KubernetesRuntime implements Runtime {
                       SecretsProviderConfigurator secretsProviderConfigurator,
                       Integer expectedMetricsCollectionInterval,
                       int percentMemoryPadding,
-                      KubernetesFunctionAuthProvider 
functionAuthDataCacheProvider) throws Exception {
+                      KubernetesFunctionAuthProvider 
functionAuthDataCacheProvider,
+                      boolean authenticationEnabled) throws Exception {
         this.appsClient = appsClient;
         this.coreClient = coreClient;
         this.instanceConfig = instanceConfig;
@@ -174,6 +176,7 @@ public class KubernetesRuntime implements Runtime {
         this.pulsarAdminUrl = pulsarAdminUrl;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.percentMemoryPadding = percentMemoryPadding;
+        this.authenticationEnabled = authenticationEnabled;
         String logConfigFile = null;
         String secretsProviderClassName = 
secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
         String secretsProviderConfig = null;
@@ -439,7 +442,7 @@ public class KubernetesRuntime implements Runtime {
     private void submitStatefulSet() throws Exception {
         final V1StatefulSet statefulSet = createStatefulSet();
         // Configure function authentication if needed
-        if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+        if (authenticationEnabled && 
instanceConfig.getFunctionAuthenticationSpec() != null) {
             functionAuthDataCacheProvider.configureAuthDataStatefulSet(
                     statefulSet, 
getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
         }
@@ -755,7 +758,7 @@ public class KubernetesRuntime implements Runtime {
     private List<String> getDownloadCommand(String bkPath, String 
userCodeFilePath) {
 
         // add auth plugin and parameters if necessary
-        if (authConfig != null) {
+        if (authenticationEnabled && authConfig != null) {
             if (isNotBlank(authConfig.getClientAuthenticationPlugin())
                     && 
isNotBlank(authConfig.getClientAuthenticationParameters())) {
                 return Arrays.asList(
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 7fa67ce..3aed50e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -92,6 +92,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     private AppsV1Api appsClient;
     private CoreV1Api coreClient;
     private Resources functionInstanceMinResources;
+    private final boolean authenticationEnabled;
 
     @VisibleForTesting
     public KubernetesRuntimeFactory(String k8Uri,
@@ -114,7 +115,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
                                     String changeConfigMap,
                                     String changeConfigMapNamespace,
                                     Resources functionInstanceMinResources,
-                                    SecretsProviderConfigurator 
secretsProviderConfigurator) {
+                                    SecretsProviderConfigurator 
secretsProviderConfigurator,
+                                    boolean authenticationEnabled) {
         this.kubernetesInfo = new KubernetesInfo();
         this.kubernetesInfo.setK8Uri(k8Uri);
         if (!isEmpty(jobNamespace)) {
@@ -165,6 +167,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         this.expectedMetricsCollectionInterval = 
expectedMetricsCollectionInterval == null ? -1 : 
expectedMetricsCollectionInterval;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.functionInstanceMinResources = functionInstanceMinResources;
+        this.authenticationEnabled = authenticationEnabled;
         try {
             setupClient();
         } catch (Exception e) {
@@ -195,7 +198,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         }
 
         // adjust the auth config to support auth
-        if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+        if (authenticationEnabled && 
instanceConfig.getFunctionAuthenticationSpec() != null) {
             getAuthProvider().configureAuthenticationConfig(authConfig, 
getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
         }
 
@@ -223,7 +226,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             secretsProviderConfigurator,
             expectedMetricsCollectionInterval,
             this.kubernetesInfo.getPercentMemoryPadding(),
-            getAuthProvider());
+            getAuthProvider(),
+            authenticationEnabled);
     }
 
     @Override
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index 966f299..37efbee 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -179,7 +179,7 @@ public class LocalRunner {
             null, /* python instance file */
             null, /* log directory */
             null, /* extra dependencies dir */
-            new DefaultSecretsProviderConfigurator())) {
+            new DefaultSecretsProviderConfigurator(), false)) {
             List<RuntimeSpawner> spawners = new LinkedList<>();
             for (int i = 0; i < parallelism; ++i) {
                 InstanceConfig instanceConfig = new InstanceConfig();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 7090967..6976df5 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -39,6 +39,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
 
     private final String pulsarServiceUrl;
     private final String stateStorageServiceUrl;
+    private final boolean authenticationEnabled;
     private AuthenticationConfig authConfig;
     private SecretsProviderConfigurator secretsProviderConfigurator;
     private String javaInstanceJarFile;
@@ -54,7 +55,8 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
                                  String pythonInstanceFile,
                                  String logDirectory,
                                  String extraDependenciesDir,
-                                 SecretsProviderConfigurator 
secretsProviderConfigurator) {
+                                 SecretsProviderConfigurator 
secretsProviderConfigurator,
+                                 boolean authenticationEnabled) {
         this.pulsarServiceUrl = pulsarServiceUrl;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.authConfig = authConfig;
@@ -63,6 +65,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
         this.pythonInstanceFile = pythonInstanceFile;
         this.extraDependenciesDir = extraDependenciesDir;
         this.logDirectory = logDirectory;
+        this.authenticationEnabled = authenticationEnabled;
 
         // if things are not specified, try to figure out by env properties
         if (this.javaInstanceJarFile == null) {
@@ -129,7 +132,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
         }
 
         // configure auth if necessary
-        if (instanceConfig.getFunctionAuthenticationSpec() != null) {
+        if (authenticationEnabled && 
instanceConfig.getFunctionAuthenticationSpec() != null) {
             getAuthProvider().configureAuthenticationConfig(authConfig, 
getFunctionAuthData(instanceConfig.getFunctionAuthenticationSpec()));
         }
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
index 8f1172e..5ea75fa 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
@@ -145,7 +145,7 @@ public class KubernetesRuntimeFactoryTest {
             null,
             null,
                 minResources,
-                new TestSecretProviderConfigurator()));
+                new TestSecretProviderConfigurator(), false));
         doNothing().when(factory).setupClient();
         return factory;
     }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index b1c3dfb..5128ff3 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -163,7 +163,7 @@ public class KubernetesRuntimeTest {
             null,
             null,
             null,
-                null, new TestSecretProviderConfigurator()));
+                null, new TestSecretProviderConfigurator(), false));
         doNothing().when(factory).setupClient();
         return factory;
     }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index e2c0499..842b0a3 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -139,7 +139,7 @@ public class ProcessRuntimeTest {
             pythonInstanceFile,
             logDirectory,
             extraDependenciesDir, /* extra dependencies dir */
-            new TestSecretsProviderConfigurator());
+            new TestSecretsProviderConfigurator(), false);
     }
 
     FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
index 175d6e5..7140fdc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionActioner.java
@@ -154,11 +154,13 @@ public class FunctionActioner {
 
         FunctionDetails.Builder functionDetailsBuilder = 
FunctionDetails.newBuilder(functionMetaData.getFunctionDetails());
 
-        // check to make sure functionAuthenticationSpec has any data. If not set to null, since for protobuf,
+        // check to make sure functionAuthenticationSpec has any data and authentication is enabled.
+        // If not set to null, since for protobuf,
         // even if the field is not set its not going to be null. Have to use the "has" method to check
-        Function.FunctionAuthenticationSpec functionAuthenticationSpec
-                = instance.getFunctionMetaData().hasFunctionAuthSpec()
-                ? instance.getFunctionMetaData().getFunctionAuthSpec() : null;
+        Function.FunctionAuthenticationSpec functionAuthenticationSpec = null;
+        if (workerConfig.isAuthenticationEnabled() && 
instance.getFunctionMetaData().hasFunctionAuthSpec()) {
+            functionAuthenticationSpec = 
instance.getFunctionMetaData().getFunctionAuthSpec();
+        }
 
         InstanceConfig instanceConfig = 
createInstanceConfig(functionDetailsBuilder.build(),
                 functionAuthenticationSpec,
@@ -283,7 +285,7 @@ public class FunctionActioner {
             functionRuntimeInfo.getRuntimeSpawner().close();
 
             // cleanup any auth data cached
-            if 
(functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec()
 != null) {
+            if (workerConfig.isAuthenticationEnabled() && 
functionRuntimeInfo.getRuntimeSpawner().getInstanceConfig().getFunctionAuthenticationSpec()
 != null) {
                 try {
                     log.info("{}-{} Cleaning up authentication data for 
function...", fqfn,functionRuntimeInfo.getFunctionInstance().getInstanceId());
                     functionRuntimeInfo.getRuntimeSpawner()
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index d55898f..534710d 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -150,7 +150,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     
workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
                     
workerConfig.getProcessContainerFactory().getLogDirectory(),
                     
workerConfig.getProcessContainerFactory().getExtraFunctionDependenciesDir(),
-                    secretsProviderConfigurator);
+                    secretsProviderConfigurator,
+                    workerConfig.isAuthenticationEnabled());
         } else if (workerConfig.getKubernetesContainerFactory() != null){
             this.runtimeFactory = new KubernetesRuntimeFactory(
                     workerConfig.getKubernetesContainerFactory().getK8Uri(),
@@ -173,7 +174,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     
workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
                     
workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(),
                     workerConfig.getFunctionInstanceMinResources(),
-                    secretsProviderConfigurator);
+                    secretsProviderConfigurator,
+                    workerConfig.isAuthenticationEnabled());
         } else {
             throw new RuntimeException("Either Thread, Process or Kubernetes 
Container Factory need to be set");
         }

Reply via email to