[ https://issues.apache.org/jira/browse/HADOOP-18709?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724473#comment-17724473 ]

ASF GitHub Bot commented on HADOOP-18709:
-----------------------------------------

szilard-nemeth commented on code in PR #5638:
URL: https://github.com/apache/hadoop/pull/5638#discussion_r1199527654


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java:
##########
@@ -503,4 +644,50 @@ private void setJaasConfiguration(ZKClientConfig zkClientConfig) throws IOExcept
       zkClientConfig.setProperty(ZKClientConfig.LOGIN_CONTEXT_NAME_KEY, JAAS_CLIENT_ENTRY);
     }
   }
-}
\ No newline at end of file
+
+  /**
+   * Helper class to contain the Truststore/Keystore paths for the ZK client connection over
+   * SSL/TLS.
+   */
+  public static class TruststoreKeystore{
+    private static String keystoreLocation;
+    private static String keystorePassword;
+    private static String truststoreLocation;
+    private static String truststorePassword;
+    /** Configuration for the ZooKeeper connection when SSL/TLS is enabled.
+     * When a value is not configured, ensure that empty string is set instead of null.
+     * @param conf ZooKeeper Client configuration
+     */
+    public TruststoreKeystore(Configuration conf){
+
+      keystoreLocation =
+          StringUtils.defaultString(conf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION,

Review Comment:
   Why is StringUtils.defaultString needed?
   conf.get() already returns an empty string when the config is not found,
   given that you pass an empty string as the default in every conf.get call.
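
   A minimal sketch of the point, not the PR's code: java.util.Properties
   stands in for Hadoop's Configuration so that the example is self-contained,
   and the key name is hypothetical. A two-argument lookup with an empty-string
   default never yields null, so an extra null-to-empty conversion on top of it
   has no effect.

```java
import java.util.Properties;

// Stand-in sketch: java.util.Properties plays the role of Hadoop's Configuration.
class DefaultLookupSketch {

  // Hypothetical key name, used only for illustration.
  public static final String KEYSTORE_LOCATION_KEY = "example.zk.ssl.keystore.location";

  // Two-argument lookup: returns the supplied default when the key is absent.
  public static String lookup(Properties props, String key) {
    return props.getProperty(key, "");
  }

  // The same lookup with an additional null guard, mirroring a defaultString-style wrapper.
  public static String lookupWithNullGuard(Properties props, String key) {
    String value = props.getProperty(key, "");
    return value == null ? "" : value;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    // Key is absent: both variants fall back to the empty-string default.
    System.out.println("[" + lookup(props, KEYSTORE_LOCATION_KEY) + "]");
    System.out.println("[" + lookupWithNullGuard(props, KEYSTORE_LOCATION_KEY) + "]");
    // Key is present: both variants return the configured value unchanged.
    props.setProperty(KEYSTORE_LOCATION_KEY, "/keystore.jks");
    System.out.println("[" + lookup(props, KEYSTORE_LOCATION_KEY) + "]");
    System.out.println("[" + lookupWithNullGuard(props, KEYSTORE_LOCATION_KEY) + "]");
  }
}
```

   Both variants return the same result for a missing key and for a present
   one, which is why the wrapper looks redundant.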



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,

Review Comment:
   Extracting these magic numbers (at least 1, 100, and 10) into static final
   constants would make this more readable and straightforward.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java:
##########
@@ -452,21 +502,50 @@ public static class HadoopZookeeperFactory implements ZookeeperFactory {
     private final String zkPrincipal;
     private final String kerberosPrincipal;
     private final String kerberosKeytab;
+    private final Boolean sslEnabled;
 
+    /**
+     * Constructor for the helper class to configure the ZooKeeper client connection.
+     * @param zkPrincipal Optional.
+     */
     public HadoopZookeeperFactory(String zkPrincipal) {
       this(zkPrincipal, null, null);
     }
-
+    /**
+     * Constructor for the helper class to configure the ZooKeeper client connection.
+     * @param zkPrincipal Optional.
+     * @param kerberosPrincipal Optional. Use along with kerberosKeytab.
+     * @param kerberosKeytab Optional. Use along with kerberosPrincipal.
+     */
     public HadoopZookeeperFactory(String zkPrincipal, String kerberosPrincipal,
         String kerberosKeytab) {
+      this(zkPrincipal, kerberosPrincipal, kerberosKeytab, false,
+          new TruststoreKeystore(new Configuration())); }
+
+    /**
+     * Constructor for the helper class to configure the ZooKeeper client connection.
+     * @param zkPrincipal Optional.
+     * @param kerberosPrincipal Optional. Use along with kerberosKeytab.
+     * @param kerberosKeytab Optional. Use along with kerberosPrincipal.
+     * @param sslEnabled Flag to enable SSL/TLS ZK client connection for each component
+     *                   independently.
+     * @param truststoreKeystore TruststoreKeystore object containing the keystoreLocation,
+     *                           keystorePassword, truststoreLocation, truststorePassword for
+     *                           SSL/TLS connection when sslEnabled is set to true.
+     */
+    public HadoopZookeeperFactory(String zkPrincipal,
+        String kerberosPrincipal,
+        String kerberosKeytab,
+        boolean sslEnabled,
+        TruststoreKeystore truststoreKeystore) {

Review Comment:
   The 'truststoreKeystore' parameter is unused.
   Please remove it.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws Exception {
+    Configuration conf = new Configuration();
+    System.setProperty("zookeeper.serverCnxnFactory", "org.apache.zookeeper.server" +
+        ".NettyServerCnxnFactory");
+
+    System.setProperty("zookeeper.ssl.keyStore.location", testDataPath + "keystore.jks");
+    System.setProperty("zookeeper.ssl.keyStore.password", "password");
+    System.setProperty("zookeeper.ssl.trustStore.location", testDataPath + "truststore.jks");
+    System.setProperty("zookeeper.ssl.trustStore.password", "password");
+    System.setProperty("zookeeper.request.timeout", "12345");
+
+    System.setProperty("jute.maxbuffer", JUTE_MAXBUFFER.toString());
+
+    System.setProperty("javax.net.debug", "ssl");
+    System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth" +
+        ".X509AuthenticationProvider");
+
+
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, testDataPath +
+        "keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "password");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, testDataPath +
+        "truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "password");
+    return conf;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.curator.close();
+    if (this.server != null) {
+      this.server.close();
+      this.server = null;
+    }
+  }
+
+  @Test
+  public void testSecureZKConfiguration() throws Exception {
+    LOG.info("Entered to the testSecureZKConfiguration test case.");
+    // Validate that HadoopZooKeeperFactory will set ZKConfig with given principals
+    ZKCuratorManager.HadoopZookeeperFactory factory =
+        new ZKCuratorManager.HadoopZookeeperFactory(
+            null,
+            null,
+            null,
+            true,
+            new ZKCuratorManager.TruststoreKeystore(hadoopConf));
+    ZooKeeper zk = factory.newZooKeeper(this.server.getConnectString(), 1000, null, false);
+    validateSSLConfiguration(
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD),
+        zk);
+  }
+
+  private void validateSSLConfiguration(
+      String keystoreLocation,
+      String keystorePassword,
+      String truststoreLocation,
+      String truststorePassword,
+      ZooKeeper zk){
+    try (ClientX509Util x509Util = new ClientX509Util()) {
+      //testing if custom values are set properly
+      assertEquals("Validate that expected clientConfig is set in ZK config", keystoreLocation,
+          zk.getClientConfig().getProperty(x509Util.getSslKeystoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", keystorePassword,
+          zk.getClientConfig().getProperty(x509Util.getSslKeystorePasswdProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", truststoreLocation,
+          zk.getClientConfig().getProperty(x509Util.getSslTruststoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", truststorePassword,
+          zk.getClientConfig().getProperty(x509Util.getSslTruststorePasswdProperty()));
+    }
+    //testing if constant values hardcoded into the code are set properly
+    assertEquals("Validate that expected clientConfig is set in ZK config", "true",
+        zk.getClientConfig().getProperty(ZKClientConfig.SECURE_CLIENT));
+    assertEquals("Validate that expected clientConfig is set in ZK config",
+        "org.apache.zookeeper.ClientCnxnSocketNetty",
+        zk.getClientConfig().getProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET));
+  }
+
+  @Test
+  public void testTruststoreKeystoreConfiguration(){
+    LOG.info("Entered to the testTruststoreKeystoreConfiguration test case.");
+    /**

Review Comment:
   nit: This should be a regular block comment (/* ... */); as written, it is
   a dangling Javadoc comment.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+  @Test
+  public void testTruststoreKeystoreConfiguration(){
+    LOG.info("Entered to the testTruststoreKeystoreConfiguration test case.");
+    /**
+     * By default the truststore/keystore configurations are not set, hence the values are null.
+     * Validate that the null values are converted into empty strings by the class.
+     */
+    Configuration conf = new Configuration();
+    new ZKCuratorManager.TruststoreKeystore(conf);
+
+    assertEquals("Validate that null value is converted to empty string.", "",
+        ZKCuratorManager.TruststoreKeystore.getKeystoreLocation());
+    assertEquals("Validate that null value is converted to empty string.", "",
+        ZKCuratorManager.TruststoreKeystore.getKeystorePassword());
+    assertEquals("Validate that null value is converted to empty string.", "",
+        ZKCuratorManager.TruststoreKeystore.getTruststoreLocation());
+    assertEquals("Validate that null value is converted to empty string.", "",
+        ZKCuratorManager.TruststoreKeystore.getTruststorePassword());
+
+    //Validate that non-null values will remain intact
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, "/keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "keystorePassword");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, "/truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "truststorePassword");
+    new ZKCuratorManager.TruststoreKeystore(conf);

Review Comment:
   It doesn't make much sense to invoke the TruststoreKeystore constructor and
   store all values from the config in static fields.
   That is not what static is for.
   I would just convert all 4 static fields in TruststoreKeystore to instance
   fields.
   Funnily enough, I only noticed this oddity from how the class is used here
   in the test case.
   So the change here would be to create a new TruststoreKeystore instance from
   the conf and call the same getters on that object, rather than statically as
   you do currently.
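
   A rough sketch of the suggested shape, under stated assumptions: a plain
   Map stands in for Hadoop's Configuration, the key names are hypothetical,
   and only two of the four values are shown for brevity. It is not the PR's
   actual class.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: values are held in instance fields rather than static ones.
// A Map stands in for Hadoop's Configuration; the key names are hypothetical.
class TruststoreKeystoreSketch {
  public static final String KEYSTORE_LOCATION_KEY = "example.zk.ssl.keystore.location";
  public static final String TRUSTSTORE_LOCATION_KEY = "example.zk.ssl.truststore.location";

  private final String keystoreLocation;
  private final String truststoreLocation;

  // Reads each value once; a missing key falls back to the empty string.
  public TruststoreKeystoreSketch(Map<String, String> conf) {
    this.keystoreLocation = conf.getOrDefault(KEYSTORE_LOCATION_KEY, "");
    this.truststoreLocation = conf.getOrDefault(TRUSTSTORE_LOCATION_KEY, "");
  }

  public String getKeystoreLocation() {
    return keystoreLocation;
  }

  public String getTruststoreLocation() {
    return truststoreLocation;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    TruststoreKeystoreSketch unset = new TruststoreKeystoreSketch(conf);

    conf.put(KEYSTORE_LOCATION_KEY, "/keystore.jks");
    TruststoreKeystoreSketch configured = new TruststoreKeystoreSketch(conf);

    // Each object keeps its own values; building the second does not change the first.
    System.out.println("[" + unset.getKeystoreLocation() + "]");
    System.out.println("[" + configured.getKeystoreLocation() + "]");
  }
}
```

   With instance fields, the test can assert on the object returned by the
   constructor, and two instances built from different configurations do not
   overwrite each other.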



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+  private void validateSSLConfiguration(
+      String keystoreLocation,
+      String keystorePassword,
+      String truststoreLocation,
+      String truststorePassword,
+      ZooKeeper zk){
+    try (ClientX509Util x509Util = new ClientX509Util()) {
+      //testing if custom values are set properly
+      assertEquals("Validate that expected clientConfig is set in ZK config", keystoreLocation,
+          zk.getClientConfig().getProperty(x509Util.getSslKeystoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", keystorePassword,
+          zk.getClientConfig().getProperty(x509Util.getSslKeystorePasswdProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", truststoreLocation,
+          zk.getClientConfig().getProperty(x509Util.getSslTruststoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", truststorePassword,
+          zk.getClientConfig().getProperty(x509Util.getSslTruststorePasswdProperty()));
+    }
+    //testing if constant values hardcoded into the code are set properly
+    assertEquals("Validate that expected clientConfig is set in ZK config", "true",

Review Comment:
   Nit: "true" --> Boolean.TRUE.toString()



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;

Review Comment:
   This could be a primitive int.
   Moreover, the name 'defaultValue' does not tell me anything.
   Please add two static final ints whose names reflect the quorum port and election port.
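
A minimal sketch of the suggestion above, in plain Java. The names QUORUM_PORT, ELECTION_PORT and SecureZkTestPorts are hypothetical and not taken from the PR; the value -1 is simply the one the test currently passes for both arguments.

```java
// Sketch only: the class and constant names are hypothetical.
final class SecureZkTestPorts {
  // Primitive ints avoid needless boxing; each name says what the value is for.
  static final int SECURE_CLIENT_PORT = 2281;
  static final int QUORUM_PORT = -1;
  static final int ELECTION_PORT = -1;

  private SecureZkTestPorts() {
  }

  // A primitive int has no toString(), so String.valueOf covers the
  // place where the test stores the port as a String in a map.
  static String secureClientPortAsString() {
    return String.valueOf(SECURE_CLIENT_PORT);
  }

  public static void main(String[] args) {
    System.out.println("secureClientPort=" + secureClientPortAsString());
  }
}
```

With constants like these, the two `defaultValue` arguments in the constructor call would each carry a name that tells the reader which port they configure.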



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {

Review Comment:
   Maybe setUpSecureConfig is a better name.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/curator/ZKCuratorManager.java:
##########
@@ -478,10 +557,72 @@ public ZooKeeper newZooKeeper(String connectString, int sessionTimeout,
       if (zkClientConfig.isSaslClientEnabled() && !isJaasConfigurationSet(zkClientConfig)) {
         setJaasConfiguration(zkClientConfig);
       }
+      if (sslEnabled) {
+        setSslConfiguration(zkClientConfig);
+      }
       return new ZooKeeper(connectString, sessionTimeout, watcher,
           canBeReadOnly, zkClientConfig);
     }
 
+    /**
+     * Configure ZooKeeper Client with SSL/TLS connection.
+     * @param zkClientConfig ZooKeeper Client configuration
+     * */
+    private void setSslConfiguration(ZKClientConfig zkClientConfig) throws ConfigurationException {
+      this.setSslConfiguration(zkClientConfig, new ClientX509Util());
+    }
+
+    private void setSslConfiguration(ZKClientConfig zkClientConfig, ClientX509Util x509Util)
+        throws ConfigurationException {
+      validateSslConfiguration(
+          TruststoreKeystore.keystoreLocation,
+          TruststoreKeystore.keystorePassword,
+          TruststoreKeystore.truststoreLocation,
+          TruststoreKeystore.truststorePassword);
+      LOG.info("Configuring the ZooKeeper client to use SSL/TLS encryption for connecting to the " +
+          "ZooKeeper server.");
+      LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
+          TruststoreKeystore.keystoreLocation,
+          CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION);
+      LOG.debug("Configuring the ZooKeeper client with {} location: {}.",
+          TruststoreKeystore.truststoreLocation,
+          CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION);
+
+      zkClientConfig.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+      zkClientConfig.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+          "org.apache.zookeeper.ClientCnxnSocketNetty");
+      zkClientConfig.setProperty(x509Util.getSslKeystoreLocationProperty(),
+          TruststoreKeystore.keystoreLocation);
+      zkClientConfig.setProperty(x509Util.getSslKeystorePasswdProperty(),
+          TruststoreKeystore.keystorePassword);
+      zkClientConfig.setProperty(x509Util.getSslTruststoreLocationProperty(),
+          TruststoreKeystore.truststoreLocation);
+      zkClientConfig.setProperty(x509Util.getSslTruststorePasswdProperty(),
+          TruststoreKeystore.truststorePassword);
+    }
+
+    private void validateSslConfiguration(String keystoreLocation, String keystorePassword,

Review Comment:
   All parameters are unused in this method. Please remove them.
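
One way to read this suggestion, as a hedged sketch: a validation method with no parameters that checks the state the object already holds. Everything here is hypothetical, including the class name SslStoreSettings and the "must not be empty" rule, since the body of validateSslConfiguration is not shown in this review. The sketch uses instance fields and IllegalStateException as JDK-only stand-ins for the PR's static fields and ConfigurationException.

```java
// Sketch only: SslStoreSettings and its validation rule are assumptions.
final class SslStoreSettings {
  private final String keystoreLocation;
  private final String keystorePassword;
  private final String truststoreLocation;
  private final String truststorePassword;

  SslStoreSettings(String keystoreLocation, String keystorePassword,
      String truststoreLocation, String truststorePassword) {
    // Mirrors the "empty string instead of null" rule from the PR's Javadoc.
    this.keystoreLocation = nullToEmpty(keystoreLocation);
    this.keystorePassword = nullToEmpty(keystorePassword);
    this.truststoreLocation = nullToEmpty(truststoreLocation);
    this.truststorePassword = nullToEmpty(truststorePassword);
  }

  // No parameters: the method reads the fields it already owns.
  void validate() {
    requireSet("keystore location", keystoreLocation);
    requireSet("keystore password", keystorePassword);
    requireSet("truststore location", truststoreLocation);
    requireSet("truststore password", truststorePassword);
  }

  private static String nullToEmpty(String value) {
    return value == null ? "" : value;
  }

  private static void requireSet(String name, String value) {
    if (value.isEmpty()) {
      throw new IllegalStateException("The " + name + " is not configured");
    }
  }

  public static void main(String[] args) {
    new SslStoreSettings("keystore.jks", "password", "truststore.jks", "password").validate();
    System.out.println("SSL store settings are complete");
  }
}
```

The alternative that keeps the parameters would be to have the method body actually use them instead of reading the fields directly; either choice removes the mismatch the comment points out.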



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws Exception {
+    Configuration conf = new Configuration();
+    System.setProperty("zookeeper.serverCnxnFactory", "org.apache.zookeeper.server" +

Review Comment:
   Nit: Getting the name of the class from the Class object of 
NettyServerCnxnFactory would be cleaner.
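
A small sketch of the difference, assuming nothing beyond the JDK. `ConcurrentHashMap` is only a stand-in so that the example compiles without ZooKeeper on the classpath; ClassNameSketch and its constants are hypothetical names.

```java
// Sketch only: ConcurrentHashMap stands in for NettyServerCnxnFactory.
final class ClassNameSketch {
  // Hard-coded and split across a concatenation, as in the code under review.
  static final String HARD_CODED = "java.util.concurrent" + ".ConcurrentHashMap";

  // Derived from the Class object: checked by the compiler and safe under renames.
  static final String FROM_CLASS = java.util.concurrent.ConcurrentHashMap.class.getName();

  private ClassNameSketch() {
  }

  public static void main(String[] args) {
    System.out.println(HARD_CODED.equals(FROM_CLASS));
  }
}
```

In the test under review, the same idea would presumably read `NettyServerCnxnFactory.class.getName()` as the second argument to `System.setProperty`, with the class imported from `org.apache.zookeeper.server`.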



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws Exception {
+    Configuration conf = new Configuration();
+    System.setProperty("zookeeper.serverCnxnFactory", "org.apache.zookeeper.server" +
+        ".NettyServerCnxnFactory");
+
+    System.setProperty("zookeeper.ssl.keyStore.location", testDataPath + "keystore.jks");
+    System.setProperty("zookeeper.ssl.keyStore.password", "password");
+    System.setProperty("zookeeper.ssl.trustStore.location", testDataPath + "truststore.jks");
+    System.setProperty("zookeeper.ssl.trustStore.password", "password");
+    System.setProperty("zookeeper.request.timeout", "12345");
+
+    System.setProperty("jute.maxbuffer", JUTE_MAXBUFFER.toString());
+
+    System.setProperty("javax.net.debug", "ssl");
+    System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth" +
+        ".X509AuthenticationProvider");
+
+
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION, testDataPath +
+        "keystore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD, "password");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION, testDataPath +
+        "truststore.jks");
+    conf.set(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD, "password");
+    return conf;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    this.curator.close();
+    if (this.server != null) {
+      this.server.close();
+      this.server = null;
+    }
+  }
+
+  @Test
+  public void testSecureZKConfiguration() throws Exception {
+    LOG.info("Entered to the testSecureZKConfiguration test case.");
+    // Validate that HadoopZooKeeperFactory will set ZKConfig with given principals
+    ZKCuratorManager.HadoopZookeeperFactory factory =
+        new ZKCuratorManager.HadoopZookeeperFactory(
+            null,
+            null,
+            null,
+            true,
+            new ZKCuratorManager.TruststoreKeystore(hadoopConf));
+    ZooKeeper zk = factory.newZooKeeper(this.server.getConnectString(), 1000, null, false);
+    validateSSLConfiguration(
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_KEYSTORE_PASSWORD),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_LOCATION),
+        this.hadoopConf.get(CommonConfigurationKeys.ZK_SSL_TRUSTSTORE_PASSWORD),
+        zk);
+  }
+
+  private void validateSSLConfiguration(
+      String keystoreLocation,
+      String keystorePassword,
+      String truststoreLocation,
+      String truststorePassword,
+      ZooKeeper zk){
+    try (ClientX509Util x509Util = new ClientX509Util()) {
+      //testing if custom values are set properly
+      assertEquals("Validate that expected clientConfig is set in ZK config", keystoreLocation,
+          zk.getClientConfig().getProperty(x509Util.getSslKeystoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", keystorePassword,
+          zk.getClientConfig().getProperty(x509Util.getSslKeystorePasswdProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", truststoreLocation,
+          zk.getClientConfig().getProperty(x509Util.getSslTruststoreLocationProperty()));
+      assertEquals("Validate that expected clientConfig is set in ZK config", truststorePassword,
+          zk.getClientConfig().getProperty(x509Util.getSslTruststorePasswdProperty()));
+    }
+    //testing if constant values hardcoded into the code are set properly
+    assertEquals("Validate that expected clientConfig is set in ZK config", "true",
+        zk.getClientConfig().getProperty(ZKClientConfig.SECURE_CLIENT));
+    assertEquals("Validate that expected clientConfig is set in ZK config",
+        "org.apache.zookeeper.ClientCnxnSocketNetty",

Review Comment:
   Nit: Getting the name from the Class object will be cleaner.



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/curator/TestSecureZKCuratorManager.java:
##########
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util.curator;
+
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.fs.FileContext.LOG;
+import static org.junit.Assert.assertEquals;
+
+
+/**
+ * Test the manager for ZooKeeper Curator when SSL/TLS is enabled for the ZK server-client
+ * connection.
+ */
+public class TestSecureZKCuratorManager {
+
+  private TestingServer server;
+  private ZKCuratorManager curator;
+  private Configuration hadoopConf;
+  static final Integer SECURE_CLIENT_PORT = 2281;
+  static final Integer JUTE_MAXBUFFER = 400000000;
+  static final File ZK_DATA_DIR = new File("testZkSSLClientConnectionDataDir");
+
+  @Before
+  public void setup() throws Exception {
+    Integer defaultValue = -1;
+    Map<String, Object> customConfiguration = new HashMap<>();
+    customConfiguration.put("secureClientPort", SECURE_CLIENT_PORT.toString());
+    customConfiguration.put("audit.enable", true);
+    this.hadoopConf = setUpSecure();
+    InstanceSpec spec = new InstanceSpec(ZK_DATA_DIR, SECURE_CLIENT_PORT,
+        defaultValue,
+        defaultValue,
+        true,
+        1,
+        100,
+        10,
+        customConfiguration);
+    this.server = new TestingServer(spec, true);
+    this.hadoopConf.set(CommonConfigurationKeys.ZK_ADDRESS, this.server.getConnectString());
+    this.curator = new ZKCuratorManager(this.hadoopConf);
+    this.curator.start(new ArrayList<>(), true);
+  }
+
+  public static Configuration setUpSecure() throws Exception {
+    return setUpSecure("src/test/java/org/apache/hadoop/util/curator/resources/data/ssl/");
+  }
+  public static Configuration setUpSecure(String testDataPath) throws Exception {

Review Comment:
   This method does not throw any Exception according to IntelliJ.





> Add curator based ZooKeeper communication support over SSL/TLS into the 
> common library
> --------------------------------------------------------------------------------------
>
>                 Key: HADOOP-18709
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18709
>             Project: Hadoop Common
>          Issue Type: Improvement
>            Reporter: Ferenc Erdelyi
>            Assignee: Ferenc Erdelyi
>            Priority: Major
>              Labels: pull-request-available
>
> With HADOOP-16579 the ZooKeeper client is capable of securing communication 
> with SSL. 
> To follow the convention introduced in HADOOP-14741, this issue proposes adding 
> the following configurations to core-default.xml, as the groundwork for the 
> components to enable encrypted communication between the individual 
> components and ZooKeeper:
>  * hadoop.zk.ssl.keystore.location
>  * hadoop.zk.ssl.keystore.password
>  * hadoop.zk.ssl.truststore.location
>  * hadoop.zk.ssl.truststore.password
> These parameters along with the component-specific ssl.client.enable option 
> (e.g. yarn.zookeeper.ssl.client.enable) should be passed to the 
> ZKCuratorManager to build the CuratorFramework. The ZKCuratorManager needs a 
> new overloaded start() method to build the encrypted communication.
>  * The secured ZK Client uses Netty, hence the dependency is included in the 
> pom.xml. Added netty-handler and netty-transport-native-epoll dependency to 
> the pom.xml based on ZOOKEEPER-3494 - "No need to depend on netty-all (SSL)".
>  * The change was tested only with the unit test, which is effectively an 
> integration test: a ZK Server is brought up and the communication between 
> the client and the server is tested.
>  * This code change is in the common code base and there is no component 
> calling it yet. Once YARN-11468 - "Zookeeper SSL/TLS support" is implemented, 
> we can test it in a real cluster environment.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
