[FLINK-6117] [security] Make setting of 'zookeeper.sasl.disable' work correctly

This closes #3600


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eef85e09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eef85e09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eef85e09

Branch: refs/heads/master
Commit: eef85e095a8a0e4c4553631b74ba7b9f173cebf0
Parents: daf4038
Author: zcb <[email protected]>
Authored: Thu Mar 23 03:44:10 2017 +0800
Committer: Stephan Ewen <[email protected]>
Committed: Fri Apr 21 10:37:44 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/configuration/SecurityOptions.java   |  4 ++++
 .../org/apache/flink/runtime/security/SecurityUtils.java  |  7 +++++++
 .../flink/runtime/security/modules/ZooKeeperModule.java   | 10 ++++++++++
 .../org/apache/flink/test/util/SecureTestEnvironment.java |  1 +
 4 files changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
index 95cf0c7..3763198 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java
@@ -55,6 +55,10 @@ public class SecurityOptions {
        //  ZooKeeper Security Options
        // ------------------------------------------------------------------------
 
+       public static final ConfigOption<Boolean> ZOOKEEPER_SASL_DISABLE =
+               key("zookeeper.sasl.disable")
+                       .defaultValue(false);
+
        public static final ConfigOption<String> ZOOKEEPER_SASL_SERVICE_NAME =
                key("zookeeper.sasl.service-name")
                        .defaultValue("zookeeper");

http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
index d76e7a5..7a09c32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/SecurityUtils.java
@@ -125,6 +125,8 @@ public class SecurityUtils {
 
                private final org.apache.hadoop.conf.Configuration hadoopConf;
 
+               private final boolean isZkSaslDisable;
+
                private final boolean useTicketCache;
 
                private final String keytab;
@@ -164,6 +166,7 @@ public class SecurityUtils {
                                org.apache.hadoop.conf.Configuration hadoopConf,
                                List<? extends Class<? extends SecurityModule>> securityModules) {
                        this.hadoopConf = checkNotNull(hadoopConf);
+                       this.isZkSaslDisable = flinkConf.getBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE);
                        this.keytab = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
                        this.principal = flinkConf.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
                        this.useTicketCache = flinkConf.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);
@@ -175,6 +178,10 @@ public class SecurityUtils {
                        validate();
                }
 
+               public boolean isZkSaslDisable() {
+                       return isZkSaslDisable;
+               }
+
                public String getKeytab() {
                        return keytab;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
index c0ba4a5..216bdde 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/ZooKeeperModule.java
@@ -41,6 +41,8 @@ public class ZooKeeperModule implements SecurityModule {
         */
        private static final String ZK_LOGIN_CONTEXT_NAME = "zookeeper.sasl.clientconfig";
 
+       private String priorSaslEnable;
+
        private String priorServiceName;
 
        private String priorLoginContextName;
@@ -48,6 +50,9 @@ public class ZooKeeperModule implements SecurityModule {
        @Override
        public void install(SecurityUtils.SecurityConfiguration configuration) throws SecurityInstallException {
 
+               priorSaslEnable = System.getProperty(ZK_ENABLE_CLIENT_SASL, null);
+               System.setProperty(ZK_ENABLE_CLIENT_SASL, String.valueOf(!configuration.isZkSaslDisable()));
+
                priorServiceName = System.getProperty(ZK_SASL_CLIENT_USERNAME, null);
                if (!"zookeeper".equals(configuration.getZooKeeperServiceName())) {
                        System.setProperty(ZK_SASL_CLIENT_USERNAME, configuration.getZooKeeperServiceName());
@@ -61,6 +66,11 @@ public class ZooKeeperModule implements SecurityModule {
 
        @Override
        public void uninstall() throws SecurityInstallException {
+               if(priorSaslEnable != null) {
+                       System.setProperty(ZK_ENABLE_CLIENT_SASL, priorSaslEnable);
+               } else {
+                       System.clearProperty(ZK_ENABLE_CLIENT_SASL);
+               }
                if(priorServiceName != null) {
                        System.setProperty(ZK_SASL_CLIENT_USERNAME, priorServiceName);
                } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/eef85e09/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
index febd074..98deee6 100644
--- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
+++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/SecureTestEnvironment.java
@@ -138,6 +138,7 @@ public class SecureTestEnvironment {
                        //ctx.setHadoopConfiguration() for the UGI implementation to work properly.
                        //See Yarn test case module for reference
                        Configuration flinkConfig = GlobalConfiguration.loadConfiguration();
+                       flinkConfig.setBoolean(SecurityOptions.ZOOKEEPER_SASL_DISABLE, false);
                        flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_KEYTAB, testKeytab);
                        flinkConfig.setBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE, false);
                        flinkConfig.setString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL, testPrincipal);

Reply via email to